From 17a14f1af2ace0201baa1b5bbba031296e62d879 Mon Sep 17 00:00:00 2001
From: Christopher Davis
Date: Wed, 1 Sep 2010 11:04:57 -0700
Subject: [PATCH] Only process up to MAX_DEFERRED deferred_cbs at a time.

If threads queue callbacks while event_process_deferred_callbacks is
running, the loop may spin long enough to significantly skew timers.
A unit test stressing this behavior is also in this commit.
---
 bufferevent.c         | 15 +++----
 event.c               | 16 +++++---
 test/regress_thread.c | 94 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 109 insertions(+), 16 deletions(-)

diff --git a/bufferevent.c b/bufferevent.c
index 9923bbe0..53b07f1f 100644
--- a/bufferevent.c
+++ b/bufferevent.c
@@ -206,10 +206,11 @@ bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)
 
 #define SCHEDULE_DEFERRED(bevp)						\
 	do {								\
+		bufferevent_incref(&(bevp)->bev);			\
 		event_deferred_cb_schedule(				\
 			event_base_get_deferred_cb_queue((bevp)->bev.ev_base),	\
 			&(bevp)->deferred);				\
-	} while (0);
+	} while (0)
 
 
 void
@@ -222,10 +223,8 @@ _bufferevent_run_readcb(struct bufferevent *bufev)
 		return;
 	if (p->options & BEV_OPT_DEFER_CALLBACKS) {
 		p->readcb_pending = 1;
-		if (!p->deferred.queued) {
-			bufferevent_incref(bufev);
+		if (!p->deferred.queued)
 			SCHEDULE_DEFERRED(p);
-		}
 	} else {
 		bufev->readcb(bufev, bufev->cbarg);
 	}
@@ -241,10 +240,8 @@ _bufferevent_run_writecb(struct bufferevent *bufev)
 		return;
 	if (p->options & BEV_OPT_DEFER_CALLBACKS) {
 		p->writecb_pending = 1;
-		if (!p->deferred.queued) {
-			bufferevent_incref(bufev);
+		if (!p->deferred.queued)
 			SCHEDULE_DEFERRED(p);
-		}
 	} else {
 		bufev->writecb(bufev, bufev->cbarg);
 	}
@@ -261,10 +258,8 @@ _bufferevent_run_eventcb(struct bufferevent *bufev, short what)
 	if (p->options & BEV_OPT_DEFER_CALLBACKS) {
 		p->eventcb_pending |= what;
 		p->errno_pending = EVUTIL_SOCKET_ERROR();
-		if (!p->deferred.queued) {
-			bufferevent_incref(bufev);
+		if (!p->deferred.queued)
 			SCHEDULE_DEFERRED(p);
-		}
 	} else {
 		bufev->errorcb(bufev, what, bufev->cbarg);
 	}
diff --git a/event.c b/event.c
index 8e8c324b..d71932ce 100644
--- a/event.c
+++ b/event.c
@@ -1285,9 +1285,10 @@ event_process_active_single_queue(struct event_base *base,
 }
 
 /*
-  Process all the defered_cb entries in 'queue'. If *breakptr becomes set to
-  1, stop. Requires that we start out holding the lock on 'queue'; releases
-  the lock around 'queue' for each deferred_cb we process.
+  Process up to MAX_DEFERRED of the defered_cb entries in 'queue'. If
+  *breakptr becomes set to 1, stop. Requires that we start out holding
+  the lock on 'queue'; releases the lock around 'queue' for each deferred_cb
+  we process.
 */
 static int
 event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
@@ -1295,6 +1296,7 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
 	int count = 0;
 	struct deferred_cb *cb;
 
+#define MAX_DEFERRED 16
 	while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
 		cb->queued = 0;
 		TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
@@ -1302,12 +1304,14 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
 		UNLOCK_DEFERRED_QUEUE(queue);
 
 		cb->cb(cb, cb->arg);
-		++count;
-		if (*breakptr)
-			return -1;
 
 		LOCK_DEFERRED_QUEUE(queue);
+		if (*breakptr)
+			return -1;
+		if (++count == MAX_DEFERRED)
+			break;
 	}
+#undef MAX_DEFERRED
 	return count;
 }
 
diff --git a/test/regress_thread.c b/test/regress_thread.c
index 34cf64b1..675e350e 100644
--- a/test/regress_thread.c
+++ b/test/regress_thread.c
@@ -33,6 +33,9 @@
 #include
 #include
 #include
+#ifndef WIN32
+#include <unistd.h>
+#endif
 
 #ifdef _EVENT_HAVE_PTHREADS
 #include <pthread.h>
@@ -46,6 +49,7 @@
 #include "event2/event_struct.h"
 #include "event2/thread.h"
 #include "evthread-internal.h"
+#include "defer-internal.h"
 #include "regress.h"
 #include "tinytest_macros.h"
 
@@ -312,12 +316,102 @@ end:
 	;
 }
 
+#define CB_COUNT 128
+#define QUEUE_THREAD_COUNT 8
+
+#ifdef WIN32
+#define SLEEP_MS(ms) Sleep(ms)
+#else
+#define SLEEP_MS(ms) usleep((ms) * 1000)
+#endif
+
+struct deferred_test_data {
+	struct deferred_cb cbs[CB_COUNT];
+	struct deferred_cb_queue *queue;
+};
+
+static time_t timer_start = 0;
+static time_t timer_end = 0;
+static unsigned callback_count = 0;
+static THREAD_T load_threads[QUEUE_THREAD_COUNT];
+static struct deferred_test_data deferred_data[QUEUE_THREAD_COUNT];
+
+static void
+deferred_callback(struct deferred_cb *cb, void *arg)
+{
+	SLEEP_MS(1);
+	callback_count += 1;
+}
+
+static THREAD_FN
+load_deferred_queue(void *arg)
+{
+	struct deferred_test_data *data = arg;
+	size_t i;
+
+	for (i = 0; i < CB_COUNT; ++i) {
+		event_deferred_cb_init(&data->cbs[i], deferred_callback, NULL);
+		event_deferred_cb_schedule(data->queue, &data->cbs[i]);
+		SLEEP_MS(1);
+	}
+
+	THREAD_RETURN();
+}
+
+static void
+timer_callback(evutil_socket_t fd, short what, void *arg)
+{
+	timer_end = time(NULL);
+}
+
+static void
+start_threads_callback(evutil_socket_t fd, short what, void *arg)
+{
+	int i;
+
+	for (i = 0; i < QUEUE_THREAD_COUNT; ++i) {
+		THREAD_START(load_threads[i], load_deferred_queue,
+		    &deferred_data[i]);
+	}
+}
+
+static void
+thread_deferred_cb_skew(void *arg)
+{
+	struct basic_test_data *data = arg;
+	struct timeval tv_timer = {4, 0};
+	struct event event_threads;
+	struct deferred_cb_queue *queue;
+	int i;
+
+	queue = event_base_get_deferred_cb_queue(data->base);
+	tt_assert(queue);
+
+	for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
+		deferred_data[i].queue = queue;
+
+	timer_start = time(NULL);
+	event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL,
+	    &tv_timer);
+	event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback,
+	    NULL, NULL);
+	event_base_dispatch(data->base);
+
+	TT_BLATHER(("callback count, %u", callback_count));
+	tt_int_op(timer_end - timer_start, ==, 4);
+
+end:
+	for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
+		THREAD_JOIN(load_threads[i]);
+}
+
 #define TEST(name)						\
 	{ #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE,	\
 	  &basic_setup, NULL }
 
 struct testcase_t thread_testcases[] = {
 	TEST(basic),
 	TEST(conditions_simple),
+	TEST(deferred_cb_skew),
 	END_OF_TESTCASES
 };
-- 
2.40.0
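The sketch below is not part of the patch; it restates the bounded-drain idea in isolation for readers who do not want to trace libevent's internals. The queue and function names (pending_cb, cb_queue, process_pending) are hypothetical stand-ins for deferred_cb, deferred_cb_queue, and event_process_deferred_callbacks, and all locking is omitted; the point is only that each call runs at most MAX_DEFERRED callbacks and then returns, so a real event loop can service its timers between passes.

/*
 * Illustrative sketch only -- not libevent's internal API.  Drain at most
 * MAX_DEFERRED queued callbacks per pass, then hand control back to the
 * caller so other event-loop work (timer dispatch, I/O) can run.
 */
#include <stdio.h>

#define MAX_DEFERRED 16	/* same bound the patch introduces */

struct pending_cb {
	void (*fn)(void *arg);
	void *arg;
	struct pending_cb *next;
};

struct cb_queue {
	struct pending_cb *head;
	struct pending_cb *tail;
};

static void
enqueue(struct cb_queue *q, struct pending_cb *cb)
{
	cb->next = NULL;
	if (q->tail)
		q->tail->next = cb;
	else
		q->head = cb;
	q->tail = cb;
}

/* Run queued callbacks, but never more than MAX_DEFERRED per call; the
 * return value tells the caller how many ran, hence whether work remains. */
static int
process_pending(struct cb_queue *q)
{
	int count = 0;
	struct pending_cb *cb;

	while ((cb = q->head) != NULL) {
		q->head = cb->next;
		if (q->head == NULL)
			q->tail = NULL;
		cb->fn(cb->arg);
		if (++count == MAX_DEFERRED)
			break;	/* yield back to the event loop */
	}
	return count;
}

static int callbacks_run = 0;

static void
count_cb(void *arg)
{
	(void)arg;
	++callbacks_run;
}

int
main(void)
{
	struct cb_queue q = { NULL, NULL };
	struct pending_cb cbs[40];
	int i, ran;

	for (i = 0; i < 40; ++i) {
		cbs[i].fn = count_cb;
		cbs[i].arg = NULL;
		enqueue(&q, &cbs[i]);
	}

	/* 40 callbacks drain in passes of at most MAX_DEFERRED; between
	 * passes a real event loop would dispatch timers and I/O. */
	while ((ran = process_pending(&q)) > 0)
		printf("pass ran %d callback(s), %d total\n", ran, callbacks_run);

	return 0;
}

Before the patch, the equivalent of process_pending() looped until the queue was empty, so producers that keep enqueueing (as the unit test's load threads do) could starve timer dispatch indefinitely; the test asserts that its 4-second timer still fires on time under that load.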
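For completeness, here is a sketch of the application-level option that the bufferevent.c hunks serve. It is not taken from the patch or its tests; it only shows that a bufferevent created with BEV_OPT_DEFER_CALLBACKS has its read/write/event callbacks queued on the deferred-callback queue (the one event_process_deferred_callbacks drains) instead of being invoked inline. The fd of -1 is a placeholder; a real program would connect or attach a socket.

/*
 * Illustrative sketch only -- not from the patch.  Shows an application
 * opting into deferred callbacks with BEV_OPT_DEFER_CALLBACKS.
 */
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <stdio.h>

static void
read_cb(struct bufferevent *bev, void *ctx)
{
	/* With BEV_OPT_DEFER_CALLBACKS this runs from the deferred queue,
	 * not from inside the socket-read path. */
	char buf[256];
	size_t n;
	while ((n = bufferevent_read(bev, buf, sizeof(buf))) > 0)
		fwrite(buf, 1, n, stdout);
}

static void
event_cb(struct bufferevent *bev, short what, void *ctx)
{
	if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
		bufferevent_free(bev);
}

int
main(void)
{
	struct event_base *base = event_base_new();
	struct bufferevent *bev;

	if (!base)
		return 1;
	/* fd -1: the socket would be attached or connected later. */
	bev = bufferevent_socket_new(base, -1,
	    BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
	bufferevent_setcb(bev, read_cb, NULL, event_cb, NULL);
	bufferevent_enable(bev, EV_READ);

	event_base_dispatch(base);
	event_base_free(base);
	return 0;
}

The bufferevent.c hunks fold the bufferevent_incref() call into SCHEDULE_DEFERRED itself; that reference keeps the bufferevent alive from the moment its deferred callback is queued until the callback actually runs.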