From ae2b84b2575be93d0aebba5c0b78453836f89f3c Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 6 Apr 2012 04:33:19 -0400 Subject: [PATCH] Replace deferred_cbs with event_callback-based implementation. --- buffer.c | 13 ++- bufferevent-internal.h | 1 + bufferevent.c | 23 ++--- defer-internal.h | 56 +++--------- evbuffer-internal.h | 1 + evdns.c | 3 +- event-internal.h | 12 ++- event.c | 162 ++++++++++------------------------ http.c | 3 +- include/event2/event_struct.h | 5 +- test/regress_buffer.c | 1 + test/regress_thread.c | 2 +- 12 files changed, 94 insertions(+), 188 deletions(-) diff --git a/buffer.c b/buffer.c index b28c9361..6853edf3 100644 --- a/buffer.c +++ b/buffer.c @@ -404,7 +404,7 @@ evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base) EVBUFFER_LOCK(buffer); buffer->cb_queue = event_base_get_deferred_cb_queue_(base); buffer->deferred_cbs = 1; - event_deferred_cb_init_(&buffer->deferred, + event_deferred_cb_init_(base, &buffer->deferred, evbuffer_deferred_callback, buffer); EVBUFFER_UNLOCK(buffer); return 0; @@ -509,13 +509,12 @@ evbuffer_invoke_callbacks_(struct evbuffer *buffer) } if (buffer->deferred_cbs) { - if (buffer->deferred.queued) - return; - evbuffer_incref_and_lock_(buffer); - if (buffer->parent) - bufferevent_incref_(buffer->parent); + if (event_deferred_cb_schedule_(buffer->cb_queue, &buffer->deferred)) { + evbuffer_incref_and_lock_(buffer); + if (buffer->parent) + bufferevent_incref_(buffer->parent); + } EVBUFFER_UNLOCK(buffer); - event_deferred_cb_schedule_(buffer->cb_queue, &buffer->deferred); } evbuffer_run_callbacks(buffer, 0); diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 461c46e7..5d7e98c0 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -31,6 +31,7 @@ extern "C" { #endif #include "event2/event-config.h" +#include "event2/event_struct.h" #include "evconfig-private.h" #include "event2/util.h" #include "defer-internal.h" diff --git a/bufferevent.c b/bufferevent.c index 0d4b01d6..9c023ad4 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -210,10 +210,10 @@ bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *cb, void *arg) #define SCHEDULE_DEFERRED(bevp) \ do { \ - bufferevent_incref_(&(bevp)->bev); \ - event_deferred_cb_schedule_( \ + if (event_deferred_cb_schedule_( \ event_base_get_deferred_cb_queue_((bevp)->bev.ev_base), \ - &(bevp)->deferred); \ + &(bevp)->deferred)) \ + bufferevent_incref_(&(bevp)->bev); \ } while (0) @@ -227,8 +227,7 @@ bufferevent_run_readcb_(struct bufferevent *bufev) return; if (p->options & BEV_OPT_DEFER_CALLBACKS) { p->readcb_pending = 1; - if (!p->deferred.queued) - SCHEDULE_DEFERRED(p); + SCHEDULE_DEFERRED(p); } else { bufev->readcb(bufev, bufev->cbarg); } @@ -244,8 +243,7 @@ bufferevent_run_writecb_(struct bufferevent *bufev) return; if (p->options & BEV_OPT_DEFER_CALLBACKS) { p->writecb_pending = 1; - if (!p->deferred.queued) - SCHEDULE_DEFERRED(p); + SCHEDULE_DEFERRED(p); } else { bufev->writecb(bufev, bufev->cbarg); } @@ -262,8 +260,7 @@ bufferevent_run_eventcb_(struct bufferevent *bufev, short what) if (p->options & BEV_OPT_DEFER_CALLBACKS) { p->eventcb_pending |= what; p->errno_pending = EVUTIL_SOCKET_ERROR(); - if (!p->deferred.queued) - SCHEDULE_DEFERRED(p); + SCHEDULE_DEFERRED(p); } else { bufev->errorcb(bufev, what, bufev->cbarg); } @@ -326,11 +323,15 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private, } if (options & BEV_OPT_DEFER_CALLBACKS) { if (options & BEV_OPT_UNLOCK_CALLBACKS) - event_deferred_cb_init_(&bufev_private->deferred, + event_deferred_cb_init_( + bufev->ev_base, + &bufev_private->deferred, bufferevent_run_deferred_callbacks_unlocked, bufev_private); else - event_deferred_cb_init_(&bufev_private->deferred, + event_deferred_cb_init_( + bufev->ev_base, + &bufev_private->deferred, bufferevent_run_deferred_callbacks_locked, bufev_private); } diff --git a/defer-internal.h b/defer-internal.h index 114a9dc2..a4c88135 100644 --- a/defer-internal.h +++ b/defer-internal.h @@ -35,43 +35,11 @@ extern "C" { #include -struct deferred_cb; +#define deferred_cb event_callback +#define deferred_cb_queue event_base +struct event_callback; -typedef void (*deferred_cb_fn)(struct deferred_cb *, void *); - -/** A deferred_cb is a callback that can be scheduled to run as part of - * an event_base's event_loop, rather than running immediately. */ -struct deferred_cb { - /** Links to the adjacent active (pending) deferred_cb objects. */ - TAILQ_ENTRY (deferred_cb) cb_next; - /** True iff this deferred_cb is pending in an event_base. */ - unsigned queued : 1; - /** The function to execute when the callback runs. */ - deferred_cb_fn cb; - /** The function's second argument. */ - void *arg; -}; - -/** A deferred_cb_queue is a list of deferred_cb that we can add to and run. */ -struct deferred_cb_queue { - /** Lock used to protect the queue. */ - void *lock; - - /** Which event_base does this queue associate itself with? - * (Used for timing) */ - struct event_base *base; - - /** How many entries are in the queue? */ - int active_count; - - /** Function called when adding to the queue from another thread. */ - void (*notify_fn)(struct deferred_cb_queue *, void *); - void *notify_arg; - - /** Deferred callback management: a list of deferred callbacks to - * run active the active events. */ - TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list; -}; +typedef void (*deferred_cb_fn)(struct event_callback *, void *); /** Initialize an empty, non-pending deferred_cb. @@ -80,27 +48,23 @@ struct deferred_cb_queue { @param cb The function to run when the deferred_cb executes. @param arg The function's second argument. */ -void event_deferred_cb_init_(struct deferred_cb *, deferred_cb_fn, void *); +void event_deferred_cb_init_(struct event_base *base, struct event_callback *, deferred_cb_fn, void *); /** Cancel a deferred_cb if it is currently scheduled in an event_base. */ -void event_deferred_cb_cancel_(struct deferred_cb_queue *, struct deferred_cb *); +void event_deferred_cb_cancel_(struct event_base *, struct event_callback *); /** Activate a deferred_cb if it is not currently scheduled in an event_base. - */ -void event_deferred_cb_schedule_(struct deferred_cb_queue *, struct deferred_cb *); -#define LOCK_DEFERRED_QUEUE(q) \ - EVLOCK_LOCK((q)->lock, 0) -#define UNLOCK_DEFERRED_QUEUE(q) \ - EVLOCK_UNLOCK((q)->lock, 0) + Return true iff it was not previously scheduled. + */ +int event_deferred_cb_schedule_(struct event_base *, struct event_callback *); #ifdef __cplusplus } #endif -void event_deferred_cb_queue_init_(struct deferred_cb_queue *); -struct deferred_cb_queue *event_base_get_deferred_cb_queue_(struct event_base *); +#define event_base_get_deferred_cb_queue_(x) (x) #endif /* EVENT_INTERNAL_H_INCLUDED_ */ diff --git a/evbuffer-internal.h b/evbuffer-internal.h index 480c35a3..824739b1 100644 --- a/evbuffer-internal.h +++ b/evbuffer-internal.h @@ -34,6 +34,7 @@ extern "C" { #include "event2/event-config.h" #include "evconfig-private.h" #include "event2/util.h" +#include "event2/event_struct.h" #include "util-internal.h" #include "defer-internal.h" diff --git a/evdns.c b/evdns.c index 41d8b6b2..ff36fc3c 100644 --- a/evdns.c +++ b/evdns.c @@ -836,7 +836,8 @@ reply_schedule_callback(struct request *const req, u32 ttl, u32 err, struct repl d->handle = req->handle; } - event_deferred_cb_init_(&d->deferred, reply_run_callback, + event_deferred_cb_init_(req->base->event_base, + &d->deferred, reply_run_callback, req->user_pointer); event_deferred_cb_schedule_( event_base_get_deferred_cb_queue_(req->base->event_base), diff --git a/event-internal.h b/event-internal.h index 93b7c626..4757836c 100644 --- a/event-internal.h +++ b/event-internal.h @@ -56,13 +56,14 @@ extern "C" { #define ev_pri ev_evcallback.evcb_pri #define ev_flags ev_evcallback.evcb_flags #define ev_closure ev_evcallback.evcb_closure -#define ev_callback ev_evcallback.evcb_callback +#define ev_callback ev_evcallback.evcb_cb_union.evcb_callback #define ev_arg ev_evcallback.evcb_arg /* Possible values for evcb_closure in struct event_callback */ #define EV_CLOSURE_EVENT 0 #define EV_CLOSURE_EVENT_SIGNAL 1 #define EV_CLOSURE_EVENT_PERSIST 2 +#define EV_CLOSURE_CB_SELF 3 /** Structure to define the backend of a given event_base. */ struct eventop { @@ -239,10 +240,6 @@ struct event_base { /** The total size of common_timeout_queues. */ int n_common_timeouts_allocated; - /** List of defered_cb that are active. We run these after the active - * events. */ - struct deferred_cb_queue defer_queue; - /** Mapping from file descriptors to enabled (added) events */ struct event_io_map io; @@ -358,7 +355,7 @@ struct event_config { #endif /* TAILQ_FOREACH */ #define N_ACTIVE_CALLBACKS(base) \ - ((base)->event_count_active + (base)->defer_queue.active_count) + ((base)->event_count_active) int evsig_set_handler_(struct event_base *base, int evsignal, void (*fn)(int)); @@ -366,7 +363,8 @@ int evsig_restore_handler_(struct event_base *base, int evsignal); void event_active_nolock_(struct event *ev, int res, short count); -void event_callback_activate_nolock_(struct event_base *, struct event_callback *); +int event_callback_activate_(struct event_base *, struct event_callback *); +int event_callback_activate_nolock_(struct event_base *, struct event_callback *); int event_callback_cancel_(struct event_base *base, struct event_callback *evcb); diff --git a/event.c b/event.c index c8315911..9a506c82 100644 --- a/event.c +++ b/event.c @@ -506,28 +506,6 @@ event_base_get_features(const struct event_base *base) return base->evsel->features; } -void -event_deferred_cb_queue_init_(struct deferred_cb_queue *cb) -{ - memset(cb, 0, sizeof(struct deferred_cb_queue)); - TAILQ_INIT(&cb->deferred_cb_list); -} - -/** Helper for the deferred_cb queue: wake up the event base. */ -static void -notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr) -{ - struct event_base *base = baseptr; - if (EVBASE_NEED_NOTIFY(base)) - evthread_notify_base(base); -} - -struct deferred_cb_queue * -event_base_get_deferred_cb_queue_(struct event_base *base) -{ - return base ? &base->defer_queue : NULL; -} - void event_enable_debug_mode(void) { @@ -605,11 +583,6 @@ event_base_new_with_config(const struct event_config *cfg) base->th_notify_fd[0] = -1; base->th_notify_fd[1] = -1; - event_deferred_cb_queue_init_(&base->defer_queue); - base->defer_queue.base = base; - base->defer_queue.notify_fn = notify_base_cbq_callback; - base->defer_queue.notify_arg = base; - TAILQ_INIT(&base->active_later_queue); evmap_io_initmap_(&base->io); @@ -682,7 +655,6 @@ event_base_new_with_config(const struct event_config *cfg) int r; EVTHREAD_ALLOC_LOCK(base->th_base_lock, EVTHREAD_LOCKTYPE_RECURSIVE); - base->defer_queue.lock = base->th_base_lock; EVTHREAD_ALLOC_COND(base->current_event_cond); r = evthread_make_base_notifiable(base); if (r<0) { @@ -1464,7 +1436,7 @@ event_process_active_single_queue(struct event_base *base, event_queue_remove_active(base, evcb); event_debug(("event_process_active: event_callback %p, " "closure %d, call %p", - evcb, evcb->evcb_closure, evcb->evcb_callback)); + evcb, evcb->evcb_closure, evcb->evcb_cb_union.evcb_callback)); } if (!(evcb->evcb_flags & EVLIST_INTERNAL)) @@ -1488,6 +1460,10 @@ event_process_active_single_queue(struct event_base *base, (*ev->ev_callback)( ev->ev_fd, ev->ev_res, ev->ev_arg); break; + case EV_CLOSURE_CB_SELF: + EVBASE_RELEASE_LOCK(base, th_base_lock); + evcb->evcb_cb_union.evcb_selfcb(evcb, evcb->evcb_arg); + break; default: EVUTIL_ASSERT(0); } @@ -1518,47 +1494,6 @@ event_process_active_single_queue(struct event_base *base, return count; } -/* - Process up to MAX_DEFERRED of the defered_cb entries in 'queue'. If - *breakptr becomes set to 1, stop. Requires that we start out holding - the lock on 'queue'; releases the lock around 'queue' for each deferred_cb - we process. - */ -static int -event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr, - int max_to_process, const struct timeval *endtime) -{ - int count = 0; - struct deferred_cb *cb; -#define MAX_DEFERRED 16 - if (max_to_process > MAX_DEFERRED) - max_to_process = MAX_DEFERRED; -#undef MAX_DEFERRED - - while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) { - cb->queued = 0; - TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next); - --queue->active_count; - UNLOCK_DEFERRED_QUEUE(queue); - - cb->cb(cb, cb->arg); - - LOCK_DEFERRED_QUEUE(queue); - if (*breakptr) - return -1; - if (++count >= max_to_process) - break; - if (endtime) { - struct timeval now; - update_time_cache(queue->base); - gettime(queue->base, &now); - if (evutil_timercmp(&now, endtime, >=)) - return count; - } - } - return count; -} - /* * Active events are stored in priority queues. Lower priorities are always * process before higher priorities. Low priority events can starve high @@ -1605,8 +1540,6 @@ event_process_active(struct event_base *base) } } - event_process_deferred_callbacks(&base->defer_queue,&base->event_break, - maxcb-c, endtime); base->event_running_priority = -1; return c; } @@ -2558,17 +2491,42 @@ event_active_later_nolock_(struct event *ev, int res) event_callback_activate_later_nolock_(base, event_to_event_callback(ev)); } -void +int +event_callback_activate_(struct event_base *base, + struct event_callback *evcb) +{ + int r; + EVBASE_ACQUIRE_LOCK(base, th_base_lock); + r = event_callback_activate_nolock_(base, evcb); + EVBASE_RELEASE_LOCK(base, th_base_lock); + return r; +} + +int event_callback_activate_nolock_(struct event_base *base, struct event_callback *evcb) { - if (evcb->evcb_flags & EVLIST_ACTIVE_LATER) + int r = 1; + + switch (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) { + default: + EVUTIL_ASSERT(0); + case EVLIST_ACTIVE_LATER: event_queue_remove_active_later(base, evcb); + r = 0; + break; + case EVLIST_ACTIVE: + return 0; + case 0: + break; + } event_queue_insert_active(base, evcb); if (EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); + + return r; } void @@ -2628,53 +2586,31 @@ event_callback_cancel_nolock_(struct event_base *base, } void -event_deferred_cb_init_(struct deferred_cb *cb, deferred_cb_fn fn, void *arg) +event_deferred_cb_init_(struct event_base *base, struct event_callback *cb, deferred_cb_fn fn, void *arg) { - memset(cb, 0, sizeof(struct deferred_cb)); - cb->cb = fn; - cb->arg = arg; + if (!base) + base = current_base; + memset(cb, 0, sizeof(*cb)); + cb->evcb_cb_union.evcb_selfcb = fn; + cb->evcb_arg = arg; + cb->evcb_pri = base->nactivequeues - 1; + cb->evcb_closure = EV_CLOSURE_CB_SELF; } void -event_deferred_cb_cancel_(struct deferred_cb_queue *queue, - struct deferred_cb *cb) +event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb) { - if (!queue) { - if (current_base) - queue = ¤t_base->defer_queue; - else - return; - } - - LOCK_DEFERRED_QUEUE(queue); - if (cb->queued) { - TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next); - --queue->active_count; - cb->queued = 0; - } - UNLOCK_DEFERRED_QUEUE(queue); + if (!base) + base = current_base; + event_callback_cancel_(base, cb); } -void -event_deferred_cb_schedule_(struct deferred_cb_queue *queue, - struct deferred_cb *cb) +int +event_deferred_cb_schedule_(struct event_base *base, struct event_callback *cb) { - if (!queue) { - if (current_base) - queue = ¤t_base->defer_queue; - else - return; - } - - LOCK_DEFERRED_QUEUE(queue); - if (!cb->queued) { - cb->queued = 1; - TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next); - ++queue->active_count; - if (queue->notify_fn) - queue->notify_fn(queue, queue->notify_arg); - } - UNLOCK_DEFERRED_QUEUE(queue); + if (!base) + base = current_base; + return event_callback_activate_(base, cb); } static int diff --git a/http.c b/http.c index 786105d3..e5262755 100644 --- a/http.c +++ b/http.c @@ -2197,7 +2197,8 @@ evhttp_connection_base_bufferevent_new(struct event_base *base, struct evdns_bas bufferevent_base_set(base, evcon->bufev); } - event_deferred_cb_init_(&evcon->read_more_deferred_cb, + event_deferred_cb_init_(evcon->base, + &evcon->read_more_deferred_cb, evhttp_deferred_read_cb, evcon); evcon->dns_base = dnsbase; diff --git a/include/event2/event_struct.h b/include/event2/event_struct.h index 701e8eac..cf7b3df3 100644 --- a/include/event2/event_struct.h +++ b/include/event2/event_struct.h @@ -99,7 +99,10 @@ struct event_callback { ev_uint8_t evcb_pri; /* smaller numbers are higher priority */ ev_uint8_t evcb_closure; /* allows us to adopt for different types of events */ - void (*evcb_callback)(evutil_socket_t, short, void *arg); + union { + void (*evcb_callback)(evutil_socket_t, short, void *arg); + void (*evcb_selfcb)(struct event_callback *, void *arg); + } evcb_cb_union; void *evcb_arg; }; diff --git a/test/regress_buffer.c b/test/regress_buffer.c index d183b4f9..d94a8b37 100644 --- a/test/regress_buffer.c +++ b/test/regress_buffer.c @@ -57,6 +57,7 @@ #include "event2/buffer_compat.h" #include "event2/util.h" +#include "defer-internal.h" #include "evbuffer-internal.h" #include "log-internal.h" diff --git a/test/regress_thread.c b/test/regress_thread.c index 091bcb73..9fd49fa6 100644 --- a/test/regress_thread.c +++ b/test/regress_thread.c @@ -439,7 +439,7 @@ load_deferred_queue(void *arg) size_t i; for (i = 0; i < CB_COUNT; ++i) { - event_deferred_cb_init_(&data->cbs[i], deferred_callback, NULL); + event_deferred_cb_init_(data->queue, &data->cbs[i], deferred_callback, NULL); event_deferred_cb_schedule_(data->queue, &data->cbs[i]); SLEEP_MS(1); } -- 2.50.1