evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base)
{
EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
- buffer->ev_base = base;
+ buffer->cb_queue = event_base_get_deferred_cb_queue(base);
buffer->deferred_cbs = 1;
event_deferred_cb_init(&buffer->deferred,
evbuffer_deferred_callback, buffer);
if (buffer->deferred.queued)
return;
_evbuffer_incref(buffer);
- event_deferred_cb_schedule(buffer->ev_base, &buffer->deferred);
+ event_deferred_cb_schedule(buffer->cb_queue, &buffer->deferred);
} else {
evbuffer_run_callbacks(buffer);
}
}
evbuffer_remove_all_callbacks(buffer);
if (buffer->deferred_cbs)
- event_deferred_cb_cancel(buffer->ev_base, &buffer->deferred);
+ event_deferred_cb_cancel(buffer->cb_queue, &buffer->deferred);
EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE);
if (buffer->own_lock)
_bufferevent_decref_and_unlock(bufev);
}
+#define SCHEDULE_DEFERRED(bevp) \
+ do { \
+ event_deferred_cb_schedule( \
+ event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
+ &(bevp)->deferred); \
+ } while (0);
+
+
void
_bufferevent_run_readcb(struct bufferevent *bufev)
{
p->readcb_pending = 1;
if (!p->deferred.queued) {
bufferevent_incref(bufev);
- event_deferred_cb_schedule(
- bufev->ev_base, &p->deferred);
+ SCHEDULE_DEFERRED(p);
}
} else {
bufev->readcb(bufev, bufev->cbarg);
p->writecb_pending = 1;
if (!p->deferred.queued) {
bufferevent_incref(bufev);
- event_deferred_cb_schedule(
- bufev->ev_base, &p->deferred);
+ SCHEDULE_DEFERRED(p);
}
} else {
bufev->writecb(bufev, bufev->cbarg);
p->errno_pending = EVUTIL_SOCKET_ERROR();
if (!p->deferred.queued) {
bufferevent_incref(bufev);
- event_deferred_cb_schedule(
- bufev->ev_base, &p->deferred);
+ SCHEDULE_DEFERRED(p);
}
} else {
bufev->errorcb(bufev, what, bufev->cbarg);
void *arg;
};
+
+struct deferred_cb_queue {
+ void *lock;
+
+ int active_count;
+
+ void (*notify_fn)(struct deferred_cb_queue *, void *);
+ void *notify_arg;
+
+ /** Deferred callback management: a list of deferred callbacks to
+ * run active the active events. */
+ TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list;
+};
+
/**
Initialize an empty, non-pending deferred_cb.
/**
Cancel a deferred_cb if it is currently scheduled in an event_base.
*/
-void event_deferred_cb_cancel(struct event_base *, struct deferred_cb *);
+void event_deferred_cb_cancel(struct deferred_cb_queue *, struct deferred_cb *);
/**
Activate a deferred_cb if it is not currently scheduled in an event_base.
*/
-void event_deferred_cb_schedule(struct event_base *, struct deferred_cb *);
+void event_deferred_cb_schedule(struct deferred_cb_queue *, struct deferred_cb *);
+
+#define LOCK_DEFERRED_QUEUE(q) \
+ do { \
+ if ((q)->lock) \
+ _evthread_locking_fn(EVTHREAD_LOCK|EVTHREAD_WRITE, \
+ (q)->lock); \
+ } while (0)
+
+#define UNLOCK_DEFERRED_QUEUE(q) \
+ do { \
+ if ((q)->lock) \
+ _evthread_locking_fn(EVTHREAD_UNLOCK|EVTHREAD_WRITE, \
+ (q)->lock); \
+ } while (0)
#ifdef __cplusplus
}
#endif
+void event_deferred_cb_queue_init(struct deferred_cb_queue *);
+struct deferred_cb_queue *event_base_get_deferred_cb_queue(struct event_base *);
+
#endif /* _EVENT_INTERNAL_H_ */
return base->evsel->features;
}
+void
+event_deferred_cb_queue_init(struct deferred_cb_queue *cb)
+{
+ memset(cb, 0, sizeof(struct deferred_cb_queue));
+ TAILQ_INIT(&cb->deferred_cb_list);
+}
+
+static void
+notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr)
+{
+ struct event_base *base = baseptr;
+ if (!EVBASE_IN_THREAD(base))
+ evthread_notify_base(base);
+}
+
+struct deferred_cb_queue *
+event_base_get_deferred_cb_queue(struct event_base *base)
+{
+ return base ? &base->defer_queue : NULL;
+}
+
struct event_base *
event_base_new_with_config(struct event_config *cfg)
{
min_heap_ctor(&base->timeheap);
TAILQ_INIT(&base->eventqueue);
- TAILQ_INIT(&base->deferred_cb_list);
base->sig.ev_signal_pair[0] = -1;
base->sig.ev_signal_pair[1] = -1;
+ event_deferred_cb_queue_init(&base->defer_queue);
+ base->defer_queue.notify_fn = notify_base_cbq_callback;
+ base->defer_queue.notify_arg = base;
+
evmap_io_initmap(&base->io);
evmap_signal_initmap(&base->sigmap);
if (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK)) {
int r;
EVTHREAD_ALLOC_LOCK(base->th_base_lock);
+ base->defer_queue.lock = base->th_base_lock;
EVTHREAD_ALLOC_LOCK(base->current_event_lock);
r = evthread_make_base_notifiable(base);
if (r<0) {
{
int i;
- if (base->event_count_active || npriorities < 1
+ if (N_ACTIVE_CALLBACKS(base) || npriorities < 1
|| npriorities >= EVENT_MAX_PRIORITIES)
return (-1);
}
static int
-event_process_deferred_callbacks(struct event_base *base)
+event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
{
int count = 0;
struct deferred_cb *cb;
- while ((cb = TAILQ_FIRST(&base->deferred_cb_list))) {
+ while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
cb->queued = 0;
- TAILQ_REMOVE(&base->deferred_cb_list, cb, cb_next);
- --base->event_count_active;
- EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
+ --queue->active_count;
+ UNLOCK_DEFERRED_QUEUE(queue);
cb->cb(cb, cb->arg);
++count;
- if (base->event_break)
+ if (*breakptr)
return -1;
- EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ LOCK_DEFERRED_QUEUE(queue);
}
return count;
}
}
}
- event_process_deferred_callbacks(base);
+ event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
unlock:
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
timeout_correct(base, &tv);
tv_p = &tv;
- if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
+ if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p);
} else {
/*
}
/* If we have no events, we just exit */
- if (!event_haveevents(base) && !base->event_count_active) {
+ if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
event_debug(("%s: no events registered.", __func__));
return (1);
}
timeout_process(base);
- if (base->event_count_active) {
+ if (N_ACTIVE_CALLBACKS(base)) {
event_process_active(base);
if (!base->event_count_active && (flags & EVLOOP_ONCE))
done = 1;
}
void
-event_deferred_cb_cancel(struct event_base *base, struct deferred_cb *cb)
+event_deferred_cb_cancel(struct deferred_cb_queue *queue,
+ struct deferred_cb *cb)
{
- if (!base)
- base = current_base;
+ if (!queue) {
+ if (current_base)
+ queue = ¤t_base->defer_queue;
+ else
+ return;
+ }
- EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ LOCK_DEFERRED_QUEUE(queue);
if (cb->queued) {
- TAILQ_REMOVE(&base->deferred_cb_list, cb, cb_next);
- --base->event_count_active;
+ TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
+ --queue->active_count;
cb->queued = 0;
}
- EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ UNLOCK_DEFERRED_QUEUE(queue);
}
void
-event_deferred_cb_schedule(struct event_base *base, struct deferred_cb *cb)
+event_deferred_cb_schedule(struct deferred_cb_queue *queue,
+ struct deferred_cb *cb)
{
- if (!base)
- base = current_base;
- EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ if (!queue) {
+ if (current_base)
+ queue = ¤t_base->defer_queue;
+ else
+ return;
+ }
+
+ LOCK_DEFERRED_QUEUE(queue);
if (!cb->queued) {
cb->queued = 1;
- TAILQ_INSERT_TAIL(&base->deferred_cb_list, cb, cb_next);
- ++base->event_count_active;
- if (!EVBASE_IN_THREAD(base))
- evthread_notify_base(base);
+ TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next);
+ ++queue->active_count;
+ if (queue->notify_fn)
+ queue->notify_fn(queue, queue->notify_arg);
}
- EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ UNLOCK_DEFERRED_QUEUE(queue);
}
static int