]> granicus.if.org Git - libevent/commitdiff
Add "active later" event_callbacks to supersede deferred
authorNick Mathewson <nickm@torproject.org>
Fri, 6 Apr 2012 07:00:40 +0000 (03:00 -0400)
committerNick Mathewson <nickm@torproject.org>
Wed, 9 May 2012 16:05:53 +0000 (12:05 -0400)
An event or event callback can now be in an additional state: "active
later".  When an event is in this state, it will become active the
next time we run through the event loop.  This lets us do what we
wanted to with deferred callbacks: make a type of active thing that
avoids infinite circular regress in a way that starves other events or
exhausts the stack.  It improves on deferred callbacks by respecting
priorities, and by having a non-kludgy way to avoid event starvation.

event-internal.h
event.c
include/event2/event_struct.h
test/regress.c

index e4025cf62fc431b361717260d947aad33831929c..93b7c6264680588c458230563921f1c4fc7a6317 100644 (file)
@@ -225,6 +225,9 @@ struct event_base {
        struct evcallback_list *activequeues;
        /** The length of the activequeues array */
        int nactivequeues;
+       /** A list of event_callbacks that should become active the next time
+        * we process events, but not this time. */
+       struct evcallback_list active_later_queue;
 
        /* common timeout logic */
 
@@ -364,7 +367,17 @@ int evsig_restore_handler_(struct event_base *base, int evsignal);
 
 void event_active_nolock_(struct event *ev, int res, short count);
 void event_callback_activate_nolock_(struct event_base *, struct event_callback *);
-
+int event_callback_cancel_(struct event_base *base,
+    struct event_callback *evcb);
+
+void event_active_later_(struct event *ev, int res);
+void event_active_later_nolock_(struct event *ev, int res);
+void event_callback_activate_later_nolock_(struct event_base *base,
+    struct event_callback *evcb);
+int event_callback_cancel_nolock_(struct event_base *base,
+    struct event_callback *evcb);
+void event_callback_init_(struct event_base *base,
+    struct event_callback *cb);
 
 /* FIXME document. */
 void event_base_add_virtual_(struct event_base *base);
diff --git a/event.c b/event.c
index 3307db1b52ebf7bbfd34dc092b9ef3bc7b2dd128..d6bf420781a06dacdc34d7efea11e43572ad5554 100644 (file)
--- a/event.c
+++ b/event.c
@@ -136,11 +136,15 @@ static inline int event_add_internal(struct event *ev,
 static inline int event_del_internal(struct event *ev);
 
 static void    event_queue_insert_active(struct event_base *, struct event_callback *);
+static void    event_queue_insert_active_later(struct event_base *, struct event_callback *);
 static void    event_queue_insert_timeout(struct event_base *, struct event *);
 static void    event_queue_insert_inserted(struct event_base *, struct event *);
 static void    event_queue_remove_active(struct event_base *, struct event_callback *);
+static void    event_queue_remove_active_later(struct event_base *, struct event_callback *);
 static void    event_queue_remove_timeout(struct event_base *, struct event *);
 static void    event_queue_remove_inserted(struct event_base *, struct event *);
+static void event_queue_make_later_events_active(struct event_base *base);
+
 #ifdef USE_REINSERT_TIMEOUT
 /* This code seems buggy; only turn it on if we find out what the trouble is. */
 static void    event_queue_reinsert_timeout(struct event_base *,struct event *, int was_common, int is_common, int old_timeout_idx);
@@ -606,6 +610,8 @@ event_base_new_with_config(const struct event_config *cfg)
        base->defer_queue.notify_fn = notify_base_cbq_callback;
        base->defer_queue.notify_arg = base;
 
+       TAILQ_INIT(&base->active_later_queue);
+
        evmap_io_initmap_(&base->io);
        evmap_signal_initmap_(&base->sigmap);
        event_changelist_init_(&base->changelist);
@@ -800,11 +806,26 @@ event_base_free(struct event_base *base)
                                        ++n_deleted;
                                }
                        } else {
-                               event_queue_remove_active(base, evcb);
+                               event_callback_cancel_(base, evcb);
+                               ++n_deleted;
                        }
                        evcb = next;
                }
        }
+       {
+               struct event_callback *evcb;
+               while ((evcb = TAILQ_FIRST(&base->active_later_queue))) {
+                       if (evcb->evcb_flags & EVLIST_INIT) {
+                               ev = event_callback_to_event(evcb);
+                               event_del(ev);
+                               ++n_deleted;
+                       } else {
+                               event_callback_cancel_(base, evcb);
+                               ++n_deleted;
+                       }
+               }
+       }
+
 
        if (n_deleted)
                event_debug(("%s: %d events were still set in base",
@@ -1754,6 +1775,8 @@ event_base_loop(struct event_base *base, int flags)
                        goto done;
                }
 
+               event_queue_make_later_events_active(base);
+
                clear_time_cache(base);
 
                res = evsel->dispatch(base, tv_p);
@@ -2031,7 +2054,7 @@ event_pending(const struct event *ev, short event, struct timeval *tv)
 
        if (ev->ev_flags & EVLIST_INSERTED)
                flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL));
-       if (ev->ev_flags & EVLIST_ACTIVE)
+       if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))
                flags |= ev->ev_res;
        if (ev->ev_flags & EVLIST_TIMEOUT)
                flags |= EV_TIMEOUT;
@@ -2235,7 +2258,7 @@ event_add_internal(struct event *ev, const struct timeval *tv,
 #endif
 
        if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
-           !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
+           !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
                if (ev->ev_events & (EV_READ|EV_WRITE))
                        res = evmap_io_add_(base, ev->ev_fd, ev);
                else if (ev->ev_events & EV_SIGNAL)
@@ -2421,6 +2444,8 @@ event_del_internal(struct event *ev)
 
        if (ev->ev_flags & EVLIST_ACTIVE)
                event_queue_remove_active(base, event_to_event_callback(ev));
+       else if (ev->ev_flags & EVLIST_ACTIVE_LATER)
+               event_queue_remove_active_later(base, event_to_event_callback(ev));
 
        if (ev->ev_flags & EVLIST_INSERTED) {
                event_queue_remove_inserted(base, ev);
@@ -2470,19 +2495,26 @@ event_active_nolock_(struct event *ev, int res, short ncalls)
        event_debug(("event_active: %p (fd %d), res %d, callback %p",
                ev, (int)ev->ev_fd, (int)res, ev->ev_callback));
 
+       base = ev->ev_base;
+       EVENT_BASE_ASSERT_LOCKED(base);
 
-       /* We get different kinds of events, add them together */
-       if (ev->ev_flags & EVLIST_ACTIVE) {
+       switch ((ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
+       default:
+       case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER:
+               EVUTIL_ASSERT(0);
+               break;
+       case EVLIST_ACTIVE:
+               /* We get different kinds of events, add them together */
                ev->ev_res |= res;
                return;
+       case EVLIST_ACTIVE_LATER:
+               ev->ev_res |= res;
+               break;
+       case 0:
+               ev->ev_res = res;
+               break;
        }
 
-       base = ev->ev_base;
-
-       EVENT_BASE_ASSERT_LOCKED(base);
-
-       ev->ev_res = res;
-
        if (ev->ev_pri < base->event_running_priority)
                base->event_continue = 1;
 
@@ -2501,16 +2533,100 @@ event_active_nolock_(struct event *ev, int res, short ncalls)
        event_callback_activate_nolock_(base, event_to_event_callback(ev));
 }
 
+void
+event_active_later_(struct event *ev, int res)
+{
+       EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
+       event_active_later_nolock_(ev, res);
+       EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
+}
+
+void
+event_active_later_nolock_(struct event *ev, int res)
+{
+       struct event_base *base = ev->ev_base;
+       EVENT_BASE_ASSERT_LOCKED(base);
+
+       if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) {
+               /* We get different kinds of events, add them together */
+               ev->ev_res |= res;
+               return;
+       }
+
+       ev->ev_res = res;
+
+       event_callback_activate_later_nolock_(base, event_to_event_callback(ev));
+}
+
 void
 event_callback_activate_nolock_(struct event_base *base,
     struct event_callback *evcb)
 {
+       if (evcb->evcb_flags & EVLIST_ACTIVE_LATER)
+               event_queue_remove_active_later(base, evcb);
+
        event_queue_insert_active(base, evcb);
 
        if (EVBASE_NEED_NOTIFY(base))
                evthread_notify_base(base);
 }
 
+void
+event_callback_activate_later_nolock_(struct event_base *base,
+    struct event_callback *evcb)
+{
+       if (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))
+               return;
+
+       event_queue_insert_active_later(base, evcb);
+       if (EVBASE_NEED_NOTIFY(base))
+               evthread_notify_base(base);
+}
+
+void
+event_callback_init_(struct event_base *base,
+    struct event_callback *cb)
+{
+       memset(cb, 0, sizeof(*cb));
+       cb->evcb_pri = base->nactivequeues - 1;
+}
+
+int
+event_callback_cancel_(struct event_base *base,
+    struct event_callback *evcb)
+{
+       int r;
+       EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+       r = event_callback_cancel_nolock_(base, evcb);
+       EVBASE_RELEASE_LOCK(base, th_base_lock);
+       return r;
+}
+
+int
+event_callback_cancel_nolock_(struct event_base *base,
+    struct event_callback *evcb)
+{
+       if (evcb->evcb_flags & EVLIST_INIT)
+               return event_del_internal(event_callback_to_event(evcb));
+
+       switch ((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
+       default:
+       case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER:
+               EVUTIL_ASSERT(0);
+               break;
+       case EVLIST_ACTIVE:
+               /* We get different kinds of events, add them together */
+               event_queue_remove_active(base, evcb);
+               return 0;
+       case EVLIST_ACTIVE_LATER:
+               event_queue_remove_active_later(base, evcb);
+               break;
+       case 0:
+               break;
+       }
+       return 0;
+}
+
 void
 event_deferred_cb_init_(struct deferred_cb *cb, deferred_cb_fn fn, void *arg)
 {
@@ -2666,6 +2782,21 @@ event_queue_remove_active(struct event_base *base, struct event_callback *evcb)
            evcb, evcb_active_next);
 }
 static void
+event_queue_remove_active_later(struct event_base *base, struct event_callback *evcb)
+{
+       EVENT_BASE_ASSERT_LOCKED(base);
+       if (EVUTIL_FAILURE_CHECK(!(evcb->evcb_flags & EVLIST_ACTIVE_LATER))) {
+               event_errx(1, "%s: %p not on queue %x", __func__,
+                          evcb, EVLIST_ACTIVE_LATER);
+               return;
+       }
+       DECR_EVENT_COUNT(base, evcb->evcb_flags);
+       evcb->evcb_flags &= ~EVLIST_ACTIVE_LATER;
+       base->event_count_active--;
+
+       TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next);
+}
+static void
 event_queue_remove_timeout(struct event_base *base, struct event *ev)
 {
        EVENT_BASE_ASSERT_LOCKED(base);
@@ -2794,6 +2925,21 @@ event_queue_insert_active(struct event_base *base, struct event_callback *evcb)
            evcb, evcb_active_next);
 }
 
+static void
+event_queue_insert_active_later(struct event_base *base, struct event_callback *evcb)
+{
+       EVENT_BASE_ASSERT_LOCKED(base);
+       if (evcb->evcb_flags & (EVLIST_ACTIVE_LATER|EVLIST_ACTIVE)) {
+               /* Double insertion is possible */
+               return;
+       }
+
+       INCR_EVENT_COUNT(base, evcb->evcb_flags);
+       evcb->evcb_flags |= EVLIST_ACTIVE_LATER;
+       base->event_count_active++;
+       TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next);
+}
+
 static void
 event_queue_insert_timeout(struct event_base *base, struct event *ev)
 {
@@ -2818,6 +2964,19 @@ event_queue_insert_timeout(struct event_base *base, struct event *ev)
        }
 }
 
+static void
+event_queue_make_later_events_active(struct event_base *base)
+{
+       struct event_callback *evcb;
+       EVENT_BASE_ASSERT_LOCKED(base);
+
+       while ((evcb = TAILQ_FIRST(&base->active_later_queue))) {
+               TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next);
+               evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE;
+               TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next);
+       }
+}
+
 /* Functions for debugging */
 
 const char *
@@ -3137,16 +3296,17 @@ dump_active_event_fn(struct event_base *base, struct event *e, void *arg)
        const char *gloss = (e->ev_events & EV_SIGNAL) ?
            "sig" : "fd ";
 
-       if (! (e->ev_flags & EVLIST_ACTIVE))
+       if (! (e->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)))
                return 0;
 
-       fprintf(output, "  %p [%s %ld, priority=%d]%s%s%s%s active%s\n",
+       fprintf(output, "  %p [%s %ld, priority=%d]%s%s%s%s active%s%s\n",
            (void*)e, gloss, (long)e->ev_fd, e->ev_pri,
            (e->ev_res&EV_READ)?" Read":"",
            (e->ev_res&EV_WRITE)?" Write":"",
            (e->ev_res&EV_SIGNAL)?" Signal":"",
            (e->ev_res&EV_TIMEOUT)?" Timeout":"",
-           (e->ev_flags&EVLIST_INTERNAL)?" [Internal]":"");
+           (e->ev_flags&EVLIST_INTERNAL)?" [Internal]":"",
+           (e->ev_flags&EVLIST_ACTIVE_LATER)?" [NextTime]":"");
 
        return 0;
 }
@@ -3283,10 +3443,17 @@ event_base_assert_ok_(struct event_base *base)
                struct event_callback *evcb;
                EVUTIL_ASSERT_TAILQ_OK(&base->activequeues[i], event_callback, evcb_active_next);
                TAILQ_FOREACH(evcb, &base->activequeues[i], evcb_active_next) {
-                       EVUTIL_ASSERT(evcb->evcb_flags & EVLIST_ACTIVE);
+                       EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE);
                        EVUTIL_ASSERT(evcb->evcb_pri == i);
                }
        }
 
+       {
+               struct event_callback *evcb;
+               TAILQ_FOREACH(evcb, &base->active_later_queue, evcb_active_next) {
+                       EVUTIL_ASSERT((evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) == EVLIST_ACTIVE_LATER);
+               }
+       }
+
        EVBASE_RELEASE_LOCK(base, th_base_lock);
 }
index d2470f052ec80c98611205af6e8000322d86c76a..701e8eac5a052e9845a497a4736a189dbec46411 100644 (file)
@@ -54,14 +54,15 @@ extern "C" {
 /* For evkeyvalq */
 #include <event2/keyvalq_struct.h>
 
-#define EVLIST_TIMEOUT 0x01
-#define EVLIST_INSERTED        0x02
-#define EVLIST_SIGNAL  0x04
-#define EVLIST_ACTIVE  0x08
-#define EVLIST_INTERNAL        0x10
-#define EVLIST_INIT    0x80
-
-#define EVLIST_ALL      0x9f
+#define EVLIST_TIMEOUT     0x01
+#define EVLIST_INSERTED            0x02
+#define EVLIST_SIGNAL      0x04
+#define EVLIST_ACTIVE      0x08
+#define EVLIST_INTERNAL            0x10
+#define EVLIST_ACTIVE_LATER 0x20
+#define EVLIST_INIT        0x80
+
+#define EVLIST_ALL          0xbf
 
 /* Fix so that people don't have to run with <sys/queue.h> */
 #ifndef TAILQ_ENTRY
index 0c5faed76374dd6d60a4e254b251955297361beb..b736d89143ac94b16695c9ff2e9141f1372549d6 100644 (file)
@@ -1350,6 +1350,66 @@ end:
        ;
 }
 
+static int n_write_a_byte_cb=0;
+static int n_read_and_drain_cb=0;
+static int n_activate_other_event_cb=0;
+static void
+write_a_byte_cb(evutil_socket_t fd, short what, void *arg)
+{
+       char buf[] = "x";
+       write(fd, buf, 1);
+       ++n_write_a_byte_cb;
+}
+static void
+read_and_drain_cb(evutil_socket_t fd, short what, void *arg)
+{
+       char buf[128];
+       int n;
+       ++n_read_and_drain_cb;
+       while ((n = read(fd, buf, sizeof(buf))) > 0)
+               ;
+}
+
+static void
+activate_other_event_cb(evutil_socket_t fd, short what, void *other_)
+{
+       struct event *ev_activate = other_;
+       ++n_activate_other_event_cb;
+       event_active_later_(ev_activate, EV_READ);
+}
+
+static void
+test_active_later(void *ptr)
+{
+       struct basic_test_data *data = ptr;
+       struct event *ev1, *ev2;
+       struct event ev3, ev4;
+       struct timeval qsec = {0, 100000};
+       ev1 = event_new(data->base, data->pair[0], EV_READ|EV_PERSIST, read_and_drain_cb, NULL);
+       ev2 = event_new(data->base, data->pair[1], EV_WRITE|EV_PERSIST, write_a_byte_cb, NULL);
+       event_assign(&ev3, data->base, -1, 0, activate_other_event_cb, &ev4);
+       event_assign(&ev4, data->base, -1, 0, activate_other_event_cb, &ev3);
+       event_add(ev1, NULL);
+       event_add(ev2, NULL);
+       event_active_later_(&ev3, EV_READ);
+
+       event_base_loopexit(data->base, &qsec);
+
+       event_base_loop(data->base, 0);
+
+       TT_BLATHER(("%d write calls, %d read calls, %d activate-other calls.",
+               n_write_a_byte_cb, n_read_and_drain_cb, n_activate_other_event_cb));
+       event_del(&ev3);
+       event_del(&ev4);
+
+       tt_int_op(n_write_a_byte_cb, ==, n_activate_other_event_cb);
+       tt_int_op(n_write_a_byte_cb, >, 100);
+       tt_int_op(n_read_and_drain_cb, >, 100);
+       tt_int_op(n_activate_other_event_cb, >, 100);
+end:
+       ;
+}
+
 static void
 test_event_base_new(void *ptr)
 {
@@ -2468,7 +2528,9 @@ struct testcase_t main_testcases[] = {
 
        BASIC(bad_assign, TT_FORK|TT_NEED_BASE|TT_NO_LOGS),
        BASIC(bad_reentrant, TT_FORK|TT_NEED_BASE|TT_NO_LOGS),
+       BASIC(active_later, TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR),
 
+       /* These are still using the old API */
        LEGACY(persistent_timeout, TT_FORK|TT_NEED_BASE),
        { "persistent_timeout_jump", test_persistent_timeout_jump, TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
        { "persistent_active_timeout", test_persistent_active_timeout,