]> granicus.if.org Git - libevent/commitdiff
Replace deferred_cbs with event_callback-based implementation.
authorNick Mathewson <nickm@torproject.org>
Fri, 6 Apr 2012 08:33:19 +0000 (04:33 -0400)
committerNick Mathewson <nickm@torproject.org>
Wed, 9 May 2012 16:05:53 +0000 (12:05 -0400)
12 files changed:
buffer.c
bufferevent-internal.h
bufferevent.c
defer-internal.h
evbuffer-internal.h
evdns.c
event-internal.h
event.c
http.c
include/event2/event_struct.h
test/regress_buffer.c
test/regress_thread.c

index b28c9361ba7b2fdd45840de4921029090ead0341..6853edf390287e150869e6c9c13747ec28bb75b3 100644 (file)
--- a/buffer.c
+++ b/buffer.c
@@ -404,7 +404,7 @@ evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base)
        EVBUFFER_LOCK(buffer);
        buffer->cb_queue = event_base_get_deferred_cb_queue_(base);
        buffer->deferred_cbs = 1;
-       event_deferred_cb_init_(&buffer->deferred,
+       event_deferred_cb_init_(base, &buffer->deferred,
            evbuffer_deferred_callback, buffer);
        EVBUFFER_UNLOCK(buffer);
        return 0;
@@ -509,13 +509,12 @@ evbuffer_invoke_callbacks_(struct evbuffer *buffer)
        }
 
        if (buffer->deferred_cbs) {
-               if (buffer->deferred.queued)
-                       return;
-               evbuffer_incref_and_lock_(buffer);
-               if (buffer->parent)
-                       bufferevent_incref_(buffer->parent);
+               if (event_deferred_cb_schedule_(buffer->cb_queue, &buffer->deferred)) {
+                       evbuffer_incref_and_lock_(buffer);
+                       if (buffer->parent)
+                               bufferevent_incref_(buffer->parent);
+               }
                EVBUFFER_UNLOCK(buffer);
-               event_deferred_cb_schedule_(buffer->cb_queue, &buffer->deferred);
        }
 
        evbuffer_run_callbacks(buffer, 0);
index 461c46e76ca3a95b00dbbb04615969acd7072d95..5d7e98c074859e5a3a3ea501fb1a2b67345e81ee 100644 (file)
@@ -31,6 +31,7 @@ extern "C" {
 #endif
 
 #include "event2/event-config.h"
+#include "event2/event_struct.h"
 #include "evconfig-private.h"
 #include "event2/util.h"
 #include "defer-internal.h"
index 0d4b01d6a69ce9ed87d2b10b212b9143eb2edc93..9c023ad40622ed2c9a20315efa63c277568e7103 100644 (file)
@@ -210,10 +210,10 @@ bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *cb, void *arg)
 
 #define SCHEDULE_DEFERRED(bevp)                                                \
        do {                                                            \
-               bufferevent_incref_(&(bevp)->bev);                      \
-               event_deferred_cb_schedule_(                            \
+               if (event_deferred_cb_schedule_(                        \
                        event_base_get_deferred_cb_queue_((bevp)->bev.ev_base), \
-                       &(bevp)->deferred);                             \
+                       &(bevp)->deferred))                             \
+                       bufferevent_incref_(&(bevp)->bev);              \
        } while (0)
 
 
@@ -227,8 +227,7 @@ bufferevent_run_readcb_(struct bufferevent *bufev)
                return;
        if (p->options & BEV_OPT_DEFER_CALLBACKS) {
                p->readcb_pending = 1;
-               if (!p->deferred.queued)
-                       SCHEDULE_DEFERRED(p);
+               SCHEDULE_DEFERRED(p);
        } else {
                bufev->readcb(bufev, bufev->cbarg);
        }
@@ -244,8 +243,7 @@ bufferevent_run_writecb_(struct bufferevent *bufev)
                return;
        if (p->options & BEV_OPT_DEFER_CALLBACKS) {
                p->writecb_pending = 1;
-               if (!p->deferred.queued)
-                       SCHEDULE_DEFERRED(p);
+               SCHEDULE_DEFERRED(p);
        } else {
                bufev->writecb(bufev, bufev->cbarg);
        }
@@ -262,8 +260,7 @@ bufferevent_run_eventcb_(struct bufferevent *bufev, short what)
        if (p->options & BEV_OPT_DEFER_CALLBACKS) {
                p->eventcb_pending |= what;
                p->errno_pending = EVUTIL_SOCKET_ERROR();
-               if (!p->deferred.queued)
-                       SCHEDULE_DEFERRED(p);
+               SCHEDULE_DEFERRED(p);
        } else {
                bufev->errorcb(bufev, what, bufev->cbarg);
        }
@@ -326,11 +323,15 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private,
        }
        if (options & BEV_OPT_DEFER_CALLBACKS) {
                if (options & BEV_OPT_UNLOCK_CALLBACKS)
-                       event_deferred_cb_init_(&bufev_private->deferred,
+                       event_deferred_cb_init_(
+                           bufev->ev_base,
+                           &bufev_private->deferred,
                            bufferevent_run_deferred_callbacks_unlocked,
                            bufev_private);
                else
-                       event_deferred_cb_init_(&bufev_private->deferred,
+                       event_deferred_cb_init_(
+                           bufev->ev_base,
+                           &bufev_private->deferred,
                            bufferevent_run_deferred_callbacks_locked,
                            bufev_private);
        }
index 114a9dc27ef0f8c21646655dbb2027c5db3ee855..a4c88135d91576d2d8c0adac50d2a990658abecb 100644 (file)
@@ -35,43 +35,11 @@ extern "C" {
 
 #include <sys/queue.h>
 
-struct deferred_cb;
+#define deferred_cb event_callback
+#define deferred_cb_queue event_base
+struct event_callback;
 
-typedef void (*deferred_cb_fn)(struct deferred_cb *, void *);
-
-/** A deferred_cb is a callback that can be scheduled to run as part of
- * an event_base's event_loop, rather than running immediately. */
-struct deferred_cb {
-       /** Links to the adjacent active (pending) deferred_cb objects. */
-       TAILQ_ENTRY (deferred_cb) cb_next;
-       /** True iff this deferred_cb is pending in an event_base. */
-       unsigned queued : 1;
-       /** The function to execute when the callback runs. */
-       deferred_cb_fn cb;
-       /** The function's second argument. */
-       void *arg;
-};
-
-/** A deferred_cb_queue is a list of deferred_cb that we can add to and run. */
-struct deferred_cb_queue {
-       /** Lock used to protect the queue. */
-       void *lock;
-
-       /** Which event_base does this queue associate itself with?
-        * (Used for timing) */
-       struct event_base *base;
-
-       /** How many entries are in the queue? */
-       int active_count;
-
-       /** Function called when adding to the queue from another thread. */
-       void (*notify_fn)(struct deferred_cb_queue *, void *);
-       void *notify_arg;
-
-       /** Deferred callback management: a list of deferred callbacks to
-        * run active the active events. */
-       TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list;
-};
+typedef void (*deferred_cb_fn)(struct event_callback *, void *);
 
 /**
    Initialize an empty, non-pending deferred_cb.
@@ -80,27 +48,23 @@ struct deferred_cb_queue {
    @param cb The function to run when the deferred_cb executes.
    @param arg The function's second argument.
  */
-void event_deferred_cb_init_(struct deferred_cb *, deferred_cb_fn, void *);
+void event_deferred_cb_init_(struct event_base *base, struct event_callback *, deferred_cb_fn, void *);
 /**
    Cancel a deferred_cb if it is currently scheduled in an event_base.
  */
-void event_deferred_cb_cancel_(struct deferred_cb_queue *, struct deferred_cb *);
+void event_deferred_cb_cancel_(struct event_base *, struct event_callback *);
 /**
    Activate a deferred_cb if it is not currently scheduled in an event_base.
- */
-void event_deferred_cb_schedule_(struct deferred_cb_queue *, struct deferred_cb *);
 
-#define LOCK_DEFERRED_QUEUE(q)                                         \
-       EVLOCK_LOCK((q)->lock, 0)
-#define UNLOCK_DEFERRED_QUEUE(q)                                       \
-       EVLOCK_UNLOCK((q)->lock, 0)
+   Return true iff it was not previously scheduled.
+ */
+int event_deferred_cb_schedule_(struct event_base *, struct event_callback *);
 
 #ifdef __cplusplus
 }
 #endif
 
-void event_deferred_cb_queue_init_(struct deferred_cb_queue *);
-struct deferred_cb_queue *event_base_get_deferred_cb_queue_(struct event_base *);
+#define event_base_get_deferred_cb_queue_(x) (x)
 
 #endif /* EVENT_INTERNAL_H_INCLUDED_ */
 
index 480c35a3d7bce002e926fc4368da192b3d87e408..824739b10c178263dd88516bde5acb38a012eda8 100644 (file)
@@ -34,6 +34,7 @@ extern "C" {
 #include "event2/event-config.h"
 #include "evconfig-private.h"
 #include "event2/util.h"
+#include "event2/event_struct.h"
 #include "util-internal.h"
 #include "defer-internal.h"
 
diff --git a/evdns.c b/evdns.c
index 41d8b6b2369e4a4d45e9fa0b0edbcfb0306262ec..ff36fc3c4eb99ba429253e74a262d4b83ad7cf3f 100644 (file)
--- a/evdns.c
+++ b/evdns.c
@@ -836,7 +836,8 @@ reply_schedule_callback(struct request *const req, u32 ttl, u32 err, struct repl
                d->handle = req->handle;
        }
 
-       event_deferred_cb_init_(&d->deferred, reply_run_callback,
+       event_deferred_cb_init_(req->base->event_base,
+           &d->deferred, reply_run_callback,
            req->user_pointer);
        event_deferred_cb_schedule_(
                event_base_get_deferred_cb_queue_(req->base->event_base),
index 93b7c6264680588c458230563921f1c4fc7a6317..4757836caaeebea16fd38c7cc0c7c4039839ad45 100644 (file)
@@ -56,13 +56,14 @@ extern "C" {
 #define ev_pri ev_evcallback.evcb_pri
 #define ev_flags ev_evcallback.evcb_flags
 #define ev_closure ev_evcallback.evcb_closure
-#define ev_callback ev_evcallback.evcb_callback
+#define ev_callback ev_evcallback.evcb_cb_union.evcb_callback
 #define ev_arg ev_evcallback.evcb_arg
 
 /* Possible values for evcb_closure in struct event_callback */
 #define EV_CLOSURE_EVENT 0
 #define EV_CLOSURE_EVENT_SIGNAL 1
 #define EV_CLOSURE_EVENT_PERSIST 2
+#define EV_CLOSURE_CB_SELF 3
 
 /** Structure to define the backend of a given event_base. */
 struct eventop {
@@ -239,10 +240,6 @@ struct event_base {
        /** The total size of common_timeout_queues. */
        int n_common_timeouts_allocated;
 
-       /** List of defered_cb that are active.  We run these after the active
-        * events. */
-       struct deferred_cb_queue defer_queue;
-
        /** Mapping from file descriptors to enabled (added) events */
        struct event_io_map io;
 
@@ -358,7 +355,7 @@ struct event_config {
 #endif /* TAILQ_FOREACH */
 
 #define N_ACTIVE_CALLBACKS(base)                                       \
-       ((base)->event_count_active + (base)->defer_queue.active_count)
+       ((base)->event_count_active)
 
 int evsig_set_handler_(struct event_base *base, int evsignal,
                          void (*fn)(int));
@@ -366,7 +363,8 @@ int evsig_restore_handler_(struct event_base *base, int evsignal);
 
 
 void event_active_nolock_(struct event *ev, int res, short count);
-void event_callback_activate_nolock_(struct event_base *, struct event_callback *);
+int event_callback_activate_(struct event_base *, struct event_callback *);
+int event_callback_activate_nolock_(struct event_base *, struct event_callback *);
 int event_callback_cancel_(struct event_base *base,
     struct event_callback *evcb);
 
diff --git a/event.c b/event.c
index c8315911e79696d69f55cec11879d1ed7b07ead1..9a506c82dca5a12f45bf29e2c76fef1e3ca1fe90 100644 (file)
--- a/event.c
+++ b/event.c
@@ -506,28 +506,6 @@ event_base_get_features(const struct event_base *base)
        return base->evsel->features;
 }
 
-void
-event_deferred_cb_queue_init_(struct deferred_cb_queue *cb)
-{
-       memset(cb, 0, sizeof(struct deferred_cb_queue));
-       TAILQ_INIT(&cb->deferred_cb_list);
-}
-
-/** Helper for the deferred_cb queue: wake up the event base. */
-static void
-notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr)
-{
-       struct event_base *base = baseptr;
-       if (EVBASE_NEED_NOTIFY(base))
-               evthread_notify_base(base);
-}
-
-struct deferred_cb_queue *
-event_base_get_deferred_cb_queue_(struct event_base *base)
-{
-       return base ? &base->defer_queue : NULL;
-}
-
 void
 event_enable_debug_mode(void)
 {
@@ -605,11 +583,6 @@ event_base_new_with_config(const struct event_config *cfg)
        base->th_notify_fd[0] = -1;
        base->th_notify_fd[1] = -1;
 
-       event_deferred_cb_queue_init_(&base->defer_queue);
-       base->defer_queue.base = base;
-       base->defer_queue.notify_fn = notify_base_cbq_callback;
-       base->defer_queue.notify_arg = base;
-
        TAILQ_INIT(&base->active_later_queue);
 
        evmap_io_initmap_(&base->io);
@@ -682,7 +655,6 @@ event_base_new_with_config(const struct event_config *cfg)
                int r;
                EVTHREAD_ALLOC_LOCK(base->th_base_lock,
                    EVTHREAD_LOCKTYPE_RECURSIVE);
-               base->defer_queue.lock = base->th_base_lock;
                EVTHREAD_ALLOC_COND(base->current_event_cond);
                r = evthread_make_base_notifiable(base);
                if (r<0) {
@@ -1464,7 +1436,7 @@ event_process_active_single_queue(struct event_base *base,
                        event_queue_remove_active(base, evcb);
                        event_debug(("event_process_active: event_callback %p, "
                                "closure %d, call %p",
-                               evcb, evcb->evcb_closure, evcb->evcb_callback));
+                               evcb, evcb->evcb_closure, evcb->evcb_cb_union.evcb_callback));
                }
 
                if (!(evcb->evcb_flags & EVLIST_INTERNAL))
@@ -1488,6 +1460,10 @@ event_process_active_single_queue(struct event_base *base,
                        (*ev->ev_callback)(
                                ev->ev_fd, ev->ev_res, ev->ev_arg);
                        break;
+               case EV_CLOSURE_CB_SELF:
+                       EVBASE_RELEASE_LOCK(base, th_base_lock);
+                       evcb->evcb_cb_union.evcb_selfcb(evcb, evcb->evcb_arg);
+                       break;
                default:
                        EVUTIL_ASSERT(0);
                }
@@ -1518,47 +1494,6 @@ event_process_active_single_queue(struct event_base *base,
        return count;
 }
 
-/*
-   Process up to MAX_DEFERRED of the defered_cb entries in 'queue'.  If
-   *breakptr becomes set to 1, stop.  Requires that we start out holding
-   the lock on 'queue'; releases the lock around 'queue' for each deferred_cb
-   we process.
- */
-static int
-event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr,
-    int max_to_process, const struct timeval *endtime)
-{
-       int count = 0;
-       struct deferred_cb *cb;
-#define MAX_DEFERRED 16
-       if (max_to_process > MAX_DEFERRED)
-               max_to_process = MAX_DEFERRED;
-#undef MAX_DEFERRED
-
-       while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
-               cb->queued = 0;
-               TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
-               --queue->active_count;
-               UNLOCK_DEFERRED_QUEUE(queue);
-
-               cb->cb(cb, cb->arg);
-
-               LOCK_DEFERRED_QUEUE(queue);
-               if (*breakptr)
-                       return -1;
-               if (++count >= max_to_process)
-                       break;
-               if (endtime) {
-                       struct timeval now;
-                       update_time_cache(queue->base);
-                       gettime(queue->base, &now);
-                       if (evutil_timercmp(&now, endtime, >=))
-                               return count;
-               }
-       }
-       return count;
-}
-
 /*
  * Active events are stored in priority queues.  Lower priorities are always
  * process before higher priorities.  Low priority events can starve high
@@ -1605,8 +1540,6 @@ event_process_active(struct event_base *base)
                }
        }
 
-       event_process_deferred_callbacks(&base->defer_queue,&base->event_break,
-           maxcb-c, endtime);
        base->event_running_priority = -1;
        return c;
 }
@@ -2558,17 +2491,42 @@ event_active_later_nolock_(struct event *ev, int res)
        event_callback_activate_later_nolock_(base, event_to_event_callback(ev));
 }
 
-void
+int
+event_callback_activate_(struct event_base *base,
+    struct event_callback *evcb)
+{
+       int r;
+       EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+       r = event_callback_activate_nolock_(base, evcb);
+       EVBASE_RELEASE_LOCK(base, th_base_lock);
+       return r;
+}
+
+int
 event_callback_activate_nolock_(struct event_base *base,
     struct event_callback *evcb)
 {
-       if (evcb->evcb_flags & EVLIST_ACTIVE_LATER)
+       int r = 1;
+
+       switch (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) {
+       default:
+               EVUTIL_ASSERT(0);
+       case EVLIST_ACTIVE_LATER:
                event_queue_remove_active_later(base, evcb);
+               r = 0;
+               break;
+       case EVLIST_ACTIVE:
+               return 0;
+       case 0:
+               break;
+       }
 
        event_queue_insert_active(base, evcb);
 
        if (EVBASE_NEED_NOTIFY(base))
                evthread_notify_base(base);
+
+       return r;
 }
 
 void
@@ -2628,53 +2586,31 @@ event_callback_cancel_nolock_(struct event_base *base,
 }
 
 void
-event_deferred_cb_init_(struct deferred_cb *cb, deferred_cb_fn fn, void *arg)
+event_deferred_cb_init_(struct event_base *base, struct event_callback *cb, deferred_cb_fn fn, void *arg)
 {
-       memset(cb, 0, sizeof(struct deferred_cb));
-       cb->cb = fn;
-       cb->arg = arg;
+       if (!base)
+               base = current_base;
+       memset(cb, 0, sizeof(*cb));
+       cb->evcb_cb_union.evcb_selfcb = fn;
+       cb->evcb_arg = arg;
+       cb->evcb_pri = base->nactivequeues - 1;
+       cb->evcb_closure = EV_CLOSURE_CB_SELF;
 }
 
 void
-event_deferred_cb_cancel_(struct deferred_cb_queue *queue,
-    struct deferred_cb *cb)
+event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb)
 {
-       if (!queue) {
-               if (current_base)
-                       queue = &current_base->defer_queue;
-               else
-                       return;
-       }
-
-       LOCK_DEFERRED_QUEUE(queue);
-       if (cb->queued) {
-               TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
-               --queue->active_count;
-               cb->queued = 0;
-       }
-       UNLOCK_DEFERRED_QUEUE(queue);
+       if (!base)
+               base = current_base;
+       event_callback_cancel_(base, cb);
 }
 
-void
-event_deferred_cb_schedule_(struct deferred_cb_queue *queue,
-    struct deferred_cb *cb)
+int
+event_deferred_cb_schedule_(struct event_base *base, struct event_callback *cb)
 {
-       if (!queue) {
-               if (current_base)
-                       queue = &current_base->defer_queue;
-               else
-                       return;
-       }
-
-       LOCK_DEFERRED_QUEUE(queue);
-       if (!cb->queued) {
-               cb->queued = 1;
-               TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next);
-               ++queue->active_count;
-               if (queue->notify_fn)
-                       queue->notify_fn(queue, queue->notify_arg);
-       }
-       UNLOCK_DEFERRED_QUEUE(queue);
+       if (!base)
+               base = current_base;
+       return event_callback_activate_(base, cb);
 }
 
 static int
diff --git a/http.c b/http.c
index 786105d37bab3c2d578361c9dca1354e9ac754af..e52627558a4f566e1ee4e9c7a59e851b496855eb 100644 (file)
--- a/http.c
+++ b/http.c
@@ -2197,7 +2197,8 @@ evhttp_connection_base_bufferevent_new(struct event_base *base, struct evdns_bas
                        bufferevent_base_set(base, evcon->bufev);
        }
 
-       event_deferred_cb_init_(&evcon->read_more_deferred_cb,
+       event_deferred_cb_init_(evcon->base,
+           &evcon->read_more_deferred_cb,
            evhttp_deferred_read_cb, evcon);
 
        evcon->dns_base = dnsbase;
index 701e8eac5a052e9845a497a4736a189dbec46411..cf7b3df359c4a2450987ca0a6d5b70fef51d2f00 100644 (file)
@@ -99,7 +99,10 @@ struct event_callback {
        ev_uint8_t evcb_pri;    /* smaller numbers are higher priority */
        ev_uint8_t evcb_closure;
        /* allows us to adopt for different types of events */
-       void (*evcb_callback)(evutil_socket_t, short, void *arg);
+        union {
+               void (*evcb_callback)(evutil_socket_t, short, void *arg);
+               void (*evcb_selfcb)(struct event_callback *, void *arg);
+       } evcb_cb_union;
        void *evcb_arg;
 };
 
index d183b4f9db75568fe4332a24543f7f5493eb7832..d94a8b37305ef9256e2006922d168ba943cf7973 100644 (file)
@@ -57,6 +57,7 @@
 #include "event2/buffer_compat.h"
 #include "event2/util.h"
 
+#include "defer-internal.h"
 #include "evbuffer-internal.h"
 #include "log-internal.h"
 
index 091bcb73e31c8293d6832cacde2225d28d450bce..9fd49fa6d4b2444ecc17d4aae67dd2f4bb2a4cc4 100644 (file)
@@ -439,7 +439,7 @@ load_deferred_queue(void *arg)
        size_t i;
 
        for (i = 0; i < CB_COUNT; ++i) {
-               event_deferred_cb_init_(&data->cbs[i], deferred_callback, NULL);
+               event_deferred_cb_init_(data->queue, &data->cbs[i], deferred_callback, NULL);
                event_deferred_cb_schedule_(data->queue, &data->cbs[i]);
                SLEEP_MS(1);
        }