From cba59e53253bb396f192e40a002c6dd9835df51c Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Thu, 5 Apr 2012 12:38:18 -0400 Subject: [PATCH] Refactor the callback part of an event into its own event_callback type This shouldn't have any visible effect, but it's necessary or advisible for a few changes and cleanups I would like to make, including: * Replacing the deferred queue with a type that works more as if it were an event. * Introducing a useful "activate this on the next round through the event loop" state for events and deferreds. * Adding an "on until further notice" status for events, to allow a saner win32-hybrid approach. * Eventually, making all user callbacks first-class things with event-like semantics. --- event-internal.h | 26 +++-- event.c | 199 +++++++++++++++++++++------------- include/event2/event_struct.h | 19 ++-- 3 files changed, 153 insertions(+), 91 deletions(-) diff --git a/event-internal.h b/event-internal.h index 60b58c6c..e4025cf6 100644 --- a/event-internal.h +++ b/event-internal.h @@ -53,10 +53,16 @@ extern "C" { #define ev_ncalls ev_.ev_signal.ev_ncalls #define ev_pncalls ev_.ev_signal.ev_pncalls -/* Possible values for ev_closure in struct event. */ -#define EV_CLOSURE_NONE 0 -#define EV_CLOSURE_SIGNAL 1 -#define EV_CLOSURE_PERSIST 2 +#define ev_pri ev_evcallback.evcb_pri +#define ev_flags ev_evcallback.evcb_flags +#define ev_closure ev_evcallback.evcb_closure +#define ev_callback ev_evcallback.evcb_callback +#define ev_arg ev_evcallback.evcb_arg + +/* Possible values for evcb_closure in struct event_callback */ +#define EV_CLOSURE_EVENT 0 +#define EV_CLOSURE_EVENT_SIGNAL 1 +#define EV_CLOSURE_EVENT_PERSIST 2 /** Structure to define the backend of a given event_base. */ struct eventop { @@ -170,6 +176,8 @@ extern int event_debug_mode_on_; #define EVENT_DEBUG_MODE_IS_ON() (0) #endif +TAILQ_HEAD(evcallback_list, event_callback); + struct event_base { /** Function pointers and other data to describe this event_base's * backend. */ @@ -210,11 +218,11 @@ struct event_base { int running_loop; /* Active event management. */ - /** An array of nactivequeues queues for active events (ones that - * have triggered, and whose callbacks need to be called). Low + /** An array of nactivequeues queues for active event_callbacks (ones + * that have triggered, and whose callbacks need to be called). Low * priority numbers are more important, and stall higher ones. */ - struct event_list *activequeues; + struct evcallback_list *activequeues; /** The length of the activequeues array */ int nactivequeues; @@ -266,7 +274,7 @@ struct event_base { int current_event_waiters; #endif /** The event whose callback is executing right now */ - struct event *current_event; + struct event_callback *current_event; #ifdef _WIN32 /** IOCP support structure, if IOCP is enabled. */ @@ -355,6 +363,8 @@ int evsig_restore_handler_(struct event_base *base, int evsignal); void event_active_nolock_(struct event *ev, int res, short count); +void event_callback_activate_nolock_(struct event_base *, struct event_callback *); + /* FIXME document. */ void event_base_add_virtual_(struct event_base *base); diff --git a/event.c b/event.c index c5c118cc..3307db1b 100644 --- a/event.c +++ b/event.c @@ -135,10 +135,10 @@ static inline int event_add_internal(struct event *ev, const struct timeval *tv, int tv_is_absolute); static inline int event_del_internal(struct event *ev); -static void event_queue_insert_active(struct event_base *, struct event *); +static void event_queue_insert_active(struct event_base *, struct event_callback *); static void event_queue_insert_timeout(struct event_base *, struct event *); static void event_queue_insert_inserted(struct event_base *, struct event *); -static void event_queue_remove_active(struct event_base *, struct event *); +static void event_queue_remove_active(struct event_base *, struct event_callback *); static void event_queue_remove_timeout(struct event_base *, struct event *); static void event_queue_remove_inserted(struct event_base *, struct event *); #ifdef USE_REINSERT_TIMEOUT @@ -424,6 +424,19 @@ event_base_update_cache_time(struct event_base *base) return 0; } +static inline struct event * +event_callback_to_event(struct event_callback *evcb) +{ + EVUTIL_ASSERT((evcb->evcb_flags & EVLIST_INIT)); + return EVUTIL_UPCAST(evcb, struct event, ev_evcallback); +} + +static inline struct event_callback * +event_to_event_callback(struct event *ev) +{ + return &ev->ev_evcallback; +} + struct event_base * event_init(void) { @@ -777,13 +790,19 @@ event_base_free(struct event_base *base) mm_free(base->common_timeout_queues); for (i = 0; i < base->nactivequeues; ++i) { - for (ev = TAILQ_FIRST(&base->activequeues[i]); ev; ) { - struct event *next = TAILQ_NEXT(ev, ev_active_next); - if (!(ev->ev_flags & EVLIST_INTERNAL)) { - event_del(ev); - ++n_deleted; + struct event_callback *evcb, *next; + for (evcb = TAILQ_FIRST(&base->activequeues[i]); evcb; ) { + next = TAILQ_NEXT(evcb, evcb_active_next); + if (evcb->evcb_flags & EVLIST_INIT) { + ev = event_callback_to_event(evcb); + if (!(ev->ev_flags & EVLIST_INTERNAL)) { + event_del(ev); + ++n_deleted; + } + } else { + event_queue_remove_active(base, evcb); } - ev = next; + evcb = next; } } @@ -1093,8 +1112,8 @@ event_base_priority_init(struct event_base *base, int npriorities) } /* Allocate our priority queues */ - base->activequeues = (struct event_list *) - mm_calloc(npriorities, sizeof(struct event_list)); + base->activequeues = (struct evcallback_list *) + mm_calloc(npriorities, sizeof(struct evcallback_list)); if (base->activequeues == NULL) { event_warn("%s: calloc", __func__); goto err; @@ -1393,51 +1412,63 @@ event_persist_closure(struct event_base *base, struct event *ev) releasing the lock as we go. This function requires that the lock be held when it's invoked. Returns -1 if we get a signal or an event_break that means we should stop processing any active events now. Otherwise returns - the number of non-internal events that we processed. + the number of non-internal event_callbacks that we processed. */ static int event_process_active_single_queue(struct event_base *base, - struct event_list *activeq, + struct evcallback_list *activeq, int max_to_process, const struct timeval *endtime) { - struct event *ev; + struct event_callback *evcb; int count = 0; EVUTIL_ASSERT(activeq != NULL); - for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) { - if (ev->ev_events & EV_PERSIST) - event_queue_remove_active(base, ev); - else - event_del_internal(ev); - if (!(ev->ev_flags & EVLIST_INTERNAL)) + for (evcb = TAILQ_FIRST(activeq); evcb; evcb = TAILQ_FIRST(activeq)) { + struct event *ev=NULL; + if (evcb->evcb_flags & EVLIST_INIT) { + ev = event_callback_to_event(evcb); + + if (ev->ev_events & EV_PERSIST) + event_queue_remove_active(base, evcb); + else + event_del_internal(ev); + event_debug(( + "event_process_active: event: %p, %s%scall %p", + ev, + ev->ev_res & EV_READ ? "EV_READ " : " ", + ev->ev_res & EV_WRITE ? "EV_WRITE " : " ", + ev->ev_callback)); + } else { + event_queue_remove_active(base, evcb); + event_debug(("event_process_active: event_callback %p, " + "closure %d, call %p", + evcb, evcb->evcb_closure, evcb->evcb_callback)); + } + + if (!(evcb->evcb_flags & EVLIST_INTERNAL)) ++count; - event_debug(( - "event_process_active: event: %p, %s%scall %p", - ev, - ev->ev_res & EV_READ ? "EV_READ " : " ", - ev->ev_res & EV_WRITE ? "EV_WRITE " : " ", - ev->ev_callback)); - base->current_event = ev; + base->current_event = evcb; #ifndef EVENT__DISABLE_THREAD_SUPPORT base->current_event_waiters = 0; #endif - switch (ev->ev_closure) { - case EV_CLOSURE_SIGNAL: + switch (evcb->evcb_closure) { + case EV_CLOSURE_EVENT_SIGNAL: event_signal_closure(base, ev); break; - case EV_CLOSURE_PERSIST: + case EV_CLOSURE_EVENT_PERSIST: event_persist_closure(base, ev); break; - default: - case EV_CLOSURE_NONE: + case EV_CLOSURE_EVENT: EVBASE_RELEASE_LOCK(base, th_base_lock); (*ev->ev_callback)( ev->ev_fd, ev->ev_res, ev->ev_arg); break; + default: + EVUTIL_ASSERT(0); } EVBASE_ACQUIRE_LOCK(base, th_base_lock); @@ -1517,7 +1548,7 @@ static int event_process_active(struct event_base *base) { /* Caller must hold th_base_lock */ - struct event_list *activeq = NULL; + struct evcallback_list *activeq = NULL; int i, c = 0; const struct timeval *endtime; struct timeval tv; @@ -1865,13 +1896,13 @@ event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, shor "EV_READ or EV_WRITE", __func__); return -1; } - ev->ev_closure = EV_CLOSURE_SIGNAL; + ev->ev_closure = EV_CLOSURE_EVENT_SIGNAL; } else { if (events & EV_PERSIST) { evutil_timerclear(&ev->ev_io_timeout); - ev->ev_closure = EV_CLOSURE_PERSIST; + ev->ev_closure = EV_CLOSURE_EVENT_PERSIST; } else { - ev->ev_closure = EV_CLOSURE_NONE; + ev->ev_closure = EV_CLOSURE_EVENT; } } @@ -1922,8 +1953,11 @@ event_base_get_running_event(struct event_base *base) { struct event *ev = NULL; EVBASE_ACQUIRE_LOCK(base, th_base_lock); - if (EVBASE_IN_THREAD(base)) - ev = base->current_event; + if (EVBASE_IN_THREAD(base)) { + struct event_callback *evcb = base->current_event; + if (evcb->evcb_flags & EVLIST_INIT) + ev = event_callback_to_event(evcb); + } EVBASE_RELEASE_LOCK(base, th_base_lock); return ev; } @@ -2192,7 +2226,8 @@ event_add_internal(struct event *ev, const struct timeval *tv, * until the callback is done before we mess with the event, or else * we can race on ev_ncalls and ev_pncalls below. */ #ifndef EVENT__DISABLE_THREAD_SUPPORT - if (base->current_event == ev && (ev->ev_events & EV_SIGNAL) + if (base->current_event == event_to_event_callback(ev) && + (ev->ev_events & EV_SIGNAL) && !EVBASE_IN_THREAD(base)) { ++base->current_event_waiters; EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); @@ -2232,7 +2267,7 @@ event_add_internal(struct event *ev, const struct timeval *tv, * * If tv_is_absolute, this was already set. */ - if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute) + if (ev->ev_closure == EV_CLOSURE_EVENT_PERSIST && !tv_is_absolute) ev->ev_io_timeout = *tv; #ifndef USE_REINSERT_TIMEOUT @@ -2256,7 +2291,7 @@ event_add_internal(struct event *ev, const struct timeval *tv, } } - event_queue_remove_active(base, ev); + event_queue_remove_active(base, event_to_event_callback(ev)); } gettime(base, &now); @@ -2356,7 +2391,8 @@ event_del_internal(struct event *ev) * user-supplied argument. */ base = ev->ev_base; #ifndef EVENT__DISABLE_THREAD_SUPPORT - if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { + if (base->current_event == event_to_event_callback(ev) && + !EVBASE_IN_THREAD(base)) { ++base->current_event_waiters; EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); } @@ -2384,7 +2420,7 @@ event_del_internal(struct event *ev) } if (ev->ev_flags & EVLIST_ACTIVE) - event_queue_remove_active(base, ev); + event_queue_remove_active(base, event_to_event_callback(ev)); if (ev->ev_flags & EVLIST_INSERTED) { event_queue_remove_inserted(base, ev); @@ -2452,7 +2488,8 @@ event_active_nolock_(struct event *ev, int res, short ncalls) if (ev->ev_events & EV_SIGNAL) { #ifndef EVENT__DISABLE_THREAD_SUPPORT - if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { + if (base->current_event == event_to_event_callback(ev) && + !EVBASE_IN_THREAD(base)) { ++base->current_event_waiters; EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); } @@ -2461,7 +2498,14 @@ event_active_nolock_(struct event *ev, int res, short ncalls) ev->ev_pncalls = NULL; } - event_queue_insert_active(base, ev); + event_callback_activate_nolock_(base, event_to_event_callback(ev)); +} + +void +event_callback_activate_nolock_(struct event_base *base, + struct event_callback *evcb) +{ + event_queue_insert_active(base, evcb); if (EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); @@ -2585,13 +2629,13 @@ timeout_process(struct event_base *base) #error "Mismatch for value of EVLIST_INTERNAL" #endif /* These are a fancy way to spell - if (~ev->ev_flags & EVLIST_INTERNAL) + if (flags & EVLIST_INTERNAL) base->event_count--/++; */ -#define DECR_EVENT_COUNT(base,ev) \ - ((base)->event_count -= (~((ev)->ev_flags >> 4) & 1)) -#define INCR_EVENT_COUNT(base, ev) \ - ((base)->event_count += (~((ev)->ev_flags >> 4) & 1)) +#define DECR_EVENT_COUNT(base,flags) \ + ((base)->event_count -= (~((flags) >> 4) & 1)) +#define INCR_EVENT_COUNT(base,flags) \ + ((base)->event_count += (~((flags) >> 4) & 1)) static void event_queue_remove_inserted(struct event_base *base, struct event *ev) @@ -2602,23 +2646,24 @@ event_queue_remove_inserted(struct event_base *base, struct event *ev) ev, ev->ev_fd, EVLIST_INSERTED); return; } - DECR_EVENT_COUNT(base, ev); + DECR_EVENT_COUNT(base, ev->ev_flags); ev->ev_flags &= ~EVLIST_INSERTED; } static void -event_queue_remove_active(struct event_base *base, struct event *ev) +event_queue_remove_active(struct event_base *base, struct event_callback *evcb) { EVENT_BASE_ASSERT_LOCKED(base); - if (EVUTIL_FAILURE_CHECK(!(ev->ev_flags & EVLIST_ACTIVE))) { - event_errx(1, "%s: %p(fd %d) not on queue %x", __func__, - ev, ev->ev_fd, EVLIST_ACTIVE); + if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE))) { + event_errx(1, "%s: %p not on queue %x", __func__, + evcb, EVLIST_ACTIVE); return; } - DECR_EVENT_COUNT(base, ev); - ev->ev_flags &= ~EVLIST_ACTIVE; + DECR_EVENT_COUNT(base, evcb->evcb_flags); + evcb->evcb_flags &= ~EVLIST_ACTIVE; base->event_count_active--; - TAILQ_REMOVE(&base->activequeues[ev->ev_pri], - ev, ev_active_next); + + TAILQ_REMOVE(&base->activequeues[evcb->evcb_pri], + evcb, evcb_active_next); } static void event_queue_remove_timeout(struct event_base *base, struct event *ev) @@ -2629,7 +2674,7 @@ event_queue_remove_timeout(struct event_base *base, struct event *ev) ev, ev->ev_fd, EVLIST_TIMEOUT); return; } - DECR_EVENT_COUNT(base, ev); + DECR_EVENT_COUNT(base, ev->ev_flags); ev->ev_flags &= ~EVLIST_TIMEOUT; if (is_common_timeout(&ev->ev_timeout, base)) { @@ -2725,28 +2770,28 @@ event_queue_insert_inserted(struct event_base *base, struct event *ev) return; } - INCR_EVENT_COUNT(base, ev); + INCR_EVENT_COUNT(base, ev->ev_flags); ev->ev_flags |= EVLIST_INSERTED; } static void -event_queue_insert_active(struct event_base *base, struct event *ev) +event_queue_insert_active(struct event_base *base, struct event_callback *evcb) { EVENT_BASE_ASSERT_LOCKED(base); - if (ev->ev_flags & EVLIST_ACTIVE) { + if (evcb->evcb_flags & EVLIST_ACTIVE) { /* Double insertion is possible for active events */ return; } - INCR_EVENT_COUNT(base, ev); + INCR_EVENT_COUNT(base, evcb->evcb_flags); - ev->ev_flags |= EVLIST_ACTIVE; + evcb->evcb_flags |= EVLIST_ACTIVE; base->event_count_active++; - TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri], - ev,ev_active_next); + TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], + evcb, evcb_active_next); } static void @@ -2760,7 +2805,7 @@ event_queue_insert_timeout(struct event_base *base, struct event *ev) return; } - INCR_EVENT_COUNT(base, ev); + INCR_EVENT_COUNT(base, ev->ev_flags); ev->ev_flags |= EVLIST_TIMEOUT; @@ -3034,11 +3079,15 @@ event_base_foreach_event_(struct event_base *base, /* Finally, we deal wit all the active events that we haven't touched * yet. */ for (i = 0; i < base->nactivequeues; ++i) { - TAILQ_FOREACH(ev, &base->activequeues[i], ev_active_next) { - if (ev->ev_flags & (EVLIST_INSERTED|EVLIST_TIMEOUT)) { - /* we already processed this one */ + struct event_callback *evcb; + TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) { + if ((evcb->evcb_flags & (EVLIST_INIT|EVLIST_INSERTED|EVLIST_TIMEOUT)) != EVLIST_INIT) { + /* This isn't an event (evlist_init clear), or + * we already processed it. (inserted or + * timeout set */ continue; } + ev = event_callback_to_event(evcb); if ((r = fn(base, ev, arg))) return r; } @@ -3231,11 +3280,11 @@ event_base_assert_ok_(struct event_base *base) /* Check the active queues. */ for (i = 0; i < base->nactivequeues; ++i) { - struct event *ev; - EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event, ev_active_next); - TAILQ_FOREACH(ev, &base->activequeues[i], ev_active_next) { - EVUTIL_ASSERT(ev->ev_pri == i); - EVUTIL_ASSERT(ev->ev_flags & EVLIST_ACTIVE); + struct event_callback *evcb; + EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event_callback, evcb_active_next); + TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) { + EVUTIL_ASSERT(evcb->evcb_flags & EVLIST_ACTIVE); + EVUTIL_ASSERT(evcb->evcb_pri == i); } } diff --git a/include/event2/event_struct.h b/include/event2/event_struct.h index 44bb6f7e..6f30bde9 100644 --- a/include/event2/event_struct.h +++ b/include/event2/event_struct.h @@ -93,9 +93,19 @@ struct { \ } #endif /* !TAILQ_ENTRY */ +struct event_callback { + TAILQ_ENTRY(event_callback) evcb_active_next; + short evcb_flags; + ev_uint8_t evcb_pri; /* smaller numbers are higher priority */ + ev_uint8_t evcb_closure; + /* allows us to adopt for different types of events */ + void (*evcb_callback)(evutil_socket_t, short, void *arg); + void *evcb_arg; +}; + struct event_base; struct event { - TAILQ_ENTRY(event) ev_active_next; + struct event_callback ev_evcallback; /* for managing timeouts */ union { @@ -124,14 +134,7 @@ struct event { short ev_events; short ev_res; /* result passed to event callback */ - short ev_flags; - ev_uint8_t ev_pri; /* smaller numbers are higher priority */ - ev_uint8_t ev_closure; struct timeval ev_timeout; - - /* allows us to adopt for different types of events */ - void (*ev_callback)(evutil_socket_t, short, void *arg); - void *ev_arg; }; TAILQ_HEAD (event_list, event); -- 2.50.1