if (dent) { \
dent->added = 1; \
} else { \
- event_errx(_EVENT_ERR_ABORT, \
+ event_errx(EVENT_ERR_ABORT_, \
"%s: noting an add on a non-setup event %p" \
- " (events: 0x%x, fd: %d, flags: 0x%x)", \
+ " (events: 0x%x, fd: "EV_SOCK_FMT \
+ ", flags: 0x%x)", \
__func__, (ev), (ev)->ev_events, \
- (ev)->ev_fd, (ev)->ev_flags); \
+ EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags); \
} \
- EVLOCK_UNLOCK(_event_debug_map_lock, 0); \
+ EVLOCK_UNLOCK(event_debug_map_lock_, 0); \
} \
event_debug_mode_too_late = 1; \
} while (0)
if (dent) { \
dent->added = 0; \
} else { \
- event_errx(_EVENT_ERR_ABORT, \
+ event_errx(EVENT_ERR_ABORT_, \
"%s: noting a del on a non-setup event %p" \
- " (events: 0x%x, fd: %d, flags: 0x%x)", \
+ " (events: 0x%x, fd: "EV_SOCK_FMT \
+ ", flags: 0x%x)", \
__func__, (ev), (ev)->ev_events, \
- (ev)->ev_fd, (ev)->ev_flags); \
+ EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags); \
} \
- EVLOCK_UNLOCK(_event_debug_map_lock, 0); \
+ EVLOCK_UNLOCK(event_debug_map_lock_, 0); \
} \
event_debug_mode_too_late = 1; \
} while (0)
/* Macro: assert that ev is setup (i.e., okay to add or inspect) */
-#define _event_debug_assert_is_setup(ev) do { \
- if (_event_debug_mode_on) { \
+#define event_debug_assert_is_setup_(ev) do { \
+ if (event_debug_mode_on_) { \
struct event_debug_entry *dent,find; \
find.ptr = (ev); \
- EVLOCK_LOCK(_event_debug_map_lock, 0); \
+ EVLOCK_LOCK(event_debug_map_lock_, 0); \
dent = HT_FIND(event_debug_map, &global_debug_map, &find); \
if (!dent) { \
- event_errx(_EVENT_ERR_ABORT, \
+ event_errx(EVENT_ERR_ABORT_, \
"%s called on a non-initialized event %p" \
- " (events: 0x%x, fd: %d, flags: 0x%x)", \
+ " (events: 0x%x, fd: "EV_SOCK_FMT\
+ ", flags: 0x%x)", \
__func__, (ev), (ev)->ev_events, \
- (ev)->ev_fd, (ev)->ev_flags); \
+ EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags); \
} \
- EVLOCK_UNLOCK(_event_debug_map_lock, 0); \
+ EVLOCK_UNLOCK(event_debug_map_lock_, 0); \
} \
} while (0)
/* Macro: assert that ev is not added (i.e., okay to tear down or set
* up again) */
-#define _event_debug_assert_not_added(ev) do { \
- if (_event_debug_mode_on) { \
+#define event_debug_assert_not_added_(ev) do { \
+ if (event_debug_mode_on_) { \
struct event_debug_entry *dent,find; \
find.ptr = (ev); \
- EVLOCK_LOCK(_event_debug_map_lock, 0); \
+ EVLOCK_LOCK(event_debug_map_lock_, 0); \
dent = HT_FIND(event_debug_map, &global_debug_map, &find); \
if (dent && dent->added) { \
- event_errx(_EVENT_ERR_ABORT, \
+ event_errx(EVENT_ERR_ABORT_, \
"%s called on an already added event %p" \
- " (events: 0x%x, fd: %d, flags: 0x%x)", \
+ " (events: 0x%x, fd: "EV_SOCK_FMT", " \
+ "flags: 0x%x)", \
__func__, (ev), (ev)->ev_events, \
- (ev)->ev_fd, (ev)->ev_flags); \
+ EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags); \
} \
- EVLOCK_UNLOCK(_event_debug_map_lock, 0); \
+ EVLOCK_UNLOCK(event_debug_map_lock_, 0); \
} \
} while (0)
#else
int notify = 0;
EVENT_BASE_ASSERT_LOCKED(base);
- _event_debug_assert_is_setup(ev);
+ event_debug_assert_is_setup_(ev);
event_debug((
- "event_add: event: %p (fd %d), %s%s%scall %p",
+ "event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%scall %p",
ev,
- (int)ev->ev_fd,
+ EV_SOCK_ARG(ev->ev_fd),
ev->ev_events & EV_READ ? "EV_READ " : " ",
ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
tv ? "EV_TIMEOUT " : " ",
{
struct event_base *base;
- event_debug(("event_active: %p (fd %d), res %d, callback %p",
- ev, (int)ev->ev_fd, (int)res, ev->ev_callback));
+ event_debug(("event_active: %p (fd "EV_SOCK_FMT"), res %d, callback %p",
+ ev, EV_SOCK_ARG(ev->ev_fd), (int)res, ev->ev_callback));
+ base = ev->ev_base;
+ EVENT_BASE_ASSERT_LOCKED(base);
- /* We get different kinds of events, add them together */
- if (ev->ev_flags & EVLIST_ACTIVE) {
+ switch ((ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
+ default:
+ case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER:
+ EVUTIL_ASSERT(0);
+ break;
+ case EVLIST_ACTIVE:
+ /* We get different kinds of events, add them together */
ev->ev_res |= res;
return;
+ case EVLIST_ACTIVE_LATER:
+ ev->ev_res |= res;
+ break;
+ case 0:
+ ev->ev_res = res;
+ break;
}
- base = ev->ev_base;
-
- EVENT_BASE_ASSERT_LOCKED(base);
-
- ev->ev_res = res;
-
if (ev->ev_pri < base->event_running_priority)
base->event_continue = 1;
break;
/* delete this event from the I/O queues */
- event_del_internal(ev);
+ event_del_nolock_(ev);
+
+ event_debug(("timeout_process: event: %p, call %p",
+ ev, ev->ev_callback));
+ event_active_nolock_(ev, EV_TIMEOUT, 1);
+ }
+}
- event_debug(("timeout_process: call %p",
- ev->ev_callback));
- event_active_nolock(ev, EV_TIMEOUT, 1);
+#if (EVLIST_INTERNAL >> 4) != 1
+#error "Mismatch for value of EVLIST_INTERNAL"
+#endif
+/* These are a fancy way to spell
+ if (flags & EVLIST_INTERNAL)
+ base->event_count--/++;
+*/
+#define DECR_EVENT_COUNT(base,flags) \
+ ((base)->event_count -= (~((flags) >> 4) & 1))
+#define INCR_EVENT_COUNT(base,flags) \
+ ((base)->event_count += (~((flags) >> 4) & 1))
+
+static void
+event_queue_remove_inserted(struct event_base *base, struct event *ev)
+{
+ EVENT_BASE_ASSERT_LOCKED(base);
+ if (EVUTIL_FAILURE_CHECK(!(ev->ev_flags & EVLIST_INSERTED))) {
+ event_errx(1, "%s: %p(fd %d) not on queue %x", __func__,
+ ev, ev->ev_fd, EVLIST_INSERTED);
+ return;
}
+ DECR_EVENT_COUNT(base, ev->ev_flags);
+ ev->ev_flags &= ~EVLIST_INSERTED;
}
+static void
+event_queue_remove_active(struct event_base *base, struct event_callback *evcb)
+{
+ EVENT_BASE_ASSERT_LOCKED(base);
+ if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE))) {
+ event_errx(1, "%s: %p not on queue %x", __func__,
+ evcb, EVLIST_ACTIVE);
+ return;
+ }
+ DECR_EVENT_COUNT(base, evcb->evcb_flags);
+ evcb->evcb_flags &= ~EVLIST_ACTIVE;
+ base->event_count_active--;
-/* Remove 'ev' from 'queue' (EVLIST_...) in base. */
+ TAILQ_REMOVE(&base->activequeues[evcb->evcb_pri],
+ evcb, evcb_active_next);
+}
static void
-event_queue_remove(struct event_base *base, struct event *ev, int queue)
+event_queue_remove_active_later(struct event_base *base, struct event_callback *evcb)
{
EVENT_BASE_ASSERT_LOCKED(base);
+ if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE_LATER))) {
+ event_errx(1, "%s: %p not on queue %x", __func__,
+ evcb, EVLIST_ACTIVE_LATER);
+ return;
+ }
+ DECR_EVENT_COUNT(base, evcb->evcb_flags);
+ evcb->evcb_flags &= ~EVLIST_ACTIVE_LATER;
+ base->event_count_active--;
- if (!(ev->ev_flags & queue)) {
+ TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next);
+}
+static void
+event_queue_remove_timeout(struct event_base *base, struct event *ev)
+{
+ EVENT_BASE_ASSERT_LOCKED(base);
+ if (EVUTIL_FAILURE_CHECK(!(ev->ev_flags & EVLIST_TIMEOUT))) {
- event_errx(1, "%s: %p(fd %d) not on queue %x", __func__,
- ev, ev->ev_fd, EVLIST_TIMEOUT);
+ event_errx(1, "%s: %p(fd "EV_SOCK_FMT") not on queue %x", __func__,
- ev, EV_SOCK_ARG(ev->ev_fd), queue);
++ ev, EV_SOCK_ARG(ev->ev_fd), EVLIST_TIMEOUT);
return;
}
+ DECR_EVENT_COUNT(base, ev->ev_flags);
+ ev->ev_flags &= ~EVLIST_TIMEOUT;
- if (~ev->ev_flags & EVLIST_INTERNAL)
- base->event_count--;
+ if (is_common_timeout(&ev->ev_timeout, base)) {
+ struct common_timeout_list *ctl =
+ get_common_timeout_list(base, &ev->ev_timeout);
+ TAILQ_REMOVE(&ctl->events, ev,
+ ev_timeout_pos.ev_next_with_common_timeout);
+ } else {
+ min_heap_erase_(&base->timeheap, ev);
+ }
+}
- ev->ev_flags &= ~queue;
- switch (queue) {
- case EVLIST_INSERTED:
- TAILQ_REMOVE(&base->eventqueue, ev, ev_next);
+#ifdef USE_REINSERT_TIMEOUT
+/* Remove and reinsert 'ev' into the timeout queue. */
+static void
+event_queue_reinsert_timeout(struct event_base *base, struct event *ev,
+ int was_common, int is_common, int old_timeout_idx)
+{
+ struct common_timeout_list *ctl;
+ if (!(ev->ev_flags & EVLIST_TIMEOUT)) {
+ event_queue_insert_timeout(base, ev);
+ return;
+ }
+
+ switch ((was_common<<1) | is_common) {
+ case 3: /* Changing from one common timeout to another */
+ ctl = base->common_timeout_queues[old_timeout_idx];
+ TAILQ_REMOVE(&ctl->events, ev,
+ ev_timeout_pos.ev_next_with_common_timeout);
+ ctl = get_common_timeout_list(base, &ev->ev_timeout);
+ insert_common_timeout_inorder(ctl, ev);
break;
- case EVLIST_ACTIVE:
- base->event_count_active--;
- TAILQ_REMOVE(&base->activequeues[ev->ev_pri],
- ev, ev_active_next);
+ case 2: /* Was common; is no longer common */
+ ctl = base->common_timeout_queues[old_timeout_idx];
+ TAILQ_REMOVE(&ctl->events, ev,
+ ev_timeout_pos.ev_next_with_common_timeout);
+ min_heap_push_(&base->timeheap, ev);
break;
- case EVLIST_TIMEOUT:
- if (is_common_timeout(&ev->ev_timeout, base)) {
- struct common_timeout_list *ctl =
- get_common_timeout_list(base, &ev->ev_timeout);
- TAILQ_REMOVE(&ctl->events, ev,
- ev_timeout_pos.ev_next_with_common_timeout);
- } else {
- min_heap_erase(&base->timeheap, ev);
- }
+ case 1: /* Wasn't common; has become common. */
+ min_heap_erase_(&base->timeheap, ev);
+ ctl = get_common_timeout_list(base, &ev->ev_timeout);
+ insert_common_timeout_inorder(ctl, ev);
+ break;
+ case 0: /* was in heap; is still on heap. */
+ min_heap_adjust_(&base->timeheap, ev);
break;
default:
- event_errx(1, "%s: unknown queue %x", __func__, queue);
+ EVUTIL_ASSERT(0); /* unreachable */
+ break;
}
}
+#endif
/* Add 'ev' to the common timeout list in 'ev'. */
static void
}
static void
-event_queue_insert(struct event_base *base, struct event *ev, int queue)
+event_queue_insert_inserted(struct event_base *base, struct event *ev)
+{
+ EVENT_BASE_ASSERT_LOCKED(base);
+
+ if (EVUTIL_FAILURE_CHECK(ev->ev_flags & EVLIST_INSERTED)) {
- event_errx(1, "%s: %p(fd %d) already inserted", __func__,
- ev, ev->ev_fd);
++ event_errx(1, "%s: %p(fd "EV_SOCK_FMT") already inserted", __func__,
++ ev, EV_SOCK_ARG(ev->ev_fd));
+ return;
+ }
+
+ INCR_EVENT_COUNT(base, ev->ev_flags);
+
+ ev->ev_flags |= EVLIST_INSERTED;
+}
+
+static void
+event_queue_insert_active(struct event_base *base, struct event_callback *evcb)
{
EVENT_BASE_ASSERT_LOCKED(base);
return;
}
- if (~ev->ev_flags & EVLIST_INTERNAL)
- base->event_count++;
+ INCR_EVENT_COUNT(base, evcb->evcb_flags);
+ evcb->evcb_flags |= EVLIST_ACTIVE_LATER;
+ base->event_count_active++;
+ EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
+ TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next);
+}
- ev->ev_flags |= queue;
- switch (queue) {
- case EVLIST_INSERTED:
- TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
- break;
- case EVLIST_ACTIVE:
- base->event_count_active++;
- TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
- ev,ev_active_next);
- break;
- case EVLIST_TIMEOUT: {
- if (is_common_timeout(&ev->ev_timeout, base)) {
- struct common_timeout_list *ctl =
- get_common_timeout_list(base, &ev->ev_timeout);
- insert_common_timeout_inorder(ctl, ev);
- } else
- min_heap_push(&base->timeheap, ev);
- break;
+static void
+event_queue_insert_timeout(struct event_base *base, struct event *ev)
+{
+ EVENT_BASE_ASSERT_LOCKED(base);
+
+ if (EVUTIL_FAILURE_CHECK(ev->ev_flags & EVLIST_TIMEOUT)) {
- event_errx(1, "%s: %p(fd %d) already on timeout", __func__,
- ev, ev->ev_fd);
++ event_errx(1, "%s: %p(fd "EV_SOCK_FMT") already on timeout", __func__,
++ ev, EV_SOCK_ARG(ev->ev_fd));
+ return;
}
- default:
- event_errx(1, "%s: unknown queue %x", __func__, queue);
+
+ INCR_EVENT_COUNT(base, ev->ev_flags);
+
+ ev->ev_flags |= EVLIST_TIMEOUT;
+
+ if (is_common_timeout(&ev->ev_timeout, base)) {
+ struct common_timeout_list *ctl =
+ get_common_timeout_list(base, &ev->ev_timeout);
+ insert_common_timeout_inorder(ctl, ev);
+ } else {
+ min_heap_push_(&base->timeheap, ev);
+ }
+}
+
+static void
+event_queue_make_later_events_active(struct event_base *base)
+{
+ struct event_callback *evcb;
+ EVENT_BASE_ASSERT_LOCKED(base);
+
+ while ((evcb = TAILQ_FIRST(&base->active_later_queue))) {
+ TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next);
+ evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE;
+ EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
+ TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next);
+ base->n_deferreds_queued += (evcb->evcb_closure == EV_CLOSURE_CB_SELF);
}
}
base->th_notify.ev_flags |= EVLIST_INTERNAL;
event_priority_set(&base->th_notify, 0);
- return event_add(&base->th_notify, NULL);
+ return event_add_nolock_(&base->th_notify, NULL, 0);
}
-void
-event_base_dump_events(struct event_base *base, FILE *output)
+int
+event_base_foreach_event_nolock_(struct event_base *base,
+ event_base_foreach_event_cb fn, void *arg)
{
- struct event *e;
- int i;
- fprintf(output, "Inserted events:\n");
- TAILQ_FOREACH(e, &base->eventqueue, ev_next) {
- fprintf(output, " %p [fd "EV_SOCK_FMT"]%s%s%s%s%s\n",
- (void*)e, EV_SOCK_ARG(e->ev_fd),
- (e->ev_events&EV_READ)?" Read":"",
- (e->ev_events&EV_WRITE)?" Write":"",
- (e->ev_events&EV_SIGNAL)?" Signal":"",
- (e->ev_events&EV_TIMEOUT)?" Timeout":"",
- (e->ev_events&EV_PERSIST)?" Persist":"");
+ int r, i;
+ unsigned u;
+ struct event *ev;
+
+ /* Start out with all the EVLIST_INSERTED events. */
+ if ((r = evmap_foreach_event_(base, fn, arg)))
+ return r;
+
+ /* Okay, now we deal with those events that have timeouts and are in
+ * the min-heap. */
+ for (u = 0; u < base->timeheap.n; ++u) {
+ ev = base->timeheap.p[u];
+ if (ev->ev_flags & EVLIST_INSERTED) {
+ /* we already processed this one */
+ continue;
+ }
+ if ((r = fn(base, ev, arg)))
+ return r;
+ }
+	/* Now for the events in one of the common timeout queues. */
+ for (i = 0; i < base->n_common_timeouts; ++i) {
+ struct common_timeout_list *ctl =
+ base->common_timeout_queues[i];
+ TAILQ_FOREACH(ev, &ctl->events,
+ ev_timeout_pos.ev_next_with_common_timeout) {
+ if (ev->ev_flags & EVLIST_INSERTED) {
+ /* we already processed this one */
+ continue;
+ }
+ if ((r = fn(base, ev, arg)))
+ return r;
+ }
}
+
+	/* Finally, we deal with all the active events that we haven't touched
+	 * yet. */
for (i = 0; i < base->nactivequeues; ++i) {
- if (TAILQ_EMPTY(&base->activequeues[i]))
- continue;
- fprintf(output, "Active events [priority %d]:\n", i);
- TAILQ_FOREACH(e, &base->eventqueue, ev_next) {
- fprintf(output, " %p [fd "EV_SOCK_FMT"]%s%s%s%s\n",
- (void*)e, EV_SOCK_ARG(e->ev_fd),
- (e->ev_res&EV_READ)?" Read active":"",
- (e->ev_res&EV_WRITE)?" Write active":"",
- (e->ev_res&EV_SIGNAL)?" Signal active":"",
- (e->ev_res&EV_TIMEOUT)?" Timeout active":"");
+ struct event_callback *evcb;
+ TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) {
+ if ((evcb->evcb_flags & (EVLIST_INIT|EVLIST_INSERTED|EVLIST_TIMEOUT)) != EVLIST_INIT) {
+				/* This isn't an event (evlist_init clear), or
+				 * we already processed it (inserted or
+				 * timeout set). */
+ continue;
+ }
+ ev = event_callback_to_event(evcb);
+ if ((r = fn(base, ev, arg)))
+ return r;
}
}
- fprintf(output, " %p [%s %ld]%s%s%s%s%s",
- (void*)e, gloss, (long)e->ev_fd,
+
+ return 0;
+}
+
+/* Helper for event_base_dump_events: called on each event in the event base;
+ * dumps only the inserted events. */
+static int
+dump_inserted_event_fn(const struct event_base *base, const struct event *e, void *arg)
+{
+ FILE *output = arg;
+ const char *gloss = (e->ev_events & EV_SIGNAL) ?
+ "sig" : "fd ";
+
+ if (! (e->ev_flags & (EVLIST_INSERTED|EVLIST_TIMEOUT)))
+ return 0;
+
- fprintf(output, " %p [%s %ld, priority=%d]%s%s%s%s active%s%s\n",
- (void*)e, gloss, (long)e->ev_fd, e->ev_pri,
++ fprintf(output, " %p [%s "EV_SOCK_FMT"]%s%s%s%s%s",
++ (void*)e, gloss, EV_SOCK_ARG(e->ev_fd),
+ (e->ev_events&EV_READ)?" Read":"",
+ (e->ev_events&EV_WRITE)?" Write":"",
+ (e->ev_events&EV_SIGNAL)?" Signal":"",
+ (e->ev_events&EV_PERSIST)?" Persist":"",
+ (e->ev_flags&EVLIST_INTERNAL)?" Internal":"");
+ if (e->ev_flags & EVLIST_TIMEOUT) {
+ struct timeval tv;
+ tv.tv_sec = e->ev_timeout.tv_sec;
+ tv.tv_usec = e->ev_timeout.tv_usec & MICROSECONDS_MASK;
+ evutil_timeradd(&tv, &base->tv_clock_diff, &tv);
+ fprintf(output, " Timeout=%ld.%06d",
+ (long)tv.tv_sec, (int)(tv.tv_usec & MICROSECONDS_MASK));
+ }
+ fputc('\n', output);
+
+ return 0;
+}
+
+/* Helper for event_base_dump_events: called on each event in the event base;
+ * dumps only the active events. */
+static int
+dump_active_event_fn(const struct event_base *base, const struct event *e, void *arg)
+{
+ FILE *output = arg;
+ const char *gloss = (e->ev_events & EV_SIGNAL) ?
+ "sig" : "fd ";
+
+ if (! (e->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)))
+ return 0;
+
++ fprintf(output, " %p [%s "EV_SOCK_FMT", priority=%d]%s%s%s%s active%s%s\n",
++ (void*)e, gloss, EV_SOCK_ARG(e->ev_fd), e->ev_pri,
+ (e->ev_res&EV_READ)?" Read":"",
+ (e->ev_res&EV_WRITE)?" Write":"",
+ (e->ev_res&EV_SIGNAL)?" Signal":"",
+ (e->ev_res&EV_TIMEOUT)?" Timeout":"",
+ (e->ev_flags&EVLIST_INTERNAL)?" [Internal]":"",
+ (e->ev_flags&EVLIST_ACTIVE_LATER)?" [NextTime]":"");
+
+ return 0;
+}
+
+int
+event_base_foreach_event(struct event_base *base,
+ event_base_foreach_event_cb fn, void *arg)
+{
+ int r;
+ if ((!fn) || (!base)) {
+ return -1;
+ }
+ EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+ r = event_base_foreach_event_nolock_(base, fn, arg);
+ EVBASE_RELEASE_LOCK(base, th_base_lock);
+ return r;
+}
+
+
+void
+event_base_dump_events(struct event_base *base, FILE *output)
+{
+ EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+ fprintf(output, "Inserted events:\n");
+ event_base_foreach_event_nolock_(base, dump_inserted_event_fn, output);
+
+ fprintf(output, "Active events:\n");
+ event_base_foreach_event_nolock_(base, dump_active_event_fn, output);
+ EVBASE_RELEASE_LOCK(base, th_base_lock);
}
void