.Nm event_once ,
.Nm event_pending ,
.Nm event_initialized ,
+.Nm event_priority_init ,
+.Nm event_priority_set ,
.Nm evtimer_set ,
.Nm evtimer_add ,
.Nm evtimer_del
.Fn "event_pending" "struct event *ev" "short event" "struct timeval *tv"
.Ft int
.Fn "event_initialized" "struct event *ev"
+.Ft int
+.Fn "event_priority_init" "int npriorities"
+.Ft int
+.Fn "event_priority_set" "struct event *ev" "int priority"
.Ft void
.Fn "evtimer_set" "struct event *ev" "void (*fn)(int, short, void *)" "void *arg"
.Ft void
.Nm libevent
displays the kernel notification method that it uses.
.Pp
+.Sh EVENT PRIORITIES
+By default
+.Nm libevent
+schedules all active events with the same priority.
+However, sometime it is desirable to process some events with a higher
+priority than others.
+For that reason,
+.Nm libevent
+supports strict priority queues.
+Active events with a lower priority are always processed before events
+with a higher priority.
+.Pp
+The number of different priorities can be set initially with the
+.Fn event_priority_init
+function.
+This function should be called before the first call to
+.Fn event_dispatch .
+The
+.Fn event_priority_set
+function can be used to assign a priority to an event.
+By default,
+.Nm libevent
+assigns the middle priority to all events unless their priority
+is explicitly set.
+.Pp
.Sh BUFFERED EVENTS
.Nm libevent
provides an abstraction on top of the regular event callbacks.
const struct eventop *evsel;
void *evbase;
-static int event_count;
+static int event_count; /* counts number of total events */
+static int event_count_active; /* counts number of active events */
/* Handle signals - This is a deprecated interface */
int (*event_sigcb)(void); /* Signal callback when gotsig is set */
static void event_process_active(void);
static RB_HEAD(event_tree, event) timetree;
-static struct event_list activequeue;
+
+/* active event management */
+static struct event_list **activequeues;
+static int nactivequeues;
+
struct event_list signalqueue;
struct event_list eventqueue;
static struct timeval event_tv;
RB_INIT(&timetree);
TAILQ_INIT(&eventqueue);
- TAILQ_INIT(&activequeue);
TAILQ_INIT(&signalqueue);
evbase = NULL;
if (getenv("EVENT_SHOW_METHOD"))
fprintf(stderr, "libevent using: %s\n", evsel->name);
+
+ /* allocate a single active event queue */
+ event_priority_init(1);
+}
+
+int
+event_priority_init(int npriorities)
+{
+ int i;
+
+ if (event_count_active)
+ return (-1);
+
+ if (nactivequeues && npriorities != nactivequeues) {
+ for (i = 0; i < nactivequeues; ++i) {
+ free(activequeues[i]);
+ }
+ free(activequeues);
+ }
+
+ /* Allocate our priority queues */
+ nactivequeues = npriorities;
+ activequeues = (struct event_list **)calloc(nactivequeues,
+ npriorities * sizeof(struct event_list *));
+ if (activequeues == NULL)
+ err(1, "%s: calloc", __func__);
+
+ for (i = 0; i < nactivequeues; ++i) {
+ activequeues[i] = malloc(sizeof(struct event_list));
+ if (activequeues[i] == NULL)
+ err(1, "%s: malloc", __func__);
+ TAILQ_INIT(activequeues[i]);
+ }
+
+ return (0);
}
int
return (event_count > 0);
}
+/*
+ * Active events are stored in priority queues. Lower priorities are always
+ * process before higher priorities. Low priority events can starve high
+ * priority ones.
+ */
+
static void
event_process_active(void)
{
struct event *ev;
+ struct event_list *activeq = NULL;
+ int i;
short ncalls;
- for (ev = TAILQ_FIRST(&activequeue); ev;
- ev = TAILQ_FIRST(&activequeue)) {
+ if (!event_count_active)
+ return;
+
+ for (i = 0; i < nactivequeues; ++i) {
+ if (TAILQ_FIRST(activequeues[i]) != NULL) {
+ activeq = activequeues[i];
+ break;
+ }
+ }
+
+ for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
event_queue_remove(ev, EVLIST_ACTIVE);
/* Allows deletes to work */
}
event_tv = tv;
- if (!(flags & EVLOOP_NONBLOCK))
+ if (!event_count_active && !(flags & EVLOOP_NONBLOCK))
timeout_next(&tv);
else
timerclear(&tv);
timeout_process();
- if (TAILQ_FIRST(&activequeue)) {
+ if (event_count_active) {
event_process_active();
- if (flags & EVLOOP_ONCE)
+ if (!event_count_active && (flags & EVLOOP_ONCE))
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
ev->ev_flags = EVLIST_INIT;
ev->ev_ncalls = 0;
ev->ev_pncalls = NULL;
+
+ /* by default, we put new events into the middle priority */
+ ev->ev_pri = nactivequeues/2;
+}
+
+/*
+ * Set's the priority of an event - if an event is already scheduled
+ * changing the priority is going to fail.
+ */
+
+int
+event_priority_set(struct event *ev, int pri)
+{
+ if (ev->ev_flags & EVLIST_ACTIVE)
+ return (-1);
+ if (pri < 0 || pri >= nactivequeues)
+ return (-1);
+
+ ev->ev_pri = pri;
+
+ return (0);
}
/*
void
event_queue_remove(struct event *ev, int queue)
{
+ int docount = 1;
+
if (!(ev->ev_flags & queue))
errx(1, "%s: %p(fd %d) not on queue %x", __func__,
ev, ev->ev_fd, queue);
- if (!(ev->ev_flags & EVLIST_INTERNAL))
+ if (ev->ev_flags & EVLIST_INTERNAL)
+ docount = 0;
+
+ if (docount)
event_count--;
ev->ev_flags &= ~queue;
switch (queue) {
case EVLIST_ACTIVE:
- TAILQ_REMOVE(&activequeue, ev, ev_active_next);
+ if (docount)
+ event_count_active--;
+ TAILQ_REMOVE(activequeues[ev->ev_pri], ev, ev_active_next);
break;
case EVLIST_SIGNAL:
TAILQ_REMOVE(&signalqueue, ev, ev_signal_next);
void
event_queue_insert(struct event *ev, int queue)
{
- if (ev->ev_flags & queue)
+ int docount = 1;
+
+ if (ev->ev_flags & queue) {
+ /* Double insertion is possible for active events */
+ if (queue & EVLIST_ACTIVE)
+ return;
+
errx(1, "%s: %p(fd %d) already on queue %x", __func__,
ev, ev->ev_fd, queue);
+ }
+
+ if (ev->ev_flags & EVLIST_INTERNAL)
+ docount = 0;
- if (!(ev->ev_flags & EVLIST_INTERNAL))
+ if (docount)
event_count++;
ev->ev_flags |= queue;
switch (queue) {
case EVLIST_ACTIVE:
- TAILQ_INSERT_TAIL(&activequeue, ev, ev_active_next);
+ if (docount)
+ event_count_active++;
+ TAILQ_INSERT_TAIL(activequeues[ev->ev_pri], ev,ev_active_next);
break;
case EVLIST_SIGNAL:
TAILQ_INSERT_TAIL(&signalqueue, ev, ev_signal_next);
struct timeval ev_timeout;
+ int ev_pri; /* smaller numbers are higher priority */
+
void (*ev_callback)(int, short, void *arg);
void *ev_arg;
#define event_initialized(ev) ((ev)->ev_flags & EVLIST_INIT)
#endif
+/* These functions deal with event priorities */
+
+int event_priority_init(int);
+int event_priority_set(struct event *, int);
+
/* These functions deal with buffering input and output */
struct evbuffer {
struct bufferevent *bufferevent_new(int fd,
evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg);
+int bufferevent_priority_set(struct bufferevent *bufev, int pri);
void bufferevent_free(struct bufferevent *bufev);
int bufferevent_write(struct bufferevent *bufev, void *data, size_t size);
int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);
cleanup_test();
}
+struct test_pri_event {
+ struct event ev;
+ int count;
+};
+
+void
+test_priorities_cb(int fd, short what, void *arg)
+{
+ struct test_pri_event *pri = arg;
+ struct timeval tv;
+
+ if (pri->count == 3) {
+ event_loopexit(NULL);
+ return;
+ }
+
+ pri->count++;
+
+ timerclear(&tv);
+ event_add(&pri->ev, &tv);
+}
+
+void
+test_priorities(int npriorities)
+{
+ char buf[32];
+ struct test_pri_event one, two;
+ struct timeval tv;
+
+ snprintf(buf, sizeof(buf), "Priorities %d: ", npriorities);
+ setup_test(buf);
+
+ event_priority_init(npriorities);
+
+ memset(&one, 0, sizeof(one));
+ memset(&two, 0, sizeof(two));
+
+ timeout_set(&one.ev, test_priorities_cb, &one);
+ if (event_priority_set(&one.ev, 0) == -1) {
+ fprintf(stderr, "%s: failed to set priority", __func__);
+ exit(1);
+ }
+
+ timeout_set(&two.ev, test_priorities_cb, &two);
+ if (event_priority_set(&two.ev, npriorities - 1) == -1) {
+ fprintf(stderr, "%s: failed to set priority", __func__);
+ exit(1);
+ }
+
+ timerclear(&tv);
+
+ if (event_add(&one.ev, &tv) == -1)
+ exit(1);
+ if (event_add(&two.ev, &tv) == -1)
+ exit(1);
+
+ event_dispatch();
+
+ event_del(&one.ev);
+ event_del(&two.ev);
+
+ if (npriorities == 1) {
+ if (one.count == 3 && two.count == 3)
+ test_ok = 1;
+ } else if (npriorities == 2) {
+ /* Two is called once because event_loopexit is priority 1 */
+ if (one.count == 3 && two.count == 1)
+ test_ok = 1;
+ } else {
+ if (one.count == 3 && two.count == 0)
+ test_ok = 1;
+ }
+
+ cleanup_test();
+}
+
int
main (int argc, char **argv)
{
test9();
+ test_priorities(1);
+ test_priorities(2);
+ test_priorities(3);
+
return (0);
}