From 61ee18b8b1d2ac0025955b3f949531c712fb7527 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Tue, 3 Dec 2013 22:49:57 +0000 Subject: [PATCH] Add an option to trigger bufferevent I/O callbacks --- bufferevent-internal.h | 10 ++- bufferevent.c | 55 ++++++++++----- bufferevent_async.c | 7 +- bufferevent_filter.c | 10 ++- bufferevent_openssl.c | 13 +--- bufferevent_pair.c | 13 +--- bufferevent_sock.c | 8 +-- include/event2/bufferevent.h | 26 +++++++ test/regress_bufferevent.c | 127 +++++++++++++++++++++++++++++++++++ 9 files changed, 213 insertions(+), 56 deletions(-) diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 0c4df871..0fa690b9 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -341,14 +341,20 @@ int bufferevent_decref_and_unlock_(struct bufferevent *bufev); /** Internal: If callbacks are deferred and we have a read callback, schedule * a readcb. Otherwise just run the readcb. */ -void bufferevent_run_readcb_(struct bufferevent *bufev); +void bufferevent_run_readcb_(struct bufferevent *bufev, int options); /** Internal: If callbacks are deferred and we have a write callback, schedule * a writecb. Otherwise just run the writecb. */ -void bufferevent_run_writecb_(struct bufferevent *bufev); +void bufferevent_run_writecb_(struct bufferevent *bufev, int options); /** Internal: If callbacks are deferred and we have an eventcb, schedule * it to run with events "what". Otherwise just run the eventcb. */ void bufferevent_run_eventcb_(struct bufferevent *bufev, short what); +/** Internal: Run or schedule (if deferred or options contain + * BEV_TRIG_DEFER_CALLBACKS) I/O callbacks specified in iotype. + * Must already hold the bufev lock. */ +void bufferevent_trigger_nolock_(struct bufferevent *bufev, short iotype, int options); + + /** Internal: Add the event 'ev' with timeout tv, unless tv is set to 0, in * which case add ev with no timeout. */ int bufferevent_add_event_(struct event *ev, const struct timeval *tv); diff --git a/bufferevent.c b/bufferevent.c index 96ce109f..fd95941b 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -219,14 +219,15 @@ bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg void -bufferevent_run_readcb_(struct bufferevent *bufev) +bufferevent_run_readcb_(struct bufferevent *bufev, int options) { /* Requires that we hold the lock and a reference */ struct bufferevent_private *p = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); if (bufev->readcb == NULL) return; - if (p->options & BEV_OPT_DEFER_CALLBACKS) { + if ((p->options & BEV_OPT_DEFER_CALLBACKS) || + (options & BEV_TRIG_DEFER_CALLBACKS)) { p->readcb_pending = 1; SCHEDULE_DEFERRED(p); } else { @@ -235,14 +236,15 @@ bufferevent_run_readcb_(struct bufferevent *bufev) } void -bufferevent_run_writecb_(struct bufferevent *bufev) +bufferevent_run_writecb_(struct bufferevent *bufev, int options) { /* Requires that we hold the lock and a reference */ struct bufferevent_private *p = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); if (bufev->writecb == NULL) return; - if (p->options & BEV_OPT_DEFER_CALLBACKS) { + if ((p->options & BEV_OPT_DEFER_CALLBACKS) || + (options & BEV_TRIG_DEFER_CALLBACKS)) { p->writecb_pending = 1; SCHEDULE_DEFERRED(p); } else { @@ -250,6 +252,25 @@ bufferevent_run_writecb_(struct bufferevent *bufev) } } +void +bufferevent_trigger_nolock_(struct bufferevent *bufev, short iotype, int options) +{ + if ((iotype & EV_READ) && ((options & BEV_TRIG_IGNORE_WATERMARKS) || + evbuffer_get_length(bufev->input) >= bufev->wm_read.low)) + bufferevent_run_readcb_(bufev, options); + if ((iotype & EV_WRITE) && ((options & BEV_TRIG_IGNORE_WATERMARKS) || + evbuffer_get_length(bufev->output) <= bufev->wm_write.low)) + bufferevent_run_writecb_(bufev, options); +} + +void +bufferevent_trigger(struct bufferevent *bufev, short iotype, int options) +{ + bufferevent_incref_and_lock_(bufev); + bufferevent_trigger_nolock_(bufev, iotype, options); + bufferevent_decref_and_unlock_(bufev); +} + void bufferevent_run_eventcb_(struct bufferevent *bufev, short what) { @@ -322,20 +343,18 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private, event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS"); return -1; } - if (options & BEV_OPT_DEFER_CALLBACKS) { - if (options & BEV_OPT_UNLOCK_CALLBACKS) - event_deferred_cb_init_( - &bufev_private->deferred, - event_base_get_npriorities(base) / 2, - bufferevent_run_deferred_callbacks_unlocked, - bufev_private); - else - event_deferred_cb_init_( - &bufev_private->deferred, - event_base_get_npriorities(base) / 2, - bufferevent_run_deferred_callbacks_locked, - bufev_private); - } + if (options & BEV_OPT_UNLOCK_CALLBACKS) + event_deferred_cb_init_( + &bufev_private->deferred, + event_base_get_npriorities(base) / 2, + bufferevent_run_deferred_callbacks_unlocked, + bufev_private); + else + event_deferred_cb_init_( + &bufev_private->deferred, + event_base_get_npriorities(base) / 2, + bufferevent_run_deferred_callbacks_locked, + bufev_private); bufev_private->options = options; diff --git a/bufferevent_async.c b/bufferevent_async.c index 0152fd16..4e686479 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -458,8 +458,7 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key, if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_READ_TIMEOUT(bev); - if (evbuffer_get_length(bev->input) >= bev->wm_read.low) - bufferevent_run_readcb_(bev); + bufferevent_trigger_nolock_(bev, EV_READ, 0); bev_async_consider_reading(bev_a); } else if (!ok) { what |= BEV_EVENT_ERROR; @@ -502,9 +501,7 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key, if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); - if (evbuffer_get_length(bev->output) <= - bev->wm_write.low) - bufferevent_run_writecb_(bev); + bufferevent_trigger_nolock_(bev, EV_WRITE, 0); bev_async_consider_writing(bev_a); } else if (!ok) { what |= BEV_EVENT_ERROR; diff --git a/bufferevent_filter.c b/bufferevent_filter.c index cc02230c..cb1c0097 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -376,10 +376,9 @@ be_filter_process_output(struct bufferevent_filtered *bevf, /* Or if we have filled the underlying output buffer. */ !be_underlying_writebuf_full(bevf,state)); - if (processed && - evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { + if (processed) { /* call the write callback.*/ - bufferevent_run_writecb_(bufev); + bufferevent_trigger_nolock_(bufev, EV_WRITE, 0); if (res == BEV_OK && (bufev->enabled & EV_WRITE) && @@ -442,9 +441,8 @@ be_filter_readcb(struct bufferevent *underlying, void *me_) /* XXX This should be in process_input, not here. There are * other places that can call process-input, and they should * force readcb calls as needed. */ - if (processed_any && - evbuffer_get_length(bufev->input) >= bufev->wm_read.low) - bufferevent_run_readcb_(bufev); + if (processed_any) + bufferevent_trigger_nolock_(bufev, EV_READ, 0); bufferevent_decref_and_unlock_(bufev); } diff --git a/bufferevent_openssl.c b/bufferevent_openssl.c index 1ce124f9..ed9e4a3d 100644 --- a/bufferevent_openssl.c +++ b/bufferevent_openssl.c @@ -709,8 +709,7 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost) if (bev_ssl->underlying) BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); - if (evbuffer_get_length(output) <= bev->wm_write.low) - bufferevent_run_writecb_(bev); + bufferevent_trigger_nolock_(bev, EV_WRITE, 0); } return result; } @@ -824,11 +823,8 @@ consider_reading(struct bufferevent_openssl *bev_ssl) if (all_result_flags & OP_MADE_PROGRESS) { struct bufferevent *bev = &bev_ssl->bev.bev; - struct evbuffer *input = bev->input; - if (evbuffer_get_length(input) >= bev->wm_read.low) { - bufferevent_run_readcb_(bev); - } + bufferevent_trigger_nolock_(bev, EV_READ, 0); } if (!bev_ssl->underlying) { @@ -852,11 +848,8 @@ consider_writing(struct bufferevent_openssl *bev_ssl) r = do_read(bev_ssl, 1024); /* XXXX 1024 is a hack */ if (r & OP_MADE_PROGRESS) { struct bufferevent *bev = &bev_ssl->bev.bev; - struct evbuffer *input = bev->input; - if (evbuffer_get_length(input) >= bev->wm_read.low) { - bufferevent_run_readcb_(bev); - } + bufferevent_trigger_nolock_(bev, EV_READ, 0); } if (r & (OP_ERR|OP_BLOCKED)) break; diff --git a/bufferevent_pair.c b/bufferevent_pair.c index 4d467260..eb3da3e3 100644 --- a/bufferevent_pair.c +++ b/bufferevent_pair.c @@ -151,7 +151,7 @@ static void be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, int ignore_wm) { - size_t src_size, dst_size; + size_t dst_size; size_t n; evbuffer_unfreeze(src->output, 1); @@ -182,15 +182,8 @@ be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, BEV_DEL_GENERIC_WRITE_TIMEOUT(dst); } - src_size = evbuffer_get_length(src->output); - dst_size = evbuffer_get_length(dst->input); - - if (dst_size >= dst->wm_read.low) { - bufferevent_run_readcb_(dst); - } - if (src_size <= src->wm_write.low) { - bufferevent_run_writecb_(src); - } + bufferevent_trigger_nolock_(dst, EV_READ, 0); + bufferevent_trigger_nolock_(src, EV_WRITE, 0); done: evbuffer_freeze(src->output, 1); evbuffer_freeze(dst->input, 0); diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 5ce4953b..82983ed7 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -184,8 +184,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg) bufferevent_decrement_read_buckets_(bufev_p, res); /* Invoke the user callback - must always be called last */ - if (evbuffer_get_length(input) >= bufev->wm_read.low) - bufferevent_run_readcb_(bufev); + bufferevent_trigger_nolock_(bufev, EV_READ, 0); goto done; @@ -294,9 +293,8 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) * Invoke the user callback if our buffer is drained or below the * low watermark. */ - if ((res || !connected) && - evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { - bufferevent_run_writecb_(bufev); + if (res || !connected) { + bufferevent_trigger_nolock_(bufev, EV_WRITE, 0); } goto done; diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index efe0617b..af6f7cde 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -558,6 +558,32 @@ int bufferevent_flush(struct bufferevent *bufev, short iotype, enum bufferevent_flush_mode mode); +/** + Flags for bufferevent_trigger(_event) that modify when and how to trigger + the callback. +*/ +enum bufferevent_trigger_options { + /** trigger the callback regardless of the watermarks */ + BEV_TRIG_IGNORE_WATERMARKS = (1<<0), + + /** defer even if the callbacks are not */ + BEV_TRIG_DEFER_CALLBACKS = (1<<1), +}; + +/** + Triggers bufferevent data callbacks. + + The function will honor watermarks unless options contain + BEV_TRIG_IGNORE_WATERMARKS. If the options contain BEV_OPT_DEFER_CALLBACKS, + the callbacks are deferred. + + @param bufev the bufferevent object + @param iotype either EV_READ or EV_WRITE or both. + @param options + */ +void bufferevent_trigger(struct bufferevent *bufev, short iotype, + int options); + /** @name Filtering support diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c index 89c405bf..874d6018 100644 --- a/test/regress_bufferevent.c +++ b/test/regress_bufferevent.c @@ -447,6 +447,7 @@ sender_errorcb(struct bufferevent *bev, short what, void *ctx) } static int bufferevent_connect_test_flags = 0; +static int bufferevent_trigger_test_flags = 0; static int n_strings_read = 0; static int n_reads_invoked = 0; @@ -812,6 +813,122 @@ end: bufferevent_free(bev2); } +static void +trigger_failure_cb(evutil_socket_t fd, short what, void *ctx) +{ + TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout).")); +} + +static void +trigger_readcb_triggered(struct bufferevent *bev, void *ctx) +{ + struct event_base *base = ctx; + + TT_BLATHER(("Read successfully triggered.")); + n_reads_invoked++; + event_base_loopexit(base, NULL); +} + +static void +trigger_readcb(struct bufferevent *bev, void *ctx) +{ + struct timeval timeout = { 30, 0 }; + struct event_base *base = ctx; + size_t low, high, len; + int expected_reads; + + TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev))); + expected_reads = ++n_reads_invoked; + + bufferevent_setcb(bev, trigger_readcb_triggered, NULL, reader_eventcb, ctx); + + bufferevent_getwatermark(bev, EV_READ, &low, &high); + len = evbuffer_get_length(bufferevent_get_input(bev)); + + bufferevent_setwatermark(bev, EV_READ, len + 1, 0); + bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags); + /* no callback expected */ + tt_int_op(n_reads_invoked, ==, expected_reads); + + if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) || + (bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) { + /* will be deferred */ + } else { + expected_reads++; + } + + event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout); + + bufferevent_trigger(bev, EV_READ, + bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS); + tt_int_op(n_reads_invoked, ==, expected_reads); + + bufferevent_setwatermark(bev, EV_READ, low, high); +end: + ; +} + +static void +test_bufferevent_trigger(void *arg) +{ + struct basic_test_data *data = arg; + struct evconnlistener *lev=NULL; + struct bufferevent *bev=NULL; + struct sockaddr_in localhost; + struct sockaddr_storage ss; + struct sockaddr *sa; + ev_socklen_t slen; + + int be_flags=BEV_OPT_CLOSE_ON_FREE; + int trig_flags=0; + + if (strstr((char*)data->setup_data, "defer")) { + be_flags |= BEV_OPT_DEFER_CALLBACKS; + } + bufferevent_connect_test_flags = be_flags; + + if (strstr((char*)data->setup_data, "postpone")) { + trig_flags |= BEV_TRIG_DEFER_CALLBACKS; + } + bufferevent_trigger_test_flags = trig_flags; + + memset(&localhost, 0, sizeof(localhost)); + + localhost.sin_port = 0; /* pick-a-port */ + localhost.sin_addr.s_addr = htonl(0x7f000001L); + localhost.sin_family = AF_INET; + sa = (struct sockaddr *)&localhost; + lev = evconnlistener_new_bind(data->base, listen_cb, data->base, + LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, + 16, sa, sizeof(localhost)); + tt_assert(lev); + + sa = (struct sockaddr *)&ss; + slen = sizeof(ss); + if (regress_get_listener_addr(lev, sa, &slen) < 0) { + tt_abort_perror("getsockname"); + } + + tt_assert(!evconnlistener_enable(lev)); + bev = bufferevent_socket_new(data->base, -1, be_flags); + tt_assert(bev); + bufferevent_setcb(bev, trigger_readcb, NULL, reader_eventcb, data->base); + + bufferevent_enable(bev, EV_READ); + + tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost))); + + event_base_dispatch(data->base); + + tt_int_op(n_reads_invoked, ==, 2); +end: + if (lev) + evconnlistener_free(lev); + + if (bev) + bufferevent_free(bev); +} + struct testcase_t bufferevent_testcases[] = { LEGACY(bufferevent, TT_ISOLATED), @@ -842,6 +959,16 @@ struct testcase_t bufferevent_testcases[] = { TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" }, { "bufferevent_timeout_filter_pair", test_bufferevent_timeouts, TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" }, + { "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE, + &basic_setup, (void*)"" }, + { "bufferevent_trigger_defer", test_bufferevent_trigger, + TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" }, + { "bufferevent_trigger_postpone", test_bufferevent_trigger, + TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, + (void*)"postpone" }, + { "bufferevent_trigger_defer_postpone", test_bufferevent_trigger, + TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, + (void*)"defer postpone" }, #ifdef EVENT__HAVE_LIBZ LEGACY(bufferevent_zlib, TT_ISOLATED), #else -- 2.40.0