const struct timeval *tv, int tv_is_absolute);
static inline int event_del_internal(struct event *ev);
-static void event_queue_insert_active(struct event_base *, struct event *);
+static void event_queue_insert_active(struct event_base *, struct event_callback *);
static void event_queue_insert_timeout(struct event_base *, struct event *);
static void event_queue_insert_inserted(struct event_base *, struct event *);
-static void event_queue_remove_active(struct event_base *, struct event *);
+static void event_queue_remove_active(struct event_base *, struct event_callback *);
static void event_queue_remove_timeout(struct event_base *, struct event *);
static void event_queue_remove_inserted(struct event_base *, struct event *);
#ifdef USE_REINSERT_TIMEOUT
return 0;
}
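+/* An event_callback may be embedded in a struct event (in which case
+ * EVLIST_INIT is set in its flags) or stand on its own. These helpers
+ * convert between the two views; the upcast uses EVUTIL_UPCAST, a
+ * container_of-style offset subtraction, to recover the enclosing
+ * struct event from its ev_evcallback member. */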
+static inline struct event *
+event_callback_to_event(struct event_callback *evcb)
+{
+ EVUTIL_ASSERT((evcb->evcb_flags & EVLIST_INIT));
+ return EVUTIL_UPCAST(evcb, struct event, ev_evcallback);
+}
+
+static inline struct event_callback *
+event_to_event_callback(struct event *ev)
+{
+ return &ev->ev_evcallback;
+}
+
struct event_base *
event_init(void)
{
mm_free(base->common_timeout_queues);
for (i = 0; i < base->nactivequeues; ++i) {
- for (ev = TAILQ_FIRST(&base->activequeues[i]); ev; ) {
- struct event *next = TAILQ_NEXT(ev, ev_active_next);
- if (!(ev->ev_flags & EVLIST_INTERNAL)) {
- event_del(ev);
- ++n_deleted;
+ struct event_callback *evcb, *next;
+ for (evcb = TAILQ_FIRST(&base->activequeues[i]); evcb; ) {
+ next = TAILQ_NEXT(evcb, evcb_active_next);
+ if (evcb->evcb_flags & EVLIST_INIT) {
+ ev = event_callback_to_event(evcb);
+ if (!(ev->ev_flags & EVLIST_INTERNAL)) {
+ event_del(ev);
+ ++n_deleted;
+ }
+ } else {
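+ /* Not backed by a struct event: just drop the
+ * callback from the active queue. */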
+ event_queue_remove_active(base, evcb);
}
- ev = next;
+ evcb = next;
}
}
}
/* Allocate our priority queues */
- base->activequeues = (struct event_list *)
- mm_calloc(npriorities, sizeof(struct event_list));
+ base->activequeues = (struct evcallback_list *)
+ mm_calloc(npriorities, sizeof(struct evcallback_list));
if (base->activequeues == NULL) {
event_warn("%s: calloc", __func__);
goto err;
releasing the lock as we go. This function requires that the lock be held
when it's invoked. Returns -1 if we get a signal or an event_break that
means we should stop processing any active events now. Otherwise returns
- the number of non-internal events that we processed.
+ the number of non-internal event_callbacks that we processed.
*/
static int
event_process_active_single_queue(struct event_base *base,
- struct event_list *activeq,
+ struct evcallback_list *activeq,
int max_to_process, const struct timeval *endtime)
{
- struct event *ev;
+ struct event_callback *evcb;
int count = 0;
EVUTIL_ASSERT(activeq != NULL);
- for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
- if (ev->ev_events & EV_PERSIST)
- event_queue_remove_active(base, ev);
- else
- event_del_internal(ev);
- if (!(ev->ev_flags & EVLIST_INTERNAL))
+ for (evcb = TAILQ_FIRST(activeq); evcb; evcb = TAILQ_FIRST(activeq)) {
+ struct event *ev = NULL;
+ if (evcb->evcb_flags & EVLIST_INIT) {
+ ev = event_callback_to_event(evcb);
+
+ if (ev->ev_events & EV_PERSIST)
+ event_queue_remove_active(base, evcb);
+ else
+ event_del_internal(ev);
+ event_debug((
+ "event_process_active: event: %p, %s%scall %p",
+ ev,
+ ev->ev_res & EV_READ ? "EV_READ " : " ",
+ ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
+ ev->ev_callback));
+ } else {
+ event_queue_remove_active(base, evcb);
+ event_debug(("event_process_active: event_callback %p, "
+ "closure %d, call %p",
+ evcb, evcb->evcb_closure, evcb->evcb_callback));
+ }
+
+ if (!(evcb->evcb_flags & EVLIST_INTERNAL))
++count;
- event_debug((
- "event_process_active: event: %p, %s%scall %p",
- ev,
- ev->ev_res & EV_READ ? "EV_READ " : " ",
- ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
- ev->ev_callback));
- base->current_event = ev;
+ base->current_event = evcb;
#ifndef EVENT__DISABLE_THREAD_SUPPORT
base->current_event_waiters = 0;
#endif
- switch (ev->ev_closure) {
- case EV_CLOSURE_SIGNAL:
+ switch (evcb->evcb_closure) {
+ case EV_CLOSURE_EVENT_SIGNAL:
event_signal_closure(base, ev);
break;
- case EV_CLOSURE_PERSIST:
+ case EV_CLOSURE_EVENT_PERSIST:
event_persist_closure(base, ev);
break;
- default:
- case EV_CLOSURE_NONE:
+ case EV_CLOSURE_EVENT:
EVBASE_RELEASE_LOCK(base, th_base_lock);
(*ev->ev_callback)(
ev->ev_fd, ev->ev_res, ev->ev_arg);
break;
+ default:
+ EVUTIL_ASSERT(0);
}
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
event_process_active(struct event_base *base)
{
/* Caller must hold th_base_lock */
- struct event_list *activeq = NULL;
+ struct evcallback_list *activeq = NULL;
int i, c = 0;
const struct timeval *endtime;
struct timeval tv;
"EV_READ or EV_WRITE", __func__);
return -1;
}
- ev->ev_closure = EV_CLOSURE_SIGNAL;
+ ev->ev_closure = EV_CLOSURE_EVENT_SIGNAL;
} else {
if (events & EV_PERSIST) {
evutil_timerclear(&ev->ev_io_timeout);
- ev->ev_closure = EV_CLOSURE_PERSIST;
+ ev->ev_closure = EV_CLOSURE_EVENT_PERSIST;
} else {
- ev->ev_closure = EV_CLOSURE_NONE;
+ ev->ev_closure = EV_CLOSURE_EVENT;
}
}
{
struct event *ev = NULL;
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
- if (EVBASE_IN_THREAD(base))
- ev = base->current_event;
+ if (EVBASE_IN_THREAD(base)) {
+ struct event_callback *evcb = base->current_event;
+ if (evcb && (evcb->evcb_flags & EVLIST_INIT))
+ ev = event_callback_to_event(evcb);
+ }
EVBASE_RELEASE_LOCK(base, th_base_lock);
return ev;
}
* until the callback is done before we mess with the event, or else
* we can race on ev_ncalls and ev_pncalls below. */
#ifndef EVENT__DISABLE_THREAD_SUPPORT
- if (base->current_event == ev && (ev->ev_events & EV_SIGNAL)
+ if (base->current_event == event_to_event_callback(ev) &&
+ (ev->ev_events & EV_SIGNAL)
&& !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
*
* If tv_is_absolute, this was already set.
*/
- if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
+ if (ev->ev_closure == EV_CLOSURE_EVENT_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv;
#ifndef USE_REINSERT_TIMEOUT
}
}
- event_queue_remove_active(base, ev);
+ event_queue_remove_active(base, event_to_event_callback(ev));
}
gettime(base, &now);
* user-supplied argument. */
base = ev->ev_base;
#ifndef EVENT__DISABLE_THREAD_SUPPORT
- if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
+ if (base->current_event == event_to_event_callback(ev) &&
+ !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
}
if (ev->ev_flags & EVLIST_ACTIVE)
- event_queue_remove_active(base, ev);
+ event_queue_remove_active(base, event_to_event_callback(ev));
if (ev->ev_flags & EVLIST_INSERTED) {
event_queue_remove_inserted(base, ev);
if (ev->ev_events & EV_SIGNAL) {
#ifndef EVENT__DISABLE_THREAD_SUPPORT
- if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
+ if (base->current_event == event_to_event_callback(ev) &&
+ !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
ev->ev_pncalls = NULL;
}
- event_queue_insert_active(base, ev);
+ event_callback_activate_nolock_(base, event_to_event_callback(ev));
+}
+
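+/* Queue an arbitrary event_callback (not necessarily one embedded in a
+ * struct event) on its base's active queue, waking the event loop if it
+ * is running in another thread. The caller must hold th_base_lock. */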
+void
+event_callback_activate_nolock_(struct event_base *base,
+ struct event_callback *evcb)
+{
+ event_queue_insert_active(base, evcb);
if (EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
#error "Mismatch for value of EVLIST_INTERNAL"
#endif
/* These are a fancy way to spell
- if (~ev->ev_flags & EVLIST_INTERNAL)
+ if (~flags & EVLIST_INTERNAL)
base->event_count--/++;
*/
-#define DECR_EVENT_COUNT(base,ev) \
- ((base)->event_count -= (~((ev)->ev_flags >> 4) & 1))
-#define INCR_EVENT_COUNT(base, ev) \
- ((base)->event_count += (~((ev)->ev_flags >> 4) & 1))
+#define DECR_EVENT_COUNT(base,flags) \
+ ((base)->event_count -= (~((flags) >> 4) & 1))
+#define INCR_EVENT_COUNT(base,flags) \
+ ((base)->event_count += (~((flags) >> 4) & 1))
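+/* Note: these depend on EVLIST_INTERNAL being 0x10, so that
+ * ((flags) >> 4) & 1 isolates the internal bit; the #error check above
+ * enforces that assumption. Internal events never change event_count. */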
static void
event_queue_remove_inserted(struct event_base *base, struct event *ev)
ev, ev->ev_fd, EVLIST_INSERTED);
return;
}
- DECR_EVENT_COUNT(base, ev);
+ DECR_EVENT_COUNT(base, ev->ev_flags);
ev->ev_flags &= ~EVLIST_INSERTED;
}
static void
-event_queue_remove_active(struct event_base *base, struct event *ev)
+event_queue_remove_active(struct event_base *base, struct event_callback *evcb)
{
EVENT_BASE_ASSERT_LOCKED(base);
- if (EVUTIL_FAILURE_CHECK(!(ev->ev_flags & EVLIST_ACTIVE))) {
- event_errx(1, "%s: %p(fd %d) not on queue %x", __func__,
- ev, ev->ev_fd, EVLIST_ACTIVE);
+ if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE))) {
+ event_errx(1, "%s: %p not on queue %x", __func__,
+ evcb, EVLIST_ACTIVE);
return;
}
- DECR_EVENT_COUNT(base, ev);
- ev->ev_flags &= ~EVLIST_ACTIVE;
+ DECR_EVENT_COUNT(base, evcb->evcb_flags);
+ evcb->evcb_flags &= ~EVLIST_ACTIVE;
base->event_count_active--;
- TAILQ_REMOVE(&base->activequeues[ev->ev_pri],
- ev, ev_active_next);
+
+ TAILQ_REMOVE(&base->activequeues[evcb->evcb_pri],
+ evcb, evcb_active_next);
}
static void
event_queue_remove_timeout(struct event_base *base, struct event *ev)
ev, ev->ev_fd, EVLIST_TIMEOUT);
return;
}
- DECR_EVENT_COUNT(base, ev);
+ DECR_EVENT_COUNT(base, ev->ev_flags);
ev->ev_flags &= ~EVLIST_TIMEOUT;
if (is_common_timeout(&ev->ev_timeout, base)) {
return;
}
- INCR_EVENT_COUNT(base, ev);
+ INCR_EVENT_COUNT(base, ev->ev_flags);
ev->ev_flags |= EVLIST_INSERTED;
}
static void
-event_queue_insert_active(struct event_base *base, struct event *ev)
+event_queue_insert_active(struct event_base *base, struct event_callback *evcb)
{
EVENT_BASE_ASSERT_LOCKED(base);
- if (ev->ev_flags & EVLIST_ACTIVE) {
+ if (evcb->evcb_flags & EVLIST_ACTIVE) {
/* Double insertion is possible for active events */
return;
}
- INCR_EVENT_COUNT(base, ev);
- ev->ev_flags |= EVLIST_ACTIVE;
+ INCR_EVENT_COUNT(base, evcb->evcb_flags);
+ evcb->evcb_flags |= EVLIST_ACTIVE;
base->event_count_active++;
- TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
- ev,ev_active_next);
+ TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri],
+ evcb, evcb_active_next);
}
static void
return;
}
- INCR_EVENT_COUNT(base, ev);
+ INCR_EVENT_COUNT(base, ev->ev_flags);
ev->ev_flags |= EVLIST_TIMEOUT;
/* Finally, we deal with all the active events that we haven't touched
* yet. */
for (i = 0; i < base->nactivequeues; ++i) {
- TAILQ_FOREACH(ev, &base->activequeues[i], ev_active_next) {
- if (ev->ev_flags & (EVLIST_INSERTED|EVLIST_TIMEOUT)) {
- /* we already processed this one */
+ struct event_callback *evcb;
+ TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) {
+ if ((evcb->evcb_flags & (EVLIST_INIT|EVLIST_INSERTED|EVLIST_TIMEOUT)) != EVLIST_INIT) {
+ /* This isn't an event (EVLIST_INIT clear), or
+ * we already processed it (EVLIST_INSERTED or
+ * EVLIST_TIMEOUT set). */
continue;
}
+ ev = event_callback_to_event(evcb);
if ((r = fn(base, ev, arg)))
return r;
}
/* Check the active queues. */
for (i = 0; i < base->nactivequeues; ++i) {
- struct event *ev;
- EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event, ev_active_next);
- TAILQ_FOREACH(ev, &base->activequeues[i], ev_active_next) {
- EVUTIL_ASSERT(ev->ev_pri == i);
- EVUTIL_ASSERT(ev->ev_flags & EVLIST_ACTIVE);
+ struct event_callback *evcb;
+ EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event_callback, evcb_active_next);
+ TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) {
+ EVUTIL_ASSERT(evcb->evcb_flags & EVLIST_ACTIVE);
+ EVUTIL_ASSERT(evcb->evcb_pri == i);
}
}