From: Nick Mathewson Date: Mon, 25 May 2009 23:10:23 +0000 (+0000) Subject: Add a generic mechanism to implement timeouts in bufferevents. X-Git-Tag: release-2.0.3-alpha~209 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=34574db0f8bad98f59e66e62b94a2abec6211d5f;p=libevent Add a generic mechanism to implement timeouts in bufferevents. Paired and asynchronous bufferevents didn't do timeouts, and filtering bufferevents gave them funny semantics. Now they all should all work in a way consistent with what socket bufferevents do now: a [read/write] timeout triggers if [reading/writing] is enabled, and if the timeout is set, and the right amount of time passes without any data getting [added to the input buffer/drained from the output buffer]. svn:r1314 --- diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 7ea92172..547d313c 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -139,6 +139,38 @@ void _bufferevent_run_readcb(struct bufferevent *bufev); void _bufferevent_run_writecb(struct bufferevent *bufev); void _bufferevent_run_errorcb(struct bufferevent *bufev, short what); +/* ========= + * These next functions implement timeouts for bufferevents that aren't doing + * anything else with ev_read and ev_write, to handle timeouts. + * ========= */ +/** Internal use: Set up the ev_read and ev_write callbacks so that + * the other "generic_timeout" functions will work on it. Call this from + * the constuctor function. */ +void _bufferevent_init_generic_timeout_cbs(struct bufferevent *bev); +/** Internal use: Delete the ev_read and ev_write callbacks if they're pending. + * Call thiss from the destructor function. */ +void _bufferevent_del_generic_timeout_cbs(struct bufferevent *bev); +/** Internal use: Add or delete the generic timeout events as appropriate. + * (If an event is enabled and a timeout is set, we add the event. Otherwise + * we delete it.) Call this from anything that changes the timeout values, + * that enabled EV_READ or EV_WRITE, or that disables EV_READ or EV_WRITE. */ +void _bufferevent_generic_adj_timeouts(struct bufferevent *bev); + +/** Internal use: We have just successfully read data into an inbuf, so + * reset the read timout (if any). */ +#define BEV_RESET_GENERIC_READ_TIMEOUT(bev) \ + do { \ + if (evutil_timerisset(&(bev)->timeout_read)) \ + event_add(&(bev)->ev_read, &(bev)->timeout_read); \ + } while (0) +/** Internal use: We have just successfully written data from an inbuf, so + * reset the read timout (if any). */ +#define BEV_RESET_GENERIC_WRITE_TIMEOUT(bev) \ + do { \ + if (evutil_timerisset(&(bev)->timeout_write)) \ + event_add(&(bev)->ev_write, &(bev)->timeout_write); \ + } while (0) + #define BEV_UPCAST(b) EVUTIL_UPCAST((b), struct bufferevent_private, bev) #define BEV_LOCK(b) do { \ diff --git a/bufferevent.c b/bufferevent.c index 81c5e02c..94a22f6f 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -569,3 +569,48 @@ bufferevent_get_underlying(struct bufferevent *bev) BEV_UNLOCK(bev); return (res<0) ? NULL : d.ptr; } + +static void +bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx) +{ + struct bufferevent *bev = ctx; + _bufferevent_run_errorcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING); +} +static void +bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx) +{ + struct bufferevent *bev = ctx; + _bufferevent_run_errorcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING); +} + +void +_bufferevent_init_generic_timeout_cbs(struct bufferevent *bev) +{ + evtimer_assign(&bev->ev_read, bev->ev_base, + bufferevent_generic_read_timeout_cb, bev); + evtimer_assign(&bev->ev_read, bev->ev_base, + bufferevent_generic_write_timeout_cb, bev); +} + +void +_bufferevent_del_generic_timeout_cbs(struct bufferevent *bev) +{ + event_del(&bev->ev_read); + event_del(&bev->ev_write); +} + +void +_bufferevent_generic_adj_timeouts(struct bufferevent *bev) +{ + const short enabled = bev->enabled; + if ((enabled & EV_READ) && evutil_timerisset(&bev->timeout_read)) + event_add(&bev->ev_read, &bev->timeout_read); + else + event_del(&bev->ev_read); + + if ((enabled & EV_WRITE) && evutil_timerisset(&bev->timeout_write)) + event_add(&bev->ev_write, &bev->timeout_write); + else + event_del(&bev->ev_write); +} + diff --git a/bufferevent_async.c b/bufferevent_async.c index 878a8a0f..ea7ec29e 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -63,7 +63,6 @@ static int be_async_enable(struct bufferevent *, short); static int be_async_disable(struct bufferevent *, short); static void be_async_destruct(struct bufferevent *); -static void be_async_adj_timeouts(struct bufferevent *); static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode); static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); @@ -73,7 +72,7 @@ const struct bufferevent_ops bufferevent_ops_async = { be_async_enable, be_async_disable, be_async_destruct, - be_async_adj_timeouts, + _bufferevent_generic_adj_timeouts, be_async_flush, be_async_ctrl, }; @@ -162,10 +161,13 @@ be_async_outbuf_callback(struct evbuffer *buf, if (cbinfo->n_added || cbinfo->n_deleted) bev_async_consider_writing(bev_async); - if (cbinfo->n_deleted && - bev->writecb != NULL && - evbuffer_get_length(bev->output) <= bev->wm_write.low) - _bufferevent_run_writecb(bev); + if (cbinfo->n_deleted) { + BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); + + if (bev->writecb != NULL && + evbuffer_get_length(bev->output) <= bev->wm_write.low) + _bufferevent_run_writecb(bev); + } BEV_UNLOCK(bev); } @@ -190,10 +192,13 @@ be_async_inbuf_callback(struct evbuffer *buf, if (cbinfo->n_added || cbinfo->n_deleted) bev_async_consider_reading(bev_async); - if (cbinfo->n_added && - evbuffer_get_length(bev->input) >= bev->wm_read.low && - bev->readcb != NULL) - _bufferevent_run_readcb(bev); + if (cbinfo->n_added) { + BEV_RESET_GENERIC_READ_TIMEOUT(bev); + + if (evbuffer_get_length(bev->input) >= bev->wm_read.low && + bev->readcb != NULL) + _bufferevent_run_readcb(bev); + } BEV_UNLOCK(bev); } @@ -203,6 +208,8 @@ be_async_enable(struct bufferevent *buf, short what) { struct bufferevent_async *bev_async = upcast(buf); + _bufferevent_generic_adj_timeouts(buf); + /* If we newly enable reading or writing, and we aren't reading or writing already, consider launching a new read or write. */ @@ -219,17 +226,18 @@ be_async_disable(struct bufferevent *bev, short what) /* XXXX If we disable reading or writing, we may want to consider * canceling any in-progress read or write operation, though it might * not work. */ + + _bufferevent_generic_adj_timeouts(bev); + return 0; } static void be_async_destruct(struct bufferevent *bev) { + _bufferevent_del_generic_timeout_cbs(bev); } -static void -be_async_adj_timeouts(struct bufferevent *bev) -{ -} + static int be_async_flush(struct bufferevent *bev, short what, enum bufferevent_flush_mode mode) @@ -281,6 +289,8 @@ bufferevent_async_new(struct event_base *base, evbuffer_defer_callbacks(bev->input, base); evbuffer_defer_callbacks(bev->output, base); + _bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev); + return bev; err: bufferevent_free(&bev_a->bev.bev); diff --git a/bufferevent_filter.c b/bufferevent_filter.c index 3b51e500..585b3c25 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -63,7 +63,6 @@ static int be_filter_enable(struct bufferevent *, short); static int be_filter_disable(struct bufferevent *, short); static void be_filter_destruct(struct bufferevent *); -static void be_filter_adj_timeouts(struct bufferevent *); static void be_filter_readcb(struct bufferevent *, void *); static void be_filter_writecb(struct bufferevent *, void *); @@ -103,7 +102,7 @@ const struct bufferevent_ops bufferevent_ops_filter = { be_filter_enable, be_filter_disable, be_filter_destruct, - be_filter_adj_timeouts, + _bufferevent_generic_adj_timeouts, be_filter_flush, be_filter_ctrl, }; @@ -208,6 +207,8 @@ bufferevent_filter_new(struct bufferevent *underlying, bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output, bufferevent_filtered_outbuf_cb, bufev_f); + _bufferevent_init_generic_timeout_cbs(downcast(bufev_f)); + return downcast(bufev_f); } @@ -221,12 +222,15 @@ be_filter_destruct(struct bufferevent *bev) if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) bufferevent_free(bevf->underlying); + + _bufferevent_del_generic_timeout_cbs(bev); } static int be_filter_enable(struct bufferevent *bev, short event) { struct bufferevent_filtered *bevf = upcast(bev); + _bufferevent_generic_adj_timeouts(bev); return bufferevent_enable(bevf->underlying, event); } @@ -234,23 +238,10 @@ static int be_filter_disable(struct bufferevent *bev, short event) { struct bufferevent_filtered *bevf = upcast(bev); + _bufferevent_generic_adj_timeouts(bev); return bufferevent_disable(bevf->underlying, event); } -static void -be_filter_adj_timeouts(struct bufferevent *bev) -{ - struct bufferevent_filtered *bevf = upcast(bev); - struct timeval *r = NULL, *w = NULL; - - if (bev->timeout_read.tv_sec >= 0) - r = &bev->timeout_read; - if (bev->timeout_write.tv_sec >= 0) - w = &bev->timeout_write; - - bufferevent_set_timeouts(bevf->underlying, r, w); -} - static enum bufferevent_filter_result be_filter_process_input(struct bufferevent_filtered *bevf, enum bufferevent_flush_mode state, @@ -283,6 +274,9 @@ be_filter_process_input(struct bufferevent_filtered *bevf, evbuffer_get_length(bevf->underlying->input) && !be_readbuf_full(bevf, state)); + if (*processed_out) + BEV_RESET_GENERIC_READ_TIMEOUT(bev); + return res; } @@ -359,6 +353,9 @@ be_filter_process_output(struct bufferevent_filtered *bevf, evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb, EVBUFFER_CB_ENABLED); + if (*processed_out) + BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev); + return res; } @@ -395,8 +392,8 @@ be_filter_readcb(struct bufferevent *underlying, void *_me) res = be_filter_process_input(bevf, state, &processed_any); if (processed_any && - evbuffer_get_length(bufev->input) >= bufev->wm_read.low && - bufev->readcb != NULL) + evbuffer_get_length(bufev->input) >= bufev->wm_read.low && + bufev->readcb != NULL) _bufferevent_run_readcb(bufev); } diff --git a/bufferevent_pair.c b/bufferevent_pair.c index e6924046..a061cb8c 100644 --- a/bufferevent_pair.c +++ b/bufferevent_pair.c @@ -83,13 +83,13 @@ bufferevent_pair_elt_new(struct event_base *base, mm_free(bufev); return NULL; } - /* XXX set read timeout event */ - /* XXX set write timeout event */ if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) { bufferevent_free(downcast(bufev)); return NULL; } + _bufferevent_init_generic_timeout_cbs(&bufev->bev.bev); + return bufev; } @@ -155,6 +155,9 @@ be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, evbuffer_add_buffer(dst->input, src->output); } + BEV_RESET_GENERIC_READ_TIMEOUT(dst); + BEV_RESET_GENERIC_WRITE_TIMEOUT(dst); + src_size = evbuffer_get_length(src->output); dst_size = evbuffer_get_length(dst->input); @@ -201,6 +204,8 @@ be_pair_enable(struct bufferevent *bufev, short events) struct bufferevent_pair *bev_p = upcast(bufev); struct bufferevent_pair *partner = bev_p->partner; + _bufferevent_generic_adj_timeouts(bufev); + /* We're starting to read! Does the other side have anything to write?*/ if ((events & EV_READ) && partner && be_pair_wants_to_talk(partner, bev_p)) { @@ -217,6 +222,7 @@ be_pair_enable(struct bufferevent *bufev, short events) static int be_pair_disable(struct bufferevent *bev, short events) { + _bufferevent_generic_adj_timeouts(bev); return 0; } @@ -229,12 +235,8 @@ be_pair_destruct(struct bufferevent *bev) bev_p->partner->partner = NULL; bev_p->partner = NULL; } -} -static void -be_pair_adj_timeouts(struct bufferevent *bev) -{ - /* TODO: implement. */ + _bufferevent_del_generic_timeout_cbs(bev); } static int @@ -271,7 +273,7 @@ const struct bufferevent_ops bufferevent_ops_pair = { be_pair_enable, be_pair_disable, be_pair_destruct, - be_pair_adj_timeouts, + _bufferevent_generic_adj_timeouts, be_pair_flush, NULL, /* ctrl */ };