]> granicus.if.org Git - libevent/commitdiff
Make "deferred callback queue" independent of event_base.
authorNick Mathewson <nickm@torproject.org>
Sun, 26 Jul 2009 01:29:39 +0000 (01:29 +0000)
committerNick Mathewson <nickm@torproject.org>
Sun, 26 Jul 2009 01:29:39 +0000 (01:29 +0000)
This way, we can more easily have an IOCP bufferevent implementation
that does not need an event_base at all.  Woot.

svn:r1381

buffer.c
bufferevent.c
defer-internal.h
evbuffer-internal.h
evdns.c
event-internal.h
event.c

index 870c973ab0c4e380ee9416a37c972efbf58f927f..a67f7f789641d0bedfd9a27b9a60d3e3bbb98520 100644 (file)
--- a/buffer.c
+++ b/buffer.c
@@ -278,7 +278,7 @@ int
 evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base)
 {
        EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
-       buffer->ev_base = base;
+       buffer->cb_queue = event_base_get_deferred_cb_queue(base);
        buffer->deferred_cbs = 1;
        event_deferred_cb_init(&buffer->deferred,
            evbuffer_deferred_callback, buffer);
@@ -361,7 +361,7 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer)
                if (buffer->deferred.queued)
                        return;
                _evbuffer_incref(buffer);
-               event_deferred_cb_schedule(buffer->ev_base, &buffer->deferred);
+               event_deferred_cb_schedule(buffer->cb_queue, &buffer->deferred);
        } else {
                evbuffer_run_callbacks(buffer);
        }
@@ -407,7 +407,7 @@ _evbuffer_decref_and_unlock(struct evbuffer *buffer)
        }
        evbuffer_remove_all_callbacks(buffer);
        if (buffer->deferred_cbs)
-               event_deferred_cb_cancel(buffer->ev_base, &buffer->deferred);
+               event_deferred_cb_cancel(buffer->cb_queue, &buffer->deferred);
 
        EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE);
         if (buffer->own_lock)
index 74097e26138dbac9d3add091085e72742466f711..f79ff54026c5323391d3bbd1aad088bc2842d415 100644 (file)
@@ -140,6 +140,14 @@ bufferevent_run_deferred_callbacks(struct deferred_cb *_, void *arg)
        _bufferevent_decref_and_unlock(bufev);
 }
 
+#define SCHEDULE_DEFERRED(bevp)                                                \
+       do {                                                            \
+               event_deferred_cb_schedule(                             \
+                       event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
+                       &(bevp)->deferred);                             \
+       } while (0);
+
+
 void
 _bufferevent_run_readcb(struct bufferevent *bufev)
 {
@@ -150,8 +158,7 @@ _bufferevent_run_readcb(struct bufferevent *bufev)
                p->readcb_pending = 1;
                if (!p->deferred.queued) {
                        bufferevent_incref(bufev);
-                       event_deferred_cb_schedule(
-                               bufev->ev_base, &p->deferred);
+                       SCHEDULE_DEFERRED(p);
                }
        } else {
                bufev->readcb(bufev, bufev->cbarg);
@@ -168,8 +175,7 @@ _bufferevent_run_writecb(struct bufferevent *bufev)
                p->writecb_pending = 1;
                if (!p->deferred.queued) {
                        bufferevent_incref(bufev);
-                       event_deferred_cb_schedule(
-                               bufev->ev_base, &p->deferred);
+                       SCHEDULE_DEFERRED(p);
                }
        } else {
                bufev->writecb(bufev, bufev->cbarg);
@@ -187,8 +193,7 @@ _bufferevent_run_eventcb(struct bufferevent *bufev, short what)
                p->errno_pending = EVUTIL_SOCKET_ERROR();
                if (!p->deferred.queued) {
                        bufferevent_incref(bufev);
-                       event_deferred_cb_schedule(
-                               bufev->ev_base, &p->deferred);
+                       SCHEDULE_DEFERRED(p);
                }
        } else {
                bufev->errorcb(bufev, what, bufev->cbarg);
index 8eead90f24d37ede014c0777c6e97fb608a7f625..cde6e903a9bb0b99618b5dbb794fbcaec9a3cd40 100644 (file)
@@ -50,6 +50,20 @@ struct deferred_cb {
        void *arg;
 };
 
+
+struct deferred_cb_queue {
+       void *lock;
+
+       int active_count;
+
+       void (*notify_fn)(struct deferred_cb_queue *, void *);
+       void *notify_arg;
+
+       /** Deferred callback management: a list of deferred callbacks to
+        * run active the active events. */
+       TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list;
+};
+
 /**
    Initialize an empty, non-pending deferred_cb.
 
@@ -61,15 +75,32 @@ void event_deferred_cb_init(struct deferred_cb *, deferred_cb_fn, void *);
 /**
    Cancel a deferred_cb if it is currently scheduled in an event_base.
  */
-void event_deferred_cb_cancel(struct event_base *, struct deferred_cb *);
+void event_deferred_cb_cancel(struct deferred_cb_queue *, struct deferred_cb *);
 /**
    Activate a deferred_cb if it is not currently scheduled in an event_base.
  */
-void event_deferred_cb_schedule(struct event_base *, struct deferred_cb *);
+void event_deferred_cb_schedule(struct deferred_cb_queue *, struct deferred_cb *);
+
+#define LOCK_DEFERRED_QUEUE(q)                                         \
+       do {                                                            \
+               if ((q)->lock)                                          \
+                       _evthread_locking_fn(EVTHREAD_LOCK|EVTHREAD_WRITE, \
+                           (q)->lock);                                 \
+       } while (0)
+
+#define UNLOCK_DEFERRED_QUEUE(q)                                       \
+       do {                                                            \
+               if ((q)->lock)                                          \
+                       _evthread_locking_fn(EVTHREAD_UNLOCK|EVTHREAD_WRITE, \
+                           (q)->lock);                                 \
+       } while (0)
 
 #ifdef __cplusplus
 }
 #endif
 
+void event_deferred_cb_queue_init(struct deferred_cb_queue *);
+struct deferred_cb_queue *event_base_get_deferred_cb_queue(struct event_base *);
+
 #endif /* _EVENT_INTERNAL_H_ */
 
index 8e94f607682a0cfca4962f9dcfdbadfdb4ec386d..063777e2cab760cc87e717d61dcc6dc3c5c44c4a 100644 (file)
@@ -115,9 +115,8 @@ struct evbuffer {
        unsigned is_overlapped : 1;
 #endif
 
-       /** An event_base associated with this evbuffer.  Used to implement
-        * deferred callbacks. */
-       struct event_base *ev_base;
+       /** Used to implement deferred callbacks. */
+       struct deferred_cb_queue *cb_queue;
 
        /** For debugging: how many times have we acquired the lock for this
         * evbuffer? */
diff --git a/evdns.c b/evdns.c
index 496c2e880ebe42013a05a0c8d0d22a07ed4c338e..6962cc19dbed794e465018ba34b881f076d7d19c 100644 (file)
--- a/evdns.c
+++ b/evdns.c
@@ -804,7 +804,9 @@ reply_schedule_callback(struct evdns_request *const req, u32 ttl, u32 err, struc
 
        event_deferred_cb_init(&d->deferred, reply_run_callback,
            req->user_pointer);
-       event_deferred_cb_schedule(req->base->event_base, &d->deferred);
+       event_deferred_cb_schedule(
+               event_base_get_deferred_cb_queue(req->base->event_base),
+               &d->deferred);
 }
 
 /* this processes a parsed reply packet */
index 527ee2cee419d5b9da4f24eaefee6fb1e333d188..724135b846a95bce9a545d0e3da376e22749944e 100644 (file)
@@ -37,6 +37,7 @@ extern "C" {
 #include "minheap-internal.h"
 #include "evsignal-internal.h"
 #include "mm-internal.h"
+#include "defer-internal.h"
 
 /* map union members back */
 
@@ -136,9 +137,7 @@ struct event_base {
        /** The event whose callback is executing right now */
        struct event *current_event;
 
-       /** Deferred callback management: a list of deferred callbacks to
-        * run active the active events. */
-       TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list;
+       struct deferred_cb_queue defer_queue;
 
        /** Mapping from file descriptors to enabled events */
        struct event_io_map io;
@@ -209,6 +208,9 @@ struct event_config {
 } while (0)
 #endif /* TAILQ_FOREACH */
 
+#define N_ACTIVE_CALLBACKS(base)                                       \
+       ((base)->event_count_active + (base)->defer_queue.active_count)
+
 int _evsig_set_handler(struct event_base *base, int evsignal,
                          void (*fn)(int));
 int _evsig_restore_handler(struct event_base *base, int evsignal);
diff --git a/event.c b/event.c
index 0fea11f4c151d18ffcd6bb8f77d3c4d75ec1caea..83526d090372fd4e036fae7c1e5c5a8d7f0d2fbc 100644 (file)
--- a/event.c
+++ b/event.c
@@ -230,6 +230,27 @@ event_base_get_features(struct event_base *base)
        return base->evsel->features;
 }
 
+void
+event_deferred_cb_queue_init(struct deferred_cb_queue *cb)
+{
+       memset(cb, 0, sizeof(struct deferred_cb_queue));
+       TAILQ_INIT(&cb->deferred_cb_list);
+}
+
+static void
+notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr)
+{
+       struct event_base *base = baseptr;
+       if (!EVBASE_IN_THREAD(base))
+           evthread_notify_base(base);
+}
+
+struct deferred_cb_queue *
+event_base_get_deferred_cb_queue(struct event_base *base)
+{
+       return base ? &base->defer_queue : NULL;
+}
+
 struct event_base *
 event_base_new_with_config(struct event_config *cfg)
 {
@@ -245,10 +266,13 @@ event_base_new_with_config(struct event_config *cfg)
 
        min_heap_ctor(&base->timeheap);
        TAILQ_INIT(&base->eventqueue);
-       TAILQ_INIT(&base->deferred_cb_list);
        base->sig.ev_signal_pair[0] = -1;
        base->sig.ev_signal_pair[1] = -1;
 
+       event_deferred_cb_queue_init(&base->defer_queue);
+       base->defer_queue.notify_fn = notify_base_cbq_callback;
+       base->defer_queue.notify_arg = base;
+
        evmap_io_initmap(&base->io);
        evmap_signal_initmap(&base->sigmap);
 
@@ -301,6 +325,7 @@ event_base_new_with_config(struct event_config *cfg)
        if (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK)) {
                int r;
                EVTHREAD_ALLOC_LOCK(base->th_base_lock);
+               base->defer_queue.lock = base->th_base_lock;
                EVTHREAD_ALLOC_LOCK(base->current_event_lock);
                r = evthread_make_base_notifiable(base);
                if (r<0) {
@@ -551,7 +576,7 @@ event_base_priority_init(struct event_base *base, int npriorities)
 {
        int i;
 
-       if (base->event_count_active || npriorities < 1
+       if (N_ACTIVE_CALLBACKS(base) || npriorities < 1
            || npriorities >= EVENT_MAX_PRIORITIES)
                return (-1);
 
@@ -678,23 +703,23 @@ event_process_active_single_queue(struct event_base *base,
 }
 
 static int
-event_process_deferred_callbacks(struct event_base *base)
+event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
 {
        int count = 0;
        struct deferred_cb *cb;
 
-       while ((cb = TAILQ_FIRST(&base->deferred_cb_list))) {
+       while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
                cb->queued = 0;
-               TAILQ_REMOVE(&base->deferred_cb_list, cb, cb_next);
-               --base->event_count_active;
-               EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+               TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
+               --queue->active_count;
+               UNLOCK_DEFERRED_QUEUE(queue);
 
                cb->cb(cb, cb->arg);
                ++count;
-               if (base->event_break)
+               if (*breakptr)
                        return -1;
 
-               EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+               LOCK_DEFERRED_QUEUE(queue);
        }
        return count;
 }
@@ -727,7 +752,7 @@ event_process_active(struct event_base *base)
                }
        }
 
-       event_process_deferred_callbacks(base);
+       event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
 
 unlock:
        EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
@@ -845,7 +870,7 @@ event_base_loop(struct event_base *base, int flags)
                timeout_correct(base, &tv);
 
                tv_p = &tv;
-               if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
+               if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
                        timeout_next(base, &tv_p);
                } else {
                        /*
@@ -856,7 +881,7 @@ event_base_loop(struct event_base *base, int flags)
                }
 
                /* If we have no events, we just exit */
-               if (!event_haveevents(base) && !base->event_count_active) {
+               if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
                        event_debug(("%s: no events registered.", __func__));
                        return (1);
                }
@@ -875,7 +900,7 @@ event_base_loop(struct event_base *base, int flags)
 
                timeout_process(base);
 
-               if (base->event_count_active) {
+               if (N_ACTIVE_CALLBACKS(base)) {
                        event_process_active(base);
                        if (!base->event_count_active && (flags & EVLOOP_ONCE))
                                done = 1;
@@ -1377,34 +1402,45 @@ event_deferred_cb_init(struct deferred_cb *cb, deferred_cb_fn fn, void *arg)
 }
 
 void
-event_deferred_cb_cancel(struct event_base *base, struct deferred_cb *cb)
+event_deferred_cb_cancel(struct deferred_cb_queue *queue,
+    struct deferred_cb *cb)
 {
-       if (!base)
-               base = current_base;
+       if (!queue) {
+               if (current_base)
+                       queue = &current_base->defer_queue;
+               else
+                       return;
+       }
 
-       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+       LOCK_DEFERRED_QUEUE(queue);
        if (cb->queued) {
-               TAILQ_REMOVE(&base->deferred_cb_list, cb, cb_next);
-               --base->event_count_active;
+               TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
+               --queue->active_count;
                cb->queued = 0;
        }
-       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+       UNLOCK_DEFERRED_QUEUE(queue);
 }
 
 void
-event_deferred_cb_schedule(struct event_base *base, struct deferred_cb *cb)
+event_deferred_cb_schedule(struct deferred_cb_queue *queue,
+    struct deferred_cb *cb)
 {
-       if (!base)
-               base = current_base;
-       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+       if (!queue) {
+               if (current_base)
+                       queue = &current_base->defer_queue;
+               else
+                       return;
+       }
+
+       LOCK_DEFERRED_QUEUE(queue);
        if (!cb->queued) {
                cb->queued = 1;
-               TAILQ_INSERT_TAIL(&base->deferred_cb_list, cb, cb_next);
-               ++base->event_count_active;
-               if (!EVBASE_IN_THREAD(base))
-                       evthread_notify_base(base);
+               TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next);
+               ++queue->active_count;
+               if (queue->notify_fn)
+                       queue->notify_fn(queue, queue->notify_arg);
        }
-       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+       UNLOCK_DEFERRED_QUEUE(queue);
 }
 
 static int