event_del(ev);
++n_deleted;
}
+ for (i = 0; i < base->n_common_timeouts; ++i) {
+ struct common_timeout_list *ctl =
+ base->common_timeout_queues[i];
+ event_del(&ctl->timeout_event); /* Internal; doesn't count */
+ for (ev = TAILQ_FIRST(&ctl->events); ev; ) {
+ struct event *next = TAILQ_NEXT(ev,
+ ev_timeout_pos.ev_next_with_common_timeout);
+ if (!(ev->ev_flags & EVLIST_INTERNAL)) {
+ event_del(ev);
+ ++n_deleted;
+ }
+ ev = next;
+ }
+ mm_free(ctl);
+ }
+ if (base->common_timeout_queues)
+ mm_free(base->common_timeout_queues);
for (i = 0; i < base->nactivequeues; ++i) {
for (ev = TAILQ_FIRST(base->activequeues[i]); ev; ) {
}
}
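
Note: the `ev_timeout_pos.ev_next_with_common_timeout` member used above (and the `ev_timeout_pos.min_heap_idx` renames in the min-heap code below) imply a companion change to `struct event` that this excerpt does not show. A minimal sketch of the assumed declaration: an event waits either on the min-heap or on exactly one common-timeout list, never both, so the two position fields can share storage in a union.

	union {
		/* for an event on a common-timeout queue */
		TAILQ_ENTRY(event) ev_next_with_common_timeout;
		/* for an event in the min-heap */
		int min_heap_idx;
	} ev_timeout_pos;
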
+#define MICROSECONDS_MASK 0x000fffff
+#define COMMON_TIMEOUT_IDX_MASK 0x0ff00000
+#define COMMON_TIMEOUT_IDX_SHIFT 20
+#define COMMON_TIMEOUT_MASK 0xf0000000
+#define COMMON_TIMEOUT_MAGIC 0x50000000
+
+#define COMMON_TIMEOUT_IDX(tv) \
+ (((tv)->tv_usec & COMMON_TIMEOUT_IDX_MASK)>>COMMON_TIMEOUT_IDX_SHIFT)
+
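To make the bit layout concrete: the low 20 bits of tv_usec carry the real microseconds (every normalized value below 1000000 fits, since 2^20 - 1 = 1048575), bits 20-27 carry the queue index (hence the cap of 256 queues below), and the top nibble carries the 0x5 magic tag. A worked example using the macros above, assuming queue index 1 and a 200 ms duration:

	/* 200000 us, tagged for common-timeout queue 1 (illustrative): */
	long usec = 200000			/* 0x00030D40: microseconds */
	    | COMMON_TIMEOUT_MAGIC		/* 0x50000000: magic tag   */
	    | (1 << COMMON_TIMEOUT_IDX_SHIFT);	/* 0x00100000: index 1     */
	/* usec == 0x50130D40, and decoding reverses the steps:         */
	/* (usec & COMMON_TIMEOUT_MASK) == COMMON_TIMEOUT_MAGIC         */
	/* ((usec & COMMON_TIMEOUT_IDX_MASK)
	       >> COMMON_TIMEOUT_IDX_SHIFT) == 1                        */
	/* (usec & MICROSECONDS_MASK) == 200000                         */

This matches the regress test's expectation below that the second queue's canonical tv_usec is 200000|0x50100000.
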
+static inline int
+is_common_timeout(const struct timeval *tv,
+ const struct event_base *base)
+{
+ int idx;
+ if ((tv->tv_usec & COMMON_TIMEOUT_MASK) != COMMON_TIMEOUT_MAGIC)
+ return 0;
+ idx = COMMON_TIMEOUT_IDX(tv);
+ return idx < base->n_common_timeouts;
+}
+
+static inline struct common_timeout_list *
+get_common_timeout_list(struct event_base *base, const struct timeval *tv)
+{
+ return base->common_timeout_queues[COMMON_TIMEOUT_IDX(tv)];
+}
+
+static inline int
+common_timeout_ok(const struct timeval *tv,
+ struct event_base *base)
+{
+ const struct timeval *expect =
+ &get_common_timeout_list(base, tv)->duration;
+ return tv->tv_sec == expect->tv_sec &&
+ tv->tv_usec == expect->tv_usec;
+}
+
+static void
+common_timeout_schedule(struct common_timeout_list *ctl,
+ const struct timeval *now, struct event *head)
+{
+ struct timeval delay;
+ struct timeval timeout = head->ev_timeout;
+ timeout.tv_usec &= MICROSECONDS_MASK;
+ evutil_timersub(&timeout, now, &delay);
+ event_add_internal(&ctl->timeout_event, &delay);
+}
+
+static void
+common_timeout_callback(evutil_socket_t fd, short what, void *arg)
+{
+ struct timeval now;
+ struct common_timeout_list *ctl = arg;
+ struct event_base *base = ctl->base;
+ struct event *ev = NULL;
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ gettime(base, &now);
+ while (1) {
+ ev = TAILQ_FIRST(&ctl->events);
+ if (!ev || ev->ev_timeout.tv_sec > now.tv_sec ||
+ (ev->ev_timeout.tv_sec == now.tv_sec &&
+ (ev->ev_timeout.tv_usec&MICROSECONDS_MASK) > now.tv_usec))
+ break;
+ event_del_internal(ev);
+ event_active_nolock(ev, EV_TIMEOUT, 1);
+ }
+ if (ev)
+ common_timeout_schedule(ctl, &now, ev);
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+}
+
+#define MAX_COMMON_TIMEOUTS 256
+
+const struct timeval *
+event_base_init_common_timeout(struct event_base *base,
+ const struct timeval *duration)
+{
+ int i;
+ struct timeval tv;
+ const struct timeval *result=NULL;
+ struct common_timeout_list *new_ctl;
+
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ if (duration->tv_usec >= 1000000) {
+ memcpy(&tv, duration, sizeof(struct timeval));
+ if (is_common_timeout(duration, base))
+ tv.tv_usec &= MICROSECONDS_MASK;
+ tv.tv_sec += tv.tv_usec / 1000000;
+ tv.tv_usec %= 1000000;
+ duration = &tv;
+ }
+ for (i = 0; i < base->n_common_timeouts; ++i) {
+ const struct common_timeout_list *ctl =
+ base->common_timeout_queues[i];
+ if (duration->tv_sec == ctl->duration.tv_sec &&
+ duration->tv_usec ==
+ (ctl->duration.tv_usec & MICROSECONDS_MASK)) {
+ EVUTIL_ASSERT(is_common_timeout(&ctl->duration, base));
+ result = &ctl->duration;
+ goto done;
+ }
+ }
+ if (base->n_common_timeouts == MAX_COMMON_TIMEOUTS) {
+ event_warn("%s: Too many common timeouts already in use; "
+ "we only support %d per event_base", __func__,
+ MAX_COMMON_TIMEOUTS);
+ goto done;
+ }
+ if (base->n_common_timeouts_allocated == base->n_common_timeouts) {
+ int n = base->n_common_timeouts < 16 ? 16 :
+ base->n_common_timeouts*2;
+ struct common_timeout_list **newqueues =
+ mm_realloc(base->common_timeout_queues,
+ n*sizeof(struct common_timeout_list *));
+ if (!newqueues) {
+ event_warn("%s: realloc",__func__);
+ goto done;
+ }
+ base->n_common_timeouts_allocated = n;
+ base->common_timeout_queues = newqueues;
+ }
+ new_ctl = mm_calloc(1, sizeof(struct common_timeout_list));
+ if (!new_ctl) {
+ event_warn("%s: calloc",__func__);
+ goto done;
+ }
+ TAILQ_INIT(&new_ctl->events);
+ new_ctl->duration.tv_sec = duration->tv_sec;
+ new_ctl->duration.tv_usec =
+ duration->tv_usec | COMMON_TIMEOUT_MAGIC |
+ (base->n_common_timeouts << COMMON_TIMEOUT_IDX_SHIFT);
+ evtimer_assign(&new_ctl->timeout_event, base,
+ common_timeout_callback, new_ctl);
+ new_ctl->timeout_event.ev_flags |= EVLIST_INTERNAL;
+ event_priority_set(&new_ctl->timeout_event, 0);
+ new_ctl->base = base;
+ base->common_timeout_queues[base->n_common_timeouts++] = new_ctl;
+ result = &new_ctl->duration;
+
+done:
+ if (result)
+ EVUTIL_ASSERT(is_common_timeout(result, base));
+
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ return result;
+}
+
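For context, a sketch of how a caller is expected to use the new API: request the canonical tagged timeval once, then pass it to every event_add() that shares the duration (the `base`, `timeout_cb`, `connections`, and `n_connections` names here are hypothetical):

	struct timeval ten_sec = { 10, 0 };
	const struct timeval *common;
	int i;

	/* Get the canonical, magic-tagged copy of the duration. */
	common = event_base_init_common_timeout(base, &ten_sec);
	if (!common)
		common = &ten_sec; /* fall back to a plain min-heap timeout */

	for (i = 0; i < n_connections; ++i) {
		struct event *ev = evtimer_new(base, timeout_cb,
		    connections[i]);
		/* The magic bits in *common make event_add queue the event
		 * on the shared O(1) list instead of the min-heap. */
		evtimer_add(ev, common);
	}

Events added this way expire with the usual timer granularity; the win is that insertion and deletion on a common-timeout queue are O(1) list operations rather than O(lg n) min-heap operations.
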
/*
Helper for event_process_active to process all the events in a single queue,
releasing the lock as we go. This function requires that the lock be held
/* See if there is a timeout that we should report */
if (tv != NULL && (flags & event & EV_TIMEOUT)) {
+ struct timeval tmp = ev->ev_timeout;
gettime(ev->ev_base, &now);
- evutil_timersub(&ev->ev_timeout, &now, &res);
+ tmp.tv_usec &= MICROSECONDS_MASK;
+ evutil_timersub(&tmp, &now, &res);
/* correctly remap to real time */
evutil_gettimeofday(&now, NULL);
evutil_timeradd(&now, &res, tv);
*/
if (res != -1 && tv != NULL) {
struct timeval now;
+ int common_timeout;
/*
* for persistent timeout events, we remember the
* are not replacing an existing timeout.
*/
if (ev->ev_flags & EVLIST_TIMEOUT) {
+ /* XXX I believe this is needless. */
if (min_heap_elt_is_top(ev))
notify = 1;
event_queue_remove(base, ev, EVLIST_TIMEOUT);
}
gettime(base, &now);
- evutil_timeradd(&now, tv, &ev->ev_timeout);
+ common_timeout = is_common_timeout(tv, base);
+ if (common_timeout) {
+ struct timeval tmp = *tv;
+ tmp.tv_usec &= MICROSECONDS_MASK;
+ evutil_timeradd(&now, &tmp, &ev->ev_timeout);
+ ev->ev_timeout.tv_usec |=
+ (tv->tv_usec & ~MICROSECONDS_MASK);
+ } else {
+ evutil_timeradd(&now, tv, &ev->ev_timeout);
+ }
event_debug((
"event_add: timeout in %d seconds, call %p",
(int)tv->tv_sec, ev->ev_callback));
event_queue_insert(base, ev, EVLIST_TIMEOUT);
- if (min_heap_elt_is_top(ev)) {
- /* The earliest timeout is now earlier than it was
- * before: we will need to tell the main thread to
- * wake up earlier than it would otherwise. */
- notify = 1;
+ if (common_timeout) {
+ struct common_timeout_list *ctl =
+ get_common_timeout_list(base, &ev->ev_timeout);
+ if (ev == TAILQ_FIRST(&ctl->events)) {
+ common_timeout_schedule(ctl, &now, ev);
+ }
+ } else {
+ /* See if the earliest timeout is now earlier than it
+ * was before: if so, we will need to tell the main
+ * thread to wake up earlier than it would
+ * otherwise. */
+ if (min_heap_elt_is_top(ev))
+ notify = 1;
}
}
struct event **pev;
unsigned int size;
struct timeval off;
+ int i;
if (use_monotonic)
return;
struct timeval *ev_tv = &(**pev).ev_timeout;
evutil_timersub(ev_tv, &off, ev_tv);
}
+ for (i=0; i<base->n_common_timeouts; ++i) {
+ struct event *ev;
+ struct common_timeout_list *ctl =
+ base->common_timeout_queues[i];
+ TAILQ_FOREACH(ev, &ctl->events,
+ ev_timeout_pos.ev_next_with_common_timeout) {
+ struct timeval *ev_tv = &ev->ev_timeout;
+ ev_tv->tv_usec &= MICROSECONDS_MASK;
+ evutil_timersub(ev_tv, &off, ev_tv);
+ ev_tv->tv_usec |= COMMON_TIMEOUT_MAGIC |
+ (i<<COMMON_TIMEOUT_IDX_SHIFT);
+ }
+ }
+
/* Now remember what the new time turned out to be. */
base->event_tv = *tv;
}
ev, ev_active_next);
break;
case EVLIST_TIMEOUT:
- min_heap_erase(&base->timeheap, ev);
+ if (is_common_timeout(&ev->ev_timeout, base)) {
+ struct common_timeout_list *ctl =
+ get_common_timeout_list(base, &ev->ev_timeout);
+ TAILQ_REMOVE(&ctl->events, ev,
+ ev_timeout_pos.ev_next_with_common_timeout);
+ } else {
+ min_heap_erase(&base->timeheap, ev);
+ }
break;
default:
event_errx(1, "%s: unknown queue %x", __func__, queue);
ev,ev_active_next);
break;
case EVLIST_TIMEOUT: {
- min_heap_push(&base->timeheap, ev);
+ if (is_common_timeout(&ev->ev_timeout, base)) {
+ struct common_timeout_list *ctl =
+ get_common_timeout_list(base, &ev->ev_timeout);
+ TAILQ_INSERT_TAIL(&ctl->events, ev,
+ ev_timeout_pos.ev_next_with_common_timeout);
+ } else
+ min_heap_push(&base->timeheap, ev);
break;
}
default:
void min_heap_ctor(min_heap_t* s) { s->p = 0; s->n = 0; s->a = 0; }
void min_heap_dtor(min_heap_t* s) { free(s->p); }
-void min_heap_elem_init(struct event* e) { e->min_heap_idx = -1; }
+void min_heap_elem_init(struct event* e) { e->ev_timeout_pos.min_heap_idx = -1; }
int min_heap_empty(min_heap_t* s) { return 0u == s->n; }
unsigned min_heap_size(min_heap_t* s) { return s->n; }
struct event* min_heap_top(min_heap_t* s) { return s->n ? *s->p : 0; }
{
struct event* e = *s->p;
min_heap_shift_down_(s, 0u, s->p[--s->n]);
- e->min_heap_idx = -1;
+ e->ev_timeout_pos.min_heap_idx = -1;
return e;
}
return 0;
int min_heap_elt_is_top(const struct event *e)
{
- return e->min_heap_idx == 0;
+ return e->ev_timeout_pos.min_heap_idx == 0;
}
int min_heap_erase(min_heap_t* s, struct event* e)
{
- if(((unsigned int)-1) != e->min_heap_idx)
+ if(((unsigned int)-1) != e->ev_timeout_pos.min_heap_idx)
{
struct event *last = s->p[--s->n];
- unsigned parent = (e->min_heap_idx - 1) / 2;
+ unsigned parent = (e->ev_timeout_pos.min_heap_idx - 1) / 2;
/* we replace e with the last element in the heap. We might need to
shift it upward if it is less than its parent, or downward if it is
greater than one or both its children. Since the children are known
to be less than the parent, it can't need to shift both up and
down. */
- if (e->min_heap_idx > 0 && min_heap_elem_greater(s->p[parent], last))
- min_heap_shift_up_(s, e->min_heap_idx, last);
+ if (e->ev_timeout_pos.min_heap_idx > 0 && min_heap_elem_greater(s->p[parent], last))
+ min_heap_shift_up_(s, e->ev_timeout_pos.min_heap_idx, last);
else
- min_heap_shift_down_(s, e->min_heap_idx, last);
- e->min_heap_idx = -1;
+ min_heap_shift_down_(s, e->ev_timeout_pos.min_heap_idx, last);
+ e->ev_timeout_pos.min_heap_idx = -1;
return 0;
}
return -1;
unsigned parent = (hole_index - 1) / 2;
while(hole_index && min_heap_elem_greater(s->p[parent], e))
{
- (s->p[hole_index] = s->p[parent])->min_heap_idx = hole_index;
+ (s->p[hole_index] = s->p[parent])->ev_timeout_pos.min_heap_idx = hole_index;
hole_index = parent;
parent = (hole_index - 1) / 2;
}
- (s->p[hole_index] = e)->min_heap_idx = hole_index;
+ (s->p[hole_index] = e)->ev_timeout_pos.min_heap_idx = hole_index;
}
void min_heap_shift_down_(min_heap_t* s, unsigned hole_index, struct event* e)
min_child -= min_child == s->n || min_heap_elem_greater(s->p[min_child], s->p[min_child - 1]);
if(!(min_heap_elem_greater(e, s->p[min_child])))
break;
- (s->p[hole_index] = s->p[min_child])->min_heap_idx = hole_index;
+ (s->p[hole_index] = s->p[min_child])->ev_timeout_pos.min_heap_idx = hole_index;
hole_index = min_child;
min_child = 2 * (hole_index + 1);
}
event_dispatch();
event_del(&ev);
+}
+
+static int total_common_counts;
+
+struct common_timeout_info {
+ struct event ev;
+ struct timeval called_at;
+ int which;
+ int count;
+};
+
+static void
+common_timeout_cb(int fd, short event, void *arg)
+{
+ struct common_timeout_info *ti = arg;
+ ++ti->count;
+ evutil_gettimeofday(&ti->called_at, NULL);
+ if (ti->count >= 6)
+ event_del(&ti->ev);
+}
+
+static void
+test_common_timeout(void *ptr)
+{
+ struct basic_test_data *data = ptr;
+
+ struct event_base *base = data->base;
+ int i;
+ struct common_timeout_info info[100];
+
+ struct timeval now;
+ struct timeval tmp_100_ms = { 0, 100*1000 };
+ struct timeval tmp_200_ms = { 0, 200*1000 };
+
+ const struct timeval *ms_100, *ms_200;
+
+ ms_100 = event_base_init_common_timeout(base, &tmp_100_ms);
+ ms_200 = event_base_init_common_timeout(base, &tmp_200_ms);
+ tt_assert(ms_100);
+ tt_assert(ms_200);
+ tt_ptr_op(event_base_init_common_timeout(base, &tmp_200_ms),
+ ==, ms_200);
+ tt_int_op(ms_100->tv_sec, ==, 0);
+ tt_int_op(ms_200->tv_sec, ==, 0);
+ tt_int_op(ms_100->tv_usec, ==, 100000|0x50000000);
+ tt_int_op(ms_200->tv_usec, ==, 200000|0x50100000);
+
+ total_common_counts = 0;
+
+ memset(info, 0, sizeof(info));
+
+ for (i=0; i<100; ++i) {
+ info[i].which = i;
+ event_assign(&info[i].ev, base, -1, EV_TIMEOUT|EV_PERSIST,
+ common_timeout_cb, &info[i]);
+ if (i % 2) {
+ event_add(&info[i].ev, ms_100);
+ } else {
+ event_add(&info[i].ev, ms_200);
+ }
+ }
+
+ event_base_dispatch(base);
+
+ evutil_gettimeofday(&now, NULL);
+ for (i=0; i<10; ++i) {
+ struct timeval tmp;
+ int ms_diff;
+ tt_int_op(info[i].count, ==, 6);
+ evutil_timersub(&now, &info[i].called_at, &tmp);
+ ms_diff = tmp.tv_usec/1000 + tmp.tv_sec*1000;
+ if (i % 2) {
+ tt_int_op(ms_diff, >, 500);
+ tt_int_op(ms_diff, <, 700);
+ } else {
+ tt_int_op(ms_diff, >, -100);
+ tt_int_op(ms_diff, <, 100);
+ }
+ }
+
+ /* Make sure we can free the base with some events still in it. */
+ for (i=0; i<100; ++i) {
+ if (i % 2) {
+ event_add(&info[i].ev, ms_100);
+ } else {
+ event_add(&info[i].ev, ms_200);
+ }
+ }
+
+end:
+ event_base_free(data->base); /* need to do this here, before info
+ * goes out of scope */
+ data->base = NULL;
}
#ifndef WIN32
/* These are still using the old API */
LEGACY(persistent_timeout, TT_FORK|TT_NEED_BASE),
LEGACY(priorities, TT_FORK|TT_NEED_BASE),
+ { "common_timeout", test_common_timeout, TT_FORK|TT_NEED_BASE,
+ &basic_setup, NULL },
/* These legacy tests may not all need all of these flags. */
LEGACY(simpleread, TT_ISOLATED),