From 1becc4c4e656213d0f71938029a08a2da04dc97f Mon Sep 17 00:00:00 2001 From: Nick Mathewson <nickm@torproject.org> Date: Mon, 13 Apr 2009 03:08:11 +0000 Subject: [PATCH] Refactor new elements of bufferevent into bufferevent_private structure This way we don't expose more of a bufferevent than we need to. One motivation is to make it easier to automatically get deferred callbacks with a bufferevent without exposing the deferred_cb structure. svn:r1169 --- bufferevent-internal.h | 18 ++++++++++++- bufferevent.c | 41 ++++++++++++++++++----------- bufferevent_filter.c | 34 +++++++++++++----------- bufferevent_pair.c | 35 ++++++++++++------------ bufferevent_sock.c | 12 ++++++--- include/event2/bufferevent_struct.h | 7 ----- 6 files changed, 87 insertions(+), 60 deletions(-) diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 1d4b80d1..f369659f 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -32,6 +32,22 @@ extern "C" { #include "event-config.h" #include "evutil.h" +#include "defer-internal.h" + +struct bufferevent_private { + struct bufferevent bev; + + /** Evbuffer callback to enforce watermarks on input. */ + struct evbuffer_cb_entry *read_watermarks_cb; + + /** If set, read is suspended until evbuffer some. */ + unsigned read_suspended : 1; + + enum bufferevent_options options; + + int refcnt; + void *lock; +}; /** Implementation table for a bufferevent: holds function pointers and other @@ -81,7 +97,7 @@ extern const struct bufferevent_ops bufferevent_ops_filter; extern const struct bufferevent_ops bufferevent_ops_pair; /** Initialize the shared parts of a bufferevent. */ -int bufferevent_init_common(struct bufferevent *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options); +int bufferevent_init_common(struct bufferevent_private *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options); /** For internal use: temporarily stop all reads on bufev, because its * read buffer is too full. */ diff --git a/bufferevent.c b/bufferevent.c index 53459a29..69b86702 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -63,17 +63,21 @@ void bufferevent_wm_suspend_read(struct bufferevent *bufev) { - if (!bufev->read_suspended) { + struct bufferevent_private *bufev_private = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + if (!bufev_private->read_suspended) { bufev->be_ops->disable(bufev, EV_READ); - bufev->read_suspended = 1; + bufev_private->read_suspended = 1; } } void bufferevent_wm_unsuspend_read(struct bufferevent *bufev) { - if (bufev->read_suspended) { - bufev->read_suspended = 0; + struct bufferevent_private *bufev_private = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + if (bufev_private->read_suspended) { + bufev_private->read_suspended = 0; if (bufev->enabled & EV_READ) bufev->be_ops->enable(bufev, EV_READ); } @@ -103,10 +107,13 @@ bufferevent_inbuf_wm_cb(struct evbuffer *buf, } int -bufferevent_init_common(struct bufferevent *bufev, struct event_base *base, - const struct bufferevent_ops *ops, - enum bufferevent_options options) +bufferevent_init_common(struct bufferevent_private *bufev_private, + struct event_base *base, + const struct bufferevent_ops *ops, + enum bufferevent_options options) { + struct bufferevent *bufev = &bufev_private->bev; + if ((bufev->input = evbuffer_new()) == NULL) return -1; @@ -130,7 +137,7 @@ bufferevent_init_common(struct bufferevent *bufev, struct event_base *base, */ bufev->enabled = EV_WRITE; - bufev->options = options; + bufev_private->options = options; return 0; } @@ -196,8 +203,10 @@ bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf) int bufferevent_enable(struct bufferevent *bufev, short event) { + struct bufferevent_private *bufev_private = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); short impl_events = event; - if (bufev->read_suspended) + if (bufev_private->read_suspended) impl_events &= ~EV_READ; bufev->enabled |= event; @@ -272,6 +281,8 @@ void bufferevent_setwatermark(struct bufferevent *bufev, short events, size_t lowmark, size_t highmark) { + struct bufferevent_private *bufev_private = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); if (events & EV_WRITE) { bufev->wm_write.low = lowmark; @@ -287,14 +298,14 @@ bufferevent_setwatermark(struct bufferevent *bufev, short events, enable the callback if needed, and see if we should suspend/bufferevent_wm_unsuspend. */ - if (bufev->read_watermarks_cb == NULL) { - bufev->read_watermarks_cb = + if (bufev_private->read_watermarks_cb == NULL) { + bufev_private->read_watermarks_cb = evbuffer_add_cb(bufev->input, bufferevent_inbuf_wm_cb, bufev); } evbuffer_cb_set_flags(bufev->input, - bufev->read_watermarks_cb, + bufev_private->read_watermarks_cb, EVBUFFER_CB_ENABLED); if (EVBUFFER_LENGTH(bufev->input) > highmark) @@ -303,10 +314,10 @@ bufferevent_setwatermark(struct bufferevent *bufev, short events, bufferevent_wm_unsuspend_read(bufev); } else { /* There is now no high-water mark for read. */ - if (bufev->read_watermarks_cb) + if (bufev_private->read_watermarks_cb) evbuffer_cb_set_flags(bufev->input, - bufev->read_watermarks_cb, - EVBUFFER_CB_DISABLED); + bufev_private->read_watermarks_cb, + EVBUFFER_CB_DISABLED); bufferevent_wm_unsuspend_read(bufev); } } diff --git a/bufferevent_filter.c b/bufferevent_filter.c index fcd090be..54fd23c8 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -76,7 +76,7 @@ static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf, const struct evbuffer_cb_info *info, void *arg); struct bufferevent_filtered { - struct bufferevent bev; + struct bufferevent_private bev; /** The bufferevent that we read/write filterd data from/to. */ struct bufferevent *underlying; @@ -116,12 +116,12 @@ upcast(struct bufferevent *bev) if (bev->be_ops != &bufferevent_ops_filter) return NULL; bev_f = (void*)( ((char*)bev) - - evutil_offsetof(struct bufferevent_filtered, bev) ); - assert(bev_f->bev.be_ops == &bufferevent_ops_filter); + evutil_offsetof(struct bufferevent_filtered, bev.bev)); + assert(bev_f->bev.bev.be_ops == &bufferevent_ops_filter); return bev_f; } -#define downcast(bev_f) (&(bev_f)->bev) +#define downcast(bev_f) (&(bev_f)->bev.bev) /** Return 1 iff bevf's underlying bufferevent's output buffer is at or * over its high watermark such that we should not write to it in a given @@ -195,10 +195,10 @@ bufferevent_filter_new(struct bufferevent *underlying, bufferevent_setcb(bufev_f->underlying, be_filter_readcb, be_filter_writecb, be_filter_errorcb, bufev_f); - bufev_f->outbuf_cb = evbuffer_add_cb(bufev_f->bev.output, + bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output, bufferevent_filtered_outbuf_cb, bufev_f); - return &bufev_f->bev; + return downcast(bufev_f); } static void @@ -209,7 +209,7 @@ be_filter_destruct(struct bufferevent *bev) if (bevf->free_context) bevf->free_context(bevf->context); - if (bev->options & BEV_OPT_CLOSE_ON_FREE) + if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) bufferevent_free(bevf->underlying); } @@ -247,28 +247,29 @@ be_filter_process_input(struct bufferevent_filtered *bevf, int *processed_out) { enum bufferevent_filter_result res; + struct bufferevent *bev = downcast(bevf); if (state == BEV_NORMAL) { /* If we're in 'normal' mode, don't urge data on the filter * unless we're reading data and under our high-water mark.*/ - if (!(bevf->bev.enabled & EV_READ) || + if (!(bev->enabled & EV_READ) || be_readbuf_full(bevf, state)) return BEV_OK; } do { ssize_t limit = -1; - if (state == BEV_NORMAL && bevf->bev.wm_read.high) - limit = bevf->bev.wm_read.high - - EVBUFFER_LENGTH(bevf->bev.input); + if (state == BEV_NORMAL && bev->wm_read.high) + limit = bev->wm_read.high - + EVBUFFER_LENGTH(bev->input); res = bevf->process_in(bevf->underlying->input, - bevf->bev.input, limit, state, bevf->context); + bev->input, limit, state, bevf->context); if (res == BEV_OK) *processed_out = 1; } while (res == BEV_OK && - (bevf->bev.enabled & EV_READ) && + (bev->enabled & EV_READ) && EVBUFFER_LENGTH(bevf->underlying->input) && !be_readbuf_full(bevf, state)); @@ -312,7 +313,7 @@ be_filter_process_output(struct bufferevent_filtered *bevf, limit = bevf->underlying->wm_write.high - EVBUFFER_LENGTH(bevf->underlying->output); - res = bevf->process_out(bevf->bev.output, + res = bevf->process_out(downcast(bevf)->output, bevf->underlying->output, limit, state, @@ -405,10 +406,11 @@ static void be_filter_errorcb(struct bufferevent *underlying, short what, void *_me) { struct bufferevent_filtered *bevf = _me; + struct bufferevent *bev = downcast(bevf); /* All we can really to is tell our own errorcb. */ - if (bevf->bev.errorcb) - bevf->bev.errorcb(&bevf->bev, what, bevf->bev.cbarg); + if (bev->errorcb) + bev->errorcb(bev, what, bev->cbarg); } static int diff --git a/bufferevent_pair.c b/bufferevent_pair.c index 2551d3de..6fed0e03 100644 --- a/bufferevent_pair.c +++ b/bufferevent_pair.c @@ -42,7 +42,7 @@ #include "util-internal.h" struct bufferevent_pair { - struct bufferevent bev; + struct bufferevent_private bev; struct bufferevent_pair *partner; struct deferred_cb deferred_write_cb; struct deferred_cb deferred_read_cb; @@ -57,12 +57,12 @@ upcast(struct bufferevent *bev) struct bufferevent_pair *bev_p; if (bev->be_ops != &bufferevent_ops_pair) return NULL; - bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev); - assert(bev_p->bev.be_ops == &bufferevent_ops_pair); + bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev); + assert(bev_p->bev.bev.be_ops == &bufferevent_ops_pair); return bev_p; } -#define downcast(bev_pair) (&(bev_pair)->bev) +#define downcast(bev_pair) (&(bev_pair)->bev.bev) /* XXX Handle close */ @@ -100,7 +100,7 @@ bufferevent_pair_elt_new(struct event_base *base, } /* XXX set read timeout event */ /* XXX set write timeout event */ - if (!evbuffer_add_cb(bufev->bev.output, be_pair_outbuf_cb, bufev)) { + if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) { bufferevent_free(downcast(bufev)); return NULL; } @@ -128,10 +128,10 @@ bufferevent_pair_new(struct event_base *base, enum bufferevent_options options, bufev1->partner = bufev2; bufev2->partner = bufev1; - evbuffer_freeze(bufev1->bev.input, 0); - evbuffer_freeze(bufev1->bev.output, 1); - evbuffer_freeze(bufev2->bev.input, 0); - evbuffer_freeze(bufev2->bev.output, 1); + evbuffer_freeze(downcast(bufev1)->input, 0); + evbuffer_freeze(downcast(bufev1)->output, 1); + evbuffer_freeze(downcast(bufev2)->input, 0); + evbuffer_freeze(downcast(bufev2)->output, 1); pair[0] = downcast(bufev1); pair[1] = downcast(bufev2); @@ -180,11 +180,13 @@ done: } static inline int -be_pair_wants_to_talk(struct bufferevent *src, struct bufferevent *dst) +be_pair_wants_to_talk(struct bufferevent_pair *src, + struct bufferevent_pair *dst) { - return (src->enabled & EV_WRITE) && - (dst->enabled & EV_READ) && !dst->read_suspended && - evbuffer_get_length(src->output); + return (downcast(src)->enabled & EV_WRITE) && + (downcast(dst)->enabled & EV_READ) && + !dst->bev.read_suspended && + evbuffer_get_length(downcast(src)->output); } static void @@ -197,8 +199,7 @@ be_pair_outbuf_cb(struct evbuffer *outbuf, if (info->n_added > info->n_deleted && partner) { /* We got more data. If the other side's reading, then hand it over. */ - if (be_pair_wants_to_talk(downcast(bev_pair), - downcast(partner))) { + if (be_pair_wants_to_talk(bev_pair, partner)) { be_pair_transfer(downcast(bev_pair), downcast(partner), 0); } } @@ -212,12 +213,12 @@ be_pair_enable(struct bufferevent *bufev, short events) /* We're starting to read! Does the other side have anything to write?*/ if ((events & EV_READ) && partner && - be_pair_wants_to_talk(downcast(partner), bufev)) { + be_pair_wants_to_talk(partner, bev_p)) { be_pair_transfer(downcast(partner), bufev, 0); } /* We're starting to write! Does the other side want to read? */ if ((events & EV_WRITE) && partner && - be_pair_wants_to_talk(bufev, downcast(partner))) { + be_pair_wants_to_talk(bev_p, partner)) { be_pair_transfer(bufev, downcast(partner), 0); } return 0; diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 561bec79..0e5e3a7b 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -225,16 +225,18 @@ struct bufferevent * bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, enum bufferevent_options options) { + struct bufferevent_private *bufev_p; struct bufferevent *bufev; - if ((bufev = mm_calloc(1, sizeof(struct bufferevent))) == NULL) + if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL) return NULL; - if (bufferevent_init_common(bufev, base, &bufferevent_ops_socket, + if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket, options) < 0) { - mm_free(bufev); + mm_free(bufev_p); return NULL; } + bufev = &bufev_p->bev; event_assign(&bufev->ev_read, bufev->ev_base, fd, EV_READ|EV_PERSIST, bufferevent_readcb, bufev); @@ -306,6 +308,8 @@ be_socket_disable(struct bufferevent *bufev, short event) static void be_socket_destruct(struct bufferevent *bufev) { + struct bufferevent_private *bufev_p = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); evutil_socket_t fd; assert(bufev->be_ops == &bufferevent_ops_socket); @@ -314,7 +318,7 @@ be_socket_destruct(struct bufferevent *bufev) event_del(&bufev->ev_read); event_del(&bufev->ev_write); - if (bufev->options & BEV_OPT_CLOSE_ON_FREE) + if (bufev_p->options & BEV_OPT_CLOSE_ON_FREE) EVUTIL_CLOSESOCKET(fd); } diff --git a/include/event2/bufferevent_struct.h b/include/event2/bufferevent_struct.h index a667e89f..eb4a86e4 100644 --- a/include/event2/bufferevent_struct.h +++ b/include/event2/bufferevent_struct.h @@ -105,16 +105,9 @@ struct bufferevent { struct timeval timeout_read; struct timeval timeout_write; - /** Evbuffer callback to enforce watermarks on input. */ - struct evbuffer_cb_entry *read_watermarks_cb; - /** Events that are currently enabled: currently EV_READ and EV_WRITE are supported. */ short enabled; - /** If set, read is suspended until evbuffer some. */ - unsigned read_suspended : 1; /* */ - - enum bufferevent_options options; }; #ifdef __cplusplus -- 2.40.0