static inline int event_del_internal(struct event *ev);
static void event_queue_insert_active(struct event_base *, struct event_callback *);
+static void event_queue_insert_active_later(struct event_base *, struct event_callback *);
static void event_queue_insert_timeout(struct event_base *, struct event *);
static void event_queue_insert_inserted(struct event_base *, struct event *);
static void event_queue_remove_active(struct event_base *, struct event_callback *);
+static void event_queue_remove_active_later(struct event_base *, struct event_callback *);
static void event_queue_remove_timeout(struct event_base *, struct event *);
static void event_queue_remove_inserted(struct event_base *, struct event *);
+static void event_queue_make_later_events_active(struct event_base *base);
+
#ifdef USE_REINSERT_TIMEOUT
/* This code seems buggy; only turn it on if we find out what the trouble is. */
static void event_queue_reinsert_timeout(struct event_base *,struct event *, int was_common, int is_common, int old_timeout_idx);
base->defer_queue.notify_fn = notify_base_cbq_callback;
base->defer_queue.notify_arg = base;
+ TAILQ_INIT(&base->active_later_queue);
+
evmap_io_initmap_(&base->io);
evmap_signal_initmap_(&base->sigmap);
event_changelist_init_(&base->changelist);
++n_deleted;
}
} else {
- event_queue_remove_active(base, evcb);
+ event_callback_cancel_(base, evcb);
+ ++n_deleted;
}
evcb = next;
}
}
+ {
+ struct event_callback *evcb;
+ while ((evcb = TAILQ_FIRST(&base->active_later_queue))) {
+ if (evcb->evcb_flags & EVLIST_INIT) {
+ ev = event_callback_to_event(evcb);
+ event_del(ev);
+ ++n_deleted;
+ } else {
+ event_callback_cancel_(base, evcb);
+ ++n_deleted;
+ }
+ }
+ }
+
if (n_deleted)
event_debug(("%s: %d events were still set in base",
goto done;
}
+ event_queue_make_later_events_active(base);
+
clear_time_cache(base);
res = evsel->dispatch(base, tv_p);
if (ev->ev_flags & EVLIST_INSERTED)
flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL));
- if (ev->ev_flags & EVLIST_ACTIVE)
+ if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))
flags |= ev->ev_res;
if (ev->ev_flags & EVLIST_TIMEOUT)
flags |= EV_TIMEOUT;
#endif
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
- !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
+ !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
if (ev->ev_events & (EV_READ|EV_WRITE))
res = evmap_io_add_(base, ev->ev_fd, ev);
else if (ev->ev_events & EV_SIGNAL)
if (ev->ev_flags & EVLIST_ACTIVE)
event_queue_remove_active(base, event_to_event_callback(ev));
+ else if (ev->ev_flags & EVLIST_ACTIVE_LATER)
+ event_queue_remove_active_later(base, event_to_event_callback(ev));
if (ev->ev_flags & EVLIST_INSERTED) {
event_queue_remove_inserted(base, ev);
event_debug(("event_active: %p (fd %d), res %d, callback %p",
ev, (int)ev->ev_fd, (int)res, ev->ev_callback));
+ base = ev->ev_base;
+ EVENT_BASE_ASSERT_LOCKED(base);
- /* We get different kinds of events, add them together */
- if (ev->ev_flags & EVLIST_ACTIVE) {
+ switch ((ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
+ default:
+ case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER:
+ EVUTIL_ASSERT(0);
+ break;
+ case EVLIST_ACTIVE:
+ /* We get different kinds of events, add them together */
ev->ev_res |= res;
return;
+ case EVLIST_ACTIVE_LATER:
+ ev->ev_res |= res;
+ break;
+ case 0:
+ ev->ev_res = res;
+ break;
}
- base = ev->ev_base;
-
- EVENT_BASE_ASSERT_LOCKED(base);
-
- ev->ev_res = res;
-
if (ev->ev_pri < base->event_running_priority)
base->event_continue = 1;
event_callback_activate_nolock_(base, event_to_event_callback(ev));
}
+void
+event_active_later_(struct event *ev, int res)
+{
+ EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
+ event_active_later_nolock_(ev, res);
+ EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
+}
+
+void
+event_active_later_nolock_(struct event *ev, int res)
+{
+ struct event_base *base = ev->ev_base;
+ EVENT_BASE_ASSERT_LOCKED(base);
+
+ if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) {
+ /* We get different kinds of events, add them together */
+ ev->ev_res |= res;
+ return;
+ }
+
+ ev->ev_res = res;
+
+ event_callback_activate_later_nolock_(base, event_to_event_callback(ev));
+}
+
void
event_callback_activate_nolock_(struct event_base *base,
struct event_callback *evcb)
{
+ if (evcb->evcb_flags & EVLIST_ACTIVE_LATER)
+ event_queue_remove_active_later(base, evcb);
+
event_queue_insert_active(base, evcb);
if (EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
}
+void
+event_callback_activate_later_nolock_(struct event_base *base,
+ struct event_callback *evcb)
+{
+ if (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))
+ return;
+
+ event_queue_insert_active_later(base, evcb);
+ if (EVBASE_NEED_NOTIFY(base))
+ evthread_notify_base(base);
+}
+
+void
+event_callback_init_(struct event_base *base,
+ struct event_callback *cb)
+{
+ memset(cb, 0, sizeof(*cb));
+ cb->evcb_pri = base->nactivequeues - 1;
+}
+
+int
+event_callback_cancel_(struct event_base *base,
+ struct event_callback *evcb)
+{
+ int r;
+ EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+ r = event_callback_cancel_nolock_(base, evcb);
+ EVBASE_RELEASE_LOCK(base, th_base_lock);
+ return r;
+}
+
+int
+event_callback_cancel_nolock_(struct event_base *base,
+ struct event_callback *evcb)
+{
+ if (evcb->evcb_flags & EVLIST_INIT)
+ return event_del_internal(event_callback_to_event(evcb));
+
+ switch ((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
+ default:
+ case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER:
+ EVUTIL_ASSERT(0);
+ break;
+ case EVLIST_ACTIVE:
+ /* We get different kinds of events, add them together */
+ event_queue_remove_active(base, evcb);
+ return 0;
+ case EVLIST_ACTIVE_LATER:
+ event_queue_remove_active_later(base, evcb);
+ break;
+ case 0:
+ break;
+ }
+ return 0;
+}
+
void
event_deferred_cb_init_(struct deferred_cb *cb, deferred_cb_fn fn, void *arg)
{
evcb, evcb_active_next);
}
static void
+event_queue_remove_active_later(struct event_base *base, struct event_callback *evcb)
+{
+ EVENT_BASE_ASSERT_LOCKED(base);
+ if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE_LATER))) {
+ event_errx(1, "%s: %p not on queue %x", __func__,
+ evcb, EVLIST_ACTIVE_LATER);
+ return;
+ }
+ DECR_EVENT_COUNT(base, evcb->evcb_flags);
+ evcb->evcb_flags &= ~EVLIST_ACTIVE_LATER;
+ base->event_count_active--;
+
+ TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next);
+}
+static void
event_queue_remove_timeout(struct event_base *base, struct event *ev)
{
EVENT_BASE_ASSERT_LOCKED(base);
evcb, evcb_active_next);
}
+static void
+event_queue_insert_active_later(struct event_base *base, struct event_callback *evcb)
+{
+ EVENT_BASE_ASSERT_LOCKED(base);
+ if (evcb->evcb_flags & (EVLIST_ACTIVE_LATER|EVLIST_ACTIVE)) {
+ /* Double insertion is possible */
+ return;
+ }
+
+ INCR_EVENT_COUNT(base, evcb->evcb_flags);
+ evcb->evcb_flags |= EVLIST_ACTIVE_LATER;
+ base->event_count_active++;
+ TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next);
+}
+
static void
event_queue_insert_timeout(struct event_base *base, struct event *ev)
{
}
}
+static void
+event_queue_make_later_events_active(struct event_base *base)
+{
+ struct event_callback *evcb;
+ EVENT_BASE_ASSERT_LOCKED(base);
+
+ while ((evcb = TAILQ_FIRST(&base->active_later_queue))) {
+ TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next);
+ evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE;
+ TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next);
+ }
+}
+
/* Functions for debugging */
const char *
const char *gloss = (e->ev_events & EV_SIGNAL) ?
"sig" : "fd ";
- if (! (e->ev_flags & EVLIST_ACTIVE))
+ if (! (e->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)))
return 0;
- fprintf(output, " %p [%s %ld, priority=%d]%s%s%s%s active%s\n",
+ fprintf(output, " %p [%s %ld, priority=%d]%s%s%s%s active%s%s\n",
(void*)e, gloss, (long)e->ev_fd, e->ev_pri,
(e->ev_res&EV_READ)?" Read":"",
(e->ev_res&EV_WRITE)?" Write":"",
(e->ev_res&EV_SIGNAL)?" Signal":"",
(e->ev_res&EV_TIMEOUT)?" Timeout":"",
- (e->ev_flags&EVLIST_INTERNAL)?" [Internal]":"");
+ (e->ev_flags&EVLIST_INTERNAL)?" [Internal]":"",
+ (e->ev_flags&EVLIST_ACTIVE_LATER)?" [NextTime]":"");
return 0;
}
struct event_callback *evcb;
EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event_callback, evcb_active_next);
TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) {
- EVUTIL_ASSERT(evcb->evcb_flags & EVLIST_ACTIVE);
+ EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE);
EVUTIL_ASSERT(evcb->evcb_pri == i);
}
}
+ {
+ struct event_callback *evcb;
+ TAILQ_FOREACH(evcb, &base->active_later_queue, evcb_active_next) {
+ EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE_LATER);
+ }
+ }
+
EVBASE_RELEASE_LOCK(base, th_base_lock);
}
;
}
+static int n_write_a_byte_cb=0;
+static int n_read_and_drain_cb=0;
+static int n_activate_other_event_cb=0;
+static void
+write_a_byte_cb(evutil_socket_t fd, short what, void *arg)
+{
+ char buf[] = "x";
+ write(fd, buf, 1);
+ ++n_write_a_byte_cb;
+}
+static void
+read_and_drain_cb(evutil_socket_t fd, short what, void *arg)
+{
+ char buf[128];
+ int n;
+ ++n_read_and_drain_cb;
+ while ((n = read(fd, buf, sizeof(buf))) > 0)
+ ;
+}
+
+static void
+activate_other_event_cb(evutil_socket_t fd, short what, void *other_)
+{
+ struct event *ev_activate = other_;
+ ++n_activate_other_event_cb;
+ event_active_later_(ev_activate, EV_READ);
+}
+
+static void
+test_active_later(void *ptr)
+{
+ struct basic_test_data *data = ptr;
+ struct event *ev1, *ev2;
+ struct event ev3, ev4;
+ struct timeval qsec = {0, 100000};
+ ev1 = event_new(data->base, data->pair[0], EV_READ|EV_PERSIST, read_and_drain_cb, NULL);
+ ev2 = event_new(data->base, data->pair[1], EV_WRITE|EV_PERSIST, write_a_byte_cb, NULL);
+ event_assign(&ev3, data->base, -1, 0, activate_other_event_cb, &ev4);
+ event_assign(&ev4, data->base, -1, 0, activate_other_event_cb, &ev3);
+ event_add(ev1, NULL);
+ event_add(ev2, NULL);
+ event_active_later_(&ev3, EV_READ);
+
+ event_base_loopexit(data->base, &qsec);
+
+ event_base_loop(data->base, 0);
+
+ TT_BLATHER(("%d write calls, %d read calls, %d activate-other calls.",
+ n_write_a_byte_cb, n_read_and_drain_cb, n_activate_other_event_cb));
+ event_del(&ev3);
+ event_del(&ev4);
+
+ tt_int_op(n_write_a_byte_cb, ==, n_activate_other_event_cb);
+ tt_int_op(n_write_a_byte_cb, >, 100);
+ tt_int_op(n_read_and_drain_cb, >, 100);
+ tt_int_op(n_activate_other_event_cb, >, 100);
+end:
+ ;
+}
+
static void
test_event_base_new(void *ptr)
{
BASIC(bad_assign, TT_FORK|TT_NEED_BASE|TT_NO_LOGS),
BASIC(bad_reentrant, TT_FORK|TT_NEED_BASE|TT_NO_LOGS),
+ BASIC(active_later, TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR),
+ /* These are still using the old API */
LEGACY(persistent_timeout, TT_FORK|TT_NEED_BASE),
{ "persistent_timeout_jump", test_persistent_timeout_jump, TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
{ "persistent_active_timeout", test_persistent_active_timeout,