#define SCHEDULE_DEFERRED(bevp) \
do { \
+ bufferevent_incref(&(bevp)->bev); \
event_deferred_cb_schedule( \
event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
&(bevp)->deferred); \
- } while (0);
+ } while (0)
void
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->readcb_pending = 1;
- if (!p->deferred.queued) {
- bufferevent_incref(bufev);
+ if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
- }
} else {
bufev->readcb(bufev, bufev->cbarg);
}
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->writecb_pending = 1;
- if (!p->deferred.queued) {
- bufferevent_incref(bufev);
+ if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
- }
} else {
bufev->writecb(bufev, bufev->cbarg);
}
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->eventcb_pending |= what;
p->errno_pending = EVUTIL_SOCKET_ERROR();
- if (!p->deferred.queued) {
- bufferevent_incref(bufev);
+ if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
- }
} else {
bufev->errorcb(bufev, what, bufev->cbarg);
}
}
/*
- Process all the defered_cb entries in 'queue'. If *breakptr becomes set to
- 1, stop. Requires that we start out holding the lock on 'queue'; releases
- the lock around 'queue' for each deferred_cb we process.
+ Process up to MAX_DEFERRED of the defered_cb entries in 'queue'. If
+ *breakptr becomes set to 1, stop. Requires that we start out holding
+ the lock on 'queue'; releases the lock around 'queue' for each deferred_cb
+ we process.
*/
static int
event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
int count = 0;
struct deferred_cb *cb;
+#define MAX_DEFERRED 16
while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
cb->queued = 0;
TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
UNLOCK_DEFERRED_QUEUE(queue);
cb->cb(cb, cb->arg);
- ++count;
- if (*breakptr)
- return -1;
LOCK_DEFERRED_QUEUE(queue);
+ if (*breakptr)
+ return -1;
+ if (++count == MAX_DEFERRED)
+ break;
}
+#undef MAX_DEFERRED
return count;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#ifndef WIN32
+#include <unistd.h>
+#endif
#ifdef _EVENT_HAVE_PTHREADS
#include <pthread.h>
#include "event2/event_struct.h"
#include "event2/thread.h"
#include "evthread-internal.h"
+#include "defer-internal.h"
#include "regress.h"
#include "tinytest_macros.h"
;
}
+#define CB_COUNT 128
+#define QUEUE_THREAD_COUNT 8
+
+#ifdef WIN32
+#define SLEEP_MS(ms) Sleep(ms)
+#else
+#define SLEEP_MS(ms) usleep((ms) * 1000)
+#endif
+
+struct deferred_test_data {
+ struct deferred_cb cbs[CB_COUNT];
+ struct deferred_cb_queue *queue;
+};
+
+static time_t timer_start = 0;
+static time_t timer_end = 0;
+static unsigned callback_count = 0;
+static THREAD_T load_threads[QUEUE_THREAD_COUNT];
+static struct deferred_test_data deferred_data[QUEUE_THREAD_COUNT];
+
+static void
+deferred_callback(struct deferred_cb *cb, void *arg)
+{
+ SLEEP_MS(1);
+ callback_count += 1;
+}
+
+static THREAD_FN
+load_deferred_queue(void *arg)
+{
+ struct deferred_test_data *data = arg;
+ size_t i;
+
+ for (i = 0; i < CB_COUNT; ++i) {
+ event_deferred_cb_init(&data->cbs[i], deferred_callback, NULL);
+ event_deferred_cb_schedule(data->queue, &data->cbs[i]);
+ SLEEP_MS(1);
+ }
+
+ THREAD_RETURN();
+}
+
+static void
+timer_callback(evutil_socket_t fd, short what, void *arg)
+{
+ timer_end = time(NULL);
+}
+
+static void
+start_threads_callback(evutil_socket_t fd, short what, void *arg)
+{
+ int i;
+
+ for (i = 0; i < QUEUE_THREAD_COUNT; ++i) {
+ THREAD_START(load_threads[i], load_deferred_queue,
+ &deferred_data[i]);
+ }
+}
+
+static void
+thread_deferred_cb_skew(void *arg)
+{
+ struct basic_test_data *data = arg;
+ struct timeval tv_timer = {4, 0};
+ struct event event_threads;
+ struct deferred_cb_queue *queue;
+ int i;
+
+ queue = event_base_get_deferred_cb_queue(data->base);
+ tt_assert(queue);
+
+ for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
+ deferred_data[i].queue = queue;
+
+ timer_start = time(NULL);
+ event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL,
+ &tv_timer);
+ event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback,
+ NULL, NULL);
+ event_base_dispatch(data->base);
+
+ TT_BLATHER(("callback count, %u", callback_count));
+ tt_int_op(timer_end - timer_start, ==, 4);
+
+end:
+ for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
+ THREAD_JOIN(load_threads[i]);
+}
+
#define TEST(name) \
{ #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, \
&basic_setup, NULL }
struct testcase_t thread_testcases[] = {
TEST(basic),
TEST(conditions_simple),
+ TEST(deferred_cb_skew),
END_OF_TESTCASES
};