]> granicus.if.org Git - libevent/commitdiff
support for event priorities; active events are scheduled into priority queues;
authorNiels Provos <provos@gmail.com>
Sun, 19 Sep 2004 21:08:09 +0000 (21:08 +0000)
committerNiels Provos <provos@gmail.com>
Sun, 19 Sep 2004 21:08:09 +0000 (21:08 +0000)
lower priorities get always processed before higher priorities

svn:r120

evbuffer.c
event.3
event.c
event.h
test/regress.c

index 0e7481eaf4033ad2bbf1403454159aee0f54e936..b8254d86a74818383505a5e6d619112a5d17c329 100644 (file)
@@ -225,6 +225,17 @@ bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
        return (bufev);
 }
 
+int
+bufferevent_priority_set(struct bufferevent *bufev, int priority)
+{
+       if (event_priority_set(&bufev->ev_read, priority) == -1)
+               return (-1);
+       if (event_priority_set(&bufev->ev_write, priority) == -1)
+               return (-1);
+
+       return (0);
+}
+
 void
 bufferevent_free(struct bufferevent *bufev)
 {
diff --git a/event.3 b/event.3
index ec4cd7c4fee705e2a1c2c8a9373605447bf1ff96..c263a4948bdb42cd26aafeaf1b5d05d238af09dc 100644 (file)
--- a/event.3
+++ b/event.3
@@ -40,6 +40,8 @@
 .Nm event_once ,
 .Nm event_pending ,
 .Nm event_initialized ,
+.Nm event_priority_init ,
+.Nm event_priority_set ,
 .Nm evtimer_set ,
 .Nm evtimer_add ,
 .Nm evtimer_del
 .Fn "event_pending" "struct event *ev" "short event" "struct timeval *tv"
 .Ft int
 .Fn "event_initialized" "struct event *ev"
+.Ft int
+.Fn "event_priority_init" "int npriorities"
+.Ft int
+.Fn "event_priority_set" "struct event *ev" "int priority"
 .Ft void
 .Fn "evtimer_set" "struct event *ev" "void (*fn)(int, short, void *)" "void *arg"
 .Ft void
@@ -368,6 +374,31 @@ By setting the environment variable
 .Nm libevent
 displays the kernel notification method that it uses.
 .Pp
+.Sh EVENT PRIORITIES
+By default
+.Nm libevent
+schedules all active events with the same priority.
+However, sometime it is desirable to process some events with a higher
+priority than others.
+For that reason,
+.Nm libevent
+supports strict priority queues.
+Active events with a lower priority are always processed before events
+with a higher priority.
+.Pp
+The number of different priorities can be set initially with the
+.Fn event_priority_init
+function.
+This function should be called before the first call to
+.Fn event_dispatch .
+The
+.Fn event_priority_set
+function can be used to assign a priority to an event.
+By default,
+.Nm libevent
+assigns the middle priority to all events unless their priority
+is explicitly set.
+.Pp
 .Sh BUFFERED EVENTS
 .Nm libevent
 provides an abstraction on top of the regular event callbacks.
diff --git a/event.c b/event.c
index 66cf8a6a57b0944a342e524019ccc583e347cd6a..c62574e30aac41df947e089e13d45239f5552682 100644 (file)
--- a/event.c
+++ b/event.c
@@ -113,7 +113,8 @@ const struct eventop *eventops[] = {
 
 const struct eventop *evsel;
 void *evbase;
-static int event_count;
+static int event_count;                        /* counts number of total events */
+static int event_count_active;         /* counts number of active events */
 
 /* Handle signals - This is a deprecated interface */
 int (*event_sigcb)(void);      /* Signal callback when gotsig is set */
@@ -128,7 +129,11 @@ int                event_haveevents(void);
 static void    event_process_active(void);
 
 static RB_HEAD(event_tree, event) timetree;
-static struct event_list activequeue;
+
+/* active event management */
+static struct event_list **activequeues;
+static int nactivequeues;
+
 struct event_list signalqueue;
 struct event_list eventqueue;
 static struct timeval event_tv;
@@ -168,7 +173,6 @@ event_init(void)
 
        RB_INIT(&timetree);
        TAILQ_INIT(&eventqueue);
-       TAILQ_INIT(&activequeue);
        TAILQ_INIT(&signalqueue);
        
        evbase = NULL;
@@ -183,6 +187,41 @@ event_init(void)
 
        if (getenv("EVENT_SHOW_METHOD")) 
                fprintf(stderr, "libevent using: %s\n", evsel->name); 
+
+       /* allocate a single active event queue */
+       event_priority_init(1);
+}
+
+int
+event_priority_init(int npriorities)
+{
+       int i;
+
+       if (event_count_active)
+               return (-1);
+
+       if (nactivequeues && npriorities != nactivequeues) {
+               for (i = 0; i < nactivequeues; ++i) {
+                       free(activequeues[i]);
+               }
+               free(activequeues);
+       }
+
+       /* Allocate our priority queues */
+       nactivequeues = npriorities;
+       activequeues = (struct event_list **)calloc(nactivequeues,
+           npriorities * sizeof(struct event_list *));
+       if (activequeues == NULL)
+               err(1, "%s: calloc", __func__);
+
+       for (i = 0; i < nactivequeues; ++i) {
+               activequeues[i] = malloc(sizeof(struct event_list));
+               if (activequeues[i] == NULL)
+                       err(1, "%s: malloc", __func__);
+               TAILQ_INIT(activequeues[i]);
+       }
+
+       return (0);
 }
 
 int
@@ -191,14 +230,31 @@ event_haveevents(void)
        return (event_count > 0);
 }
 
+/*
+ * Active events are stored in priority queues.  Lower priorities are always
+ * process before higher priorities.  Low priority events can starve high
+ * priority ones.
+ */
+
 static void
 event_process_active(void)
 {
        struct event *ev;
+       struct event_list *activeq = NULL;
+       int i;
        short ncalls;
 
-       for (ev = TAILQ_FIRST(&activequeue); ev;
-           ev = TAILQ_FIRST(&activequeue)) {
+       if (!event_count_active)
+               return;
+
+       for (i = 0; i < nactivequeues; ++i) {
+               if (TAILQ_FIRST(activequeues[i]) != NULL) {
+                       activeq = activequeues[i];
+                       break;
+               }
+       }
+
+       for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
                event_queue_remove(ev, EVLIST_ACTIVE);
                
                /* Allows deletes to work */
@@ -276,7 +332,7 @@ event_loop(int flags)
                }
                event_tv = tv;
 
-               if (!(flags & EVLOOP_NONBLOCK))
+               if (!event_count_active && !(flags & EVLOOP_NONBLOCK))
                        timeout_next(&tv);
                else
                        timerclear(&tv);
@@ -292,9 +348,9 @@ event_loop(int flags)
 
                timeout_process();
 
-               if (TAILQ_FIRST(&activequeue)) {
+               if (event_count_active) {
                        event_process_active();
-                       if (flags & EVLOOP_ONCE)
+                       if (!event_count_active && (flags & EVLOOP_ONCE))
                                done = 1;
                } else if (flags & EVLOOP_NONBLOCK)
                        done = 1;
@@ -382,6 +438,27 @@ event_set(struct event *ev, int fd, short events,
        ev->ev_flags = EVLIST_INIT;
        ev->ev_ncalls = 0;
        ev->ev_pncalls = NULL;
+
+       /* by default, we put new events into the middle priority */
+       ev->ev_pri = nactivequeues/2;
+}
+
+/*
+ * Set's the priority of an event - if an event is already scheduled
+ * changing the priority is going to fail.
+ */
+
+int
+event_priority_set(struct event *ev, int pri)
+{
+       if (ev->ev_flags & EVLIST_ACTIVE)
+               return (-1);
+       if (pri < 0 || pri >= nactivequeues)
+               return (-1);
+
+       ev->ev_pri = pri;
+
+       return (0);
 }
 
 /*
@@ -587,17 +664,24 @@ timeout_process(void)
 void
 event_queue_remove(struct event *ev, int queue)
 {
+       int docount = 1;
+
        if (!(ev->ev_flags & queue))
                errx(1, "%s: %p(fd %d) not on queue %x", __func__,
                    ev, ev->ev_fd, queue);
 
-       if (!(ev->ev_flags & EVLIST_INTERNAL))
+       if (ev->ev_flags & EVLIST_INTERNAL)
+               docount = 0;
+
+       if (docount)
                event_count--;
 
        ev->ev_flags &= ~queue;
        switch (queue) {
        case EVLIST_ACTIVE:
-               TAILQ_REMOVE(&activequeue, ev, ev_active_next);
+               if (docount)
+                       event_count_active--;
+               TAILQ_REMOVE(activequeues[ev->ev_pri], ev, ev_active_next);
                break;
        case EVLIST_SIGNAL:
                TAILQ_REMOVE(&signalqueue, ev, ev_signal_next);
@@ -616,17 +700,29 @@ event_queue_remove(struct event *ev, int queue)
 void
 event_queue_insert(struct event *ev, int queue)
 {
-       if (ev->ev_flags & queue)
+       int docount = 1;
+
+       if (ev->ev_flags & queue) {
+               /* Double insertion is possible for active events */
+               if (queue & EVLIST_ACTIVE)
+                       return;
+
                errx(1, "%s: %p(fd %d) already on queue %x", __func__,
                    ev, ev->ev_fd, queue);
+       }
+
+       if (ev->ev_flags & EVLIST_INTERNAL)
+               docount = 0;
 
-       if (!(ev->ev_flags & EVLIST_INTERNAL))
+       if (docount)
                event_count++;
 
        ev->ev_flags |= queue;
        switch (queue) {
        case EVLIST_ACTIVE:
-               TAILQ_INSERT_TAIL(&activequeue, ev, ev_active_next);
+               if (docount)
+                       event_count_active++;
+               TAILQ_INSERT_TAIL(activequeues[ev->ev_pri], ev,ev_active_next);
                break;
        case EVLIST_SIGNAL:
                TAILQ_INSERT_TAIL(&signalqueue, ev, ev_signal_next);
diff --git a/event.h b/event.h
index 875560c6f56f1a3731a94dd21aedda0021f62675..83230bf71f26fdd648975f9aac6d61718ef96839 100644 (file)
--- a/event.h
+++ b/event.h
@@ -89,6 +89,8 @@ struct event {
 
        struct timeval ev_timeout;
 
+       int ev_pri;             /* smaller numbers are higher priority */
+
        void (*ev_callback)(int, short, void *arg);
        void *ev_arg;
 
@@ -167,6 +169,11 @@ int event_pending(struct event *, short, struct timeval *);
 #define event_initialized(ev)          ((ev)->ev_flags & EVLIST_INIT)
 #endif
 
+/* These functions deal with event priorities */
+
+int    event_priority_init(int);
+int    event_priority_set(struct event *, int);
+
 /* These functions deal with buffering input and output */
 
 struct evbuffer {
@@ -220,6 +227,7 @@ struct bufferevent {
 
 struct bufferevent *bufferevent_new(int fd,
     evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg);
+int bufferevent_priority_set(struct bufferevent *bufev, int pri);
 void bufferevent_free(struct bufferevent *bufev);
 int bufferevent_write(struct bufferevent *bufev, void *data, size_t size);
 int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);
index a6547e19f3d3bf2c8003b897ed1a2accaaa39027..20ef0b40d68751ef7ca35262a89b338242e67c08 100644 (file)
@@ -529,6 +529,82 @@ test9(void)
        cleanup_test();
 }
 
+struct test_pri_event {
+       struct event ev;
+       int count;
+};
+
+void
+test_priorities_cb(int fd, short what, void *arg)
+{
+       struct test_pri_event *pri = arg;
+       struct timeval tv;
+
+       if (pri->count == 3) {
+               event_loopexit(NULL);
+               return;
+       }
+
+       pri->count++;
+
+       timerclear(&tv);
+       event_add(&pri->ev, &tv);
+}
+
+void
+test_priorities(int npriorities)
+{
+       char buf[32];
+       struct test_pri_event one, two;
+       struct timeval tv;
+
+       snprintf(buf, sizeof(buf), "Priorities %d: ", npriorities);
+       setup_test(buf);
+
+       event_priority_init(npriorities);
+
+       memset(&one, 0, sizeof(one));
+       memset(&two, 0, sizeof(two));
+
+       timeout_set(&one.ev, test_priorities_cb, &one);
+       if (event_priority_set(&one.ev, 0) == -1) {
+               fprintf(stderr, "%s: failed to set priority", __func__);
+               exit(1);
+       }
+
+       timeout_set(&two.ev, test_priorities_cb, &two);
+       if (event_priority_set(&two.ev, npriorities - 1) == -1) {
+               fprintf(stderr, "%s: failed to set priority", __func__);
+               exit(1);
+       }
+
+       timerclear(&tv);
+
+       if (event_add(&one.ev, &tv) == -1)
+               exit(1);
+       if (event_add(&two.ev, &tv) == -1)
+               exit(1);
+
+       event_dispatch();
+
+       event_del(&one.ev);
+       event_del(&two.ev);
+
+       if (npriorities == 1) {
+               if (one.count == 3 && two.count == 3)
+                       test_ok = 1;
+       } else if (npriorities == 2) {
+               /* Two is called once because event_loopexit is priority 1 */
+               if (one.count == 3 && two.count == 1)
+                       test_ok = 1;
+       } else {
+               if (one.count == 3 && two.count == 0)
+                       test_ok = 1;
+       }
+
+       cleanup_test();
+}
+
 int
 main (int argc, char **argv)
 {
@@ -565,6 +641,10 @@ main (int argc, char **argv)
 
        test9();
 
+       test_priorities(1);
+       test_priorities(2);
+       test_priorities(3);
+
        return (0);
 }