From b06b2649b4c7f5feaedea97c31001c14708e4d1f Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Sun, 26 Jul 2009 01:29:39 +0000 Subject: [PATCH] Make "deferred callback queue" independent of event_base. This way, we can more easily have an IOCP bufferevent implementation that does not need an event_base at all. Woot. svn:r1381 --- buffer.c | 6 +-- bufferevent.c | 17 +++++--- defer-internal.h | 35 ++++++++++++++++- evbuffer-internal.h | 5 +-- evdns.c | 4 +- event-internal.h | 8 ++-- event.c | 94 +++++++++++++++++++++++++++++++-------------- 7 files changed, 122 insertions(+), 47 deletions(-) diff --git a/buffer.c b/buffer.c index 870c973a..a67f7f78 100644 --- a/buffer.c +++ b/buffer.c @@ -278,7 +278,7 @@ int evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base) { EVBUFFER_LOCK(buffer, EVTHREAD_WRITE); - buffer->ev_base = base; + buffer->cb_queue = event_base_get_deferred_cb_queue(base); buffer->deferred_cbs = 1; event_deferred_cb_init(&buffer->deferred, evbuffer_deferred_callback, buffer); @@ -361,7 +361,7 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer) if (buffer->deferred.queued) return; _evbuffer_incref(buffer); - event_deferred_cb_schedule(buffer->ev_base, &buffer->deferred); + event_deferred_cb_schedule(buffer->cb_queue, &buffer->deferred); } else { evbuffer_run_callbacks(buffer); } @@ -407,7 +407,7 @@ _evbuffer_decref_and_unlock(struct evbuffer *buffer) } evbuffer_remove_all_callbacks(buffer); if (buffer->deferred_cbs) - event_deferred_cb_cancel(buffer->ev_base, &buffer->deferred); + event_deferred_cb_cancel(buffer->cb_queue, &buffer->deferred); EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE); if (buffer->own_lock) diff --git a/bufferevent.c b/bufferevent.c index 74097e26..f79ff540 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -140,6 +140,14 @@ bufferevent_run_deferred_callbacks(struct deferred_cb *_, void *arg) _bufferevent_decref_and_unlock(bufev); } +#define SCHEDULE_DEFERRED(bevp) \ + do { \ + event_deferred_cb_schedule( \ + event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \ + &(bevp)->deferred); \ + } while (0); + + void _bufferevent_run_readcb(struct bufferevent *bufev) { @@ -150,8 +158,7 @@ _bufferevent_run_readcb(struct bufferevent *bufev) p->readcb_pending = 1; if (!p->deferred.queued) { bufferevent_incref(bufev); - event_deferred_cb_schedule( - bufev->ev_base, &p->deferred); + SCHEDULE_DEFERRED(p); } } else { bufev->readcb(bufev, bufev->cbarg); @@ -168,8 +175,7 @@ _bufferevent_run_writecb(struct bufferevent *bufev) p->writecb_pending = 1; if (!p->deferred.queued) { bufferevent_incref(bufev); - event_deferred_cb_schedule( - bufev->ev_base, &p->deferred); + SCHEDULE_DEFERRED(p); } } else { bufev->writecb(bufev, bufev->cbarg); @@ -187,8 +193,7 @@ _bufferevent_run_eventcb(struct bufferevent *bufev, short what) p->errno_pending = EVUTIL_SOCKET_ERROR(); if (!p->deferred.queued) { bufferevent_incref(bufev); - event_deferred_cb_schedule( - bufev->ev_base, &p->deferred); + SCHEDULE_DEFERRED(p); } } else { bufev->errorcb(bufev, what, bufev->cbarg); diff --git a/defer-internal.h b/defer-internal.h index 8eead90f..cde6e903 100644 --- a/defer-internal.h +++ b/defer-internal.h @@ -50,6 +50,20 @@ struct deferred_cb { void *arg; }; + +struct deferred_cb_queue { + void *lock; + + int active_count; + + void (*notify_fn)(struct deferred_cb_queue *, void *); + void *notify_arg; + + /** Deferred callback management: a list of deferred callbacks to + * run active the active events. */ + TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list; +}; + /** Initialize an empty, non-pending deferred_cb. @@ -61,15 +75,32 @@ void event_deferred_cb_init(struct deferred_cb *, deferred_cb_fn, void *); /** Cancel a deferred_cb if it is currently scheduled in an event_base. */ -void event_deferred_cb_cancel(struct event_base *, struct deferred_cb *); +void event_deferred_cb_cancel(struct deferred_cb_queue *, struct deferred_cb *); /** Activate a deferred_cb if it is not currently scheduled in an event_base. */ -void event_deferred_cb_schedule(struct event_base *, struct deferred_cb *); +void event_deferred_cb_schedule(struct deferred_cb_queue *, struct deferred_cb *); + +#define LOCK_DEFERRED_QUEUE(q) \ + do { \ + if ((q)->lock) \ + _evthread_locking_fn(EVTHREAD_LOCK|EVTHREAD_WRITE, \ + (q)->lock); \ + } while (0) + +#define UNLOCK_DEFERRED_QUEUE(q) \ + do { \ + if ((q)->lock) \ + _evthread_locking_fn(EVTHREAD_UNLOCK|EVTHREAD_WRITE, \ + (q)->lock); \ + } while (0) #ifdef __cplusplus } #endif +void event_deferred_cb_queue_init(struct deferred_cb_queue *); +struct deferred_cb_queue *event_base_get_deferred_cb_queue(struct event_base *); + #endif /* _EVENT_INTERNAL_H_ */ diff --git a/evbuffer-internal.h b/evbuffer-internal.h index 8e94f607..063777e2 100644 --- a/evbuffer-internal.h +++ b/evbuffer-internal.h @@ -115,9 +115,8 @@ struct evbuffer { unsigned is_overlapped : 1; #endif - /** An event_base associated with this evbuffer. Used to implement - * deferred callbacks. */ - struct event_base *ev_base; + /** Used to implement deferred callbacks. */ + struct deferred_cb_queue *cb_queue; /** For debugging: how many times have we acquired the lock for this * evbuffer? */ diff --git a/evdns.c b/evdns.c index 496c2e88..6962cc19 100644 --- a/evdns.c +++ b/evdns.c @@ -804,7 +804,9 @@ reply_schedule_callback(struct evdns_request *const req, u32 ttl, u32 err, struc event_deferred_cb_init(&d->deferred, reply_run_callback, req->user_pointer); - event_deferred_cb_schedule(req->base->event_base, &d->deferred); + event_deferred_cb_schedule( + event_base_get_deferred_cb_queue(req->base->event_base), + &d->deferred); } /* this processes a parsed reply packet */ diff --git a/event-internal.h b/event-internal.h index 527ee2ce..724135b8 100644 --- a/event-internal.h +++ b/event-internal.h @@ -37,6 +37,7 @@ extern "C" { #include "minheap-internal.h" #include "evsignal-internal.h" #include "mm-internal.h" +#include "defer-internal.h" /* map union members back */ @@ -136,9 +137,7 @@ struct event_base { /** The event whose callback is executing right now */ struct event *current_event; - /** Deferred callback management: a list of deferred callbacks to - * run active the active events. */ - TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list; + struct deferred_cb_queue defer_queue; /** Mapping from file descriptors to enabled events */ struct event_io_map io; @@ -209,6 +208,9 @@ struct event_config { } while (0) #endif /* TAILQ_FOREACH */ +#define N_ACTIVE_CALLBACKS(base) \ + ((base)->event_count_active + (base)->defer_queue.active_count) + int _evsig_set_handler(struct event_base *base, int evsignal, void (*fn)(int)); int _evsig_restore_handler(struct event_base *base, int evsignal); diff --git a/event.c b/event.c index 0fea11f4..83526d09 100644 --- a/event.c +++ b/event.c @@ -230,6 +230,27 @@ event_base_get_features(struct event_base *base) return base->evsel->features; } +void +event_deferred_cb_queue_init(struct deferred_cb_queue *cb) +{ + memset(cb, 0, sizeof(struct deferred_cb_queue)); + TAILQ_INIT(&cb->deferred_cb_list); +} + +static void +notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr) +{ + struct event_base *base = baseptr; + if (!EVBASE_IN_THREAD(base)) + evthread_notify_base(base); +} + +struct deferred_cb_queue * +event_base_get_deferred_cb_queue(struct event_base *base) +{ + return base ? &base->defer_queue : NULL; +} + struct event_base * event_base_new_with_config(struct event_config *cfg) { @@ -245,10 +266,13 @@ event_base_new_with_config(struct event_config *cfg) min_heap_ctor(&base->timeheap); TAILQ_INIT(&base->eventqueue); - TAILQ_INIT(&base->deferred_cb_list); base->sig.ev_signal_pair[0] = -1; base->sig.ev_signal_pair[1] = -1; + event_deferred_cb_queue_init(&base->defer_queue); + base->defer_queue.notify_fn = notify_base_cbq_callback; + base->defer_queue.notify_arg = base; + evmap_io_initmap(&base->io); evmap_signal_initmap(&base->sigmap); @@ -301,6 +325,7 @@ event_base_new_with_config(struct event_config *cfg) if (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK)) { int r; EVTHREAD_ALLOC_LOCK(base->th_base_lock); + base->defer_queue.lock = base->th_base_lock; EVTHREAD_ALLOC_LOCK(base->current_event_lock); r = evthread_make_base_notifiable(base); if (r<0) { @@ -551,7 +576,7 @@ event_base_priority_init(struct event_base *base, int npriorities) { int i; - if (base->event_count_active || npriorities < 1 + if (N_ACTIVE_CALLBACKS(base) || npriorities < 1 || npriorities >= EVENT_MAX_PRIORITIES) return (-1); @@ -678,23 +703,23 @@ event_process_active_single_queue(struct event_base *base, } static int -event_process_deferred_callbacks(struct event_base *base) +event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr) { int count = 0; struct deferred_cb *cb; - while ((cb = TAILQ_FIRST(&base->deferred_cb_list))) { + while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) { cb->queued = 0; - TAILQ_REMOVE(&base->deferred_cb_list, cb, cb_next); - --base->event_count_active; - EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next); + --queue->active_count; + UNLOCK_DEFERRED_QUEUE(queue); cb->cb(cb, cb->arg); ++count; - if (base->event_break) + if (*breakptr) return -1; - EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + LOCK_DEFERRED_QUEUE(queue); } return count; } @@ -727,7 +752,7 @@ event_process_active(struct event_base *base) } } - event_process_deferred_callbacks(base); + event_process_deferred_callbacks(&base->defer_queue,&base->event_break); unlock: EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); @@ -845,7 +870,7 @@ event_base_loop(struct event_base *base, int flags) timeout_correct(base, &tv); tv_p = &tv; - if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) { + if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) { timeout_next(base, &tv_p); } else { /* @@ -856,7 +881,7 @@ event_base_loop(struct event_base *base, int flags) } /* If we have no events, we just exit */ - if (!event_haveevents(base) && !base->event_count_active) { + if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) { event_debug(("%s: no events registered.", __func__)); return (1); } @@ -875,7 +900,7 @@ event_base_loop(struct event_base *base, int flags) timeout_process(base); - if (base->event_count_active) { + if (N_ACTIVE_CALLBACKS(base)) { event_process_active(base); if (!base->event_count_active && (flags & EVLOOP_ONCE)) done = 1; @@ -1377,34 +1402,45 @@ event_deferred_cb_init(struct deferred_cb *cb, deferred_cb_fn fn, void *arg) } void -event_deferred_cb_cancel(struct event_base *base, struct deferred_cb *cb) +event_deferred_cb_cancel(struct deferred_cb_queue *queue, + struct deferred_cb *cb) { - if (!base) - base = current_base; + if (!queue) { + if (current_base) + queue = ¤t_base->defer_queue; + else + return; + } - EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + LOCK_DEFERRED_QUEUE(queue); if (cb->queued) { - TAILQ_REMOVE(&base->deferred_cb_list, cb, cb_next); - --base->event_count_active; + TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next); + --queue->active_count; cb->queued = 0; } - EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + UNLOCK_DEFERRED_QUEUE(queue); } void -event_deferred_cb_schedule(struct event_base *base, struct deferred_cb *cb) +event_deferred_cb_schedule(struct deferred_cb_queue *queue, + struct deferred_cb *cb) { - if (!base) - base = current_base; - EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + if (!queue) { + if (current_base) + queue = ¤t_base->defer_queue; + else + return; + } + + LOCK_DEFERRED_QUEUE(queue); if (!cb->queued) { cb->queued = 1; - TAILQ_INSERT_TAIL(&base->deferred_cb_list, cb, cb_next); - ++base->event_count_active; - if (!EVBASE_IN_THREAD(base)) - evthread_notify_base(base); + TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next); + ++queue->active_count; + if (queue->notify_fn) + queue->notify_fn(queue, queue->notify_arg); } - EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + UNLOCK_DEFERRED_QUEUE(queue); } static int -- 2.50.1