]> granicus.if.org Git - libevent/commitdiff
Only process up to MAX_DEFERRED deferred_cbs at a time.
authorChristopher Davis <chrisd@torproject.org>
Wed, 1 Sep 2010 18:04:57 +0000 (11:04 -0700)
committerChristopher Davis <chrisd@torproject.org>
Wed, 8 Sep 2010 08:22:22 +0000 (01:22 -0700)
If threads queue callbacks while event_process_deferred_callbacks is
running, the loop may spin long enough to significantly skew timers.
A unit test stressing this behavior is also in this commit.

bufferevent.c
event.c
test/regress_thread.c

index 9923bbe0dab4019a769c3f473276d0fec9a638b9..53b07f1f975a23b89891d6d6e79847d5d03c45a7 100644 (file)
@@ -206,10 +206,11 @@ bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)
 
 #define SCHEDULE_DEFERRED(bevp)                                                \
        do {                                                            \
+               bufferevent_incref(&(bevp)->bev);                       \
                event_deferred_cb_schedule(                             \
                        event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
                        &(bevp)->deferred);                             \
-       } while (0);
+       } while (0)
 
 
 void
@@ -222,10 +223,8 @@ _bufferevent_run_readcb(struct bufferevent *bufev)
                return;
        if (p->options & BEV_OPT_DEFER_CALLBACKS) {
                p->readcb_pending = 1;
-               if (!p->deferred.queued) {
-                       bufferevent_incref(bufev);
+               if (!p->deferred.queued)
                        SCHEDULE_DEFERRED(p);
-               }
        } else {
                bufev->readcb(bufev, bufev->cbarg);
        }
@@ -241,10 +240,8 @@ _bufferevent_run_writecb(struct bufferevent *bufev)
                return;
        if (p->options & BEV_OPT_DEFER_CALLBACKS) {
                p->writecb_pending = 1;
-               if (!p->deferred.queued) {
-                       bufferevent_incref(bufev);
+               if (!p->deferred.queued)
                        SCHEDULE_DEFERRED(p);
-               }
        } else {
                bufev->writecb(bufev, bufev->cbarg);
        }
@@ -261,10 +258,8 @@ _bufferevent_run_eventcb(struct bufferevent *bufev, short what)
        if (p->options & BEV_OPT_DEFER_CALLBACKS) {
                p->eventcb_pending |= what;
                p->errno_pending = EVUTIL_SOCKET_ERROR();
-               if (!p->deferred.queued) {
-                       bufferevent_incref(bufev);
+               if (!p->deferred.queued)
                        SCHEDULE_DEFERRED(p);
-               }
        } else {
                bufev->errorcb(bufev, what, bufev->cbarg);
        }
diff --git a/event.c b/event.c
index 8e8c324b02efd14e0813be8109265ebba8eb2d01..d71932cea6f421b26ca3a358cf6dea3b0e5803e8 100644 (file)
--- a/event.c
+++ b/event.c
@@ -1285,9 +1285,10 @@ event_process_active_single_queue(struct event_base *base,
 }
 
 /*
-   Process all the defered_cb entries in 'queue'.  If *breakptr becomes set to
-   1, stop.  Requires that we start out holding the lock on 'queue'; releases
-   the lock around 'queue' for each deferred_cb we process.
+   Process up to MAX_DEFERRED of the defered_cb entries in 'queue'.  If
+   *breakptr becomes set to 1, stop.  Requires that we start out holding
+   the lock on 'queue'; releases the lock around 'queue' for each deferred_cb
+   we process.
  */
 static int
 event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
@@ -1295,6 +1296,7 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
        int count = 0;
        struct deferred_cb *cb;
 
+#define MAX_DEFERRED 16
        while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
                cb->queued = 0;
                TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
@@ -1302,12 +1304,14 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
                UNLOCK_DEFERRED_QUEUE(queue);
 
                cb->cb(cb, cb->arg);
-               ++count;
-               if (*breakptr)
-                       return -1;
 
                LOCK_DEFERRED_QUEUE(queue);
+               if (*breakptr)
+                       return -1;
+               if (++count == MAX_DEFERRED)
+                       break;
        }
+#undef MAX_DEFERRED
        return count;
 }
 
index 34cf64b1599b6ecf0ed19b557f554b8b89318cc3..675e350e1f426a8684af84ea8db7cd5547e821b2 100644 (file)
@@ -33,6 +33,9 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#ifndef WIN32
+#include <unistd.h>
+#endif
 
 #ifdef _EVENT_HAVE_PTHREADS
 #include <pthread.h>
@@ -46,6 +49,7 @@
 #include "event2/event_struct.h"
 #include "event2/thread.h"
 #include "evthread-internal.h"
+#include "defer-internal.h"
 #include "regress.h"
 #include "tinytest_macros.h"
 
@@ -312,12 +316,102 @@ end:
        ;
 }
 
+#define CB_COUNT 128
+#define QUEUE_THREAD_COUNT 8
+
+#ifdef WIN32
+#define SLEEP_MS(ms) Sleep(ms)
+#else
+#define SLEEP_MS(ms) usleep((ms) * 1000)
+#endif
+
+struct deferred_test_data {
+       struct deferred_cb cbs[CB_COUNT];
+       struct deferred_cb_queue *queue;
+};
+
+static time_t timer_start = 0;
+static time_t timer_end = 0;
+static unsigned callback_count = 0;
+static THREAD_T load_threads[QUEUE_THREAD_COUNT];
+static struct deferred_test_data deferred_data[QUEUE_THREAD_COUNT];
+
+static void
+deferred_callback(struct deferred_cb *cb, void *arg)
+{
+       SLEEP_MS(1);
+       callback_count += 1;
+}
+
+static THREAD_FN
+load_deferred_queue(void *arg)
+{
+       struct deferred_test_data *data = arg;
+       size_t i;
+
+       for (i = 0; i < CB_COUNT; ++i) {
+               event_deferred_cb_init(&data->cbs[i], deferred_callback, NULL);
+               event_deferred_cb_schedule(data->queue, &data->cbs[i]);
+               SLEEP_MS(1);
+       }
+
+       THREAD_RETURN();
+}
+
+static void
+timer_callback(evutil_socket_t fd, short what, void *arg)
+{
+       timer_end = time(NULL);
+}
+
+static void
+start_threads_callback(evutil_socket_t fd, short what, void *arg)
+{
+       int i;
+
+       for (i = 0; i < QUEUE_THREAD_COUNT; ++i) {
+               THREAD_START(load_threads[i], load_deferred_queue,
+                               &deferred_data[i]);
+       }
+}
+
+static void
+thread_deferred_cb_skew(void *arg)
+{
+       struct basic_test_data *data = arg;
+       struct timeval tv_timer = {4, 0};
+       struct event event_threads;
+       struct deferred_cb_queue *queue;
+       int i;
+
+       queue = event_base_get_deferred_cb_queue(data->base);
+       tt_assert(queue);
+
+       for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
+               deferred_data[i].queue = queue;
+
+       timer_start = time(NULL);
+       event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL,
+                       &tv_timer);
+       event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback,
+                       NULL, NULL);
+       event_base_dispatch(data->base);
+
+       TT_BLATHER(("callback count, %u", callback_count));
+       tt_int_op(timer_end - timer_start, ==, 4);
+
+end:
+       for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
+               THREAD_JOIN(load_threads[i]);
+}
+
 #define TEST(name)                                                     \
            { #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, \
              &basic_setup, NULL }
 struct testcase_t thread_testcases[] = {
        TEST(basic),
        TEST(conditions_simple),
+       TEST(deferred_cb_skew),
        END_OF_TESTCASES
 };