EVBUFFER_LOCK(buffer);
buffer->cb_queue = event_base_get_deferred_cb_queue_(base);
buffer->deferred_cbs = 1;
- event_deferred_cb_init_(&buffer->deferred,
+ event_deferred_cb_init_(base, &buffer->deferred,
evbuffer_deferred_callback, buffer);
EVBUFFER_UNLOCK(buffer);
return 0;
}
if (buffer->deferred_cbs) {
- if (buffer->deferred.queued)
- return;
- evbuffer_incref_and_lock_(buffer);
- if (buffer->parent)
- bufferevent_incref_(buffer->parent);
+ if (event_deferred_cb_schedule_(buffer->cb_queue, &buffer->deferred)) {
+ evbuffer_incref_and_lock_(buffer);
+ if (buffer->parent)
+ bufferevent_incref_(buffer->parent);
+ }
EVBUFFER_UNLOCK(buffer);
- event_deferred_cb_schedule_(buffer->cb_queue, &buffer->deferred);
}
evbuffer_run_callbacks(buffer, 0);
#endif
#include "event2/event-config.h"
+#include "event2/event_struct.h"
#include "evconfig-private.h"
#include "event2/util.h"
#include "defer-internal.h"
#define SCHEDULE_DEFERRED(bevp) \
do { \
- bufferevent_incref_(&(bevp)->bev); \
- event_deferred_cb_schedule_( \
+ if (event_deferred_cb_schedule_( \
event_base_get_deferred_cb_queue_((bevp)->bev.ev_base), \
- &(bevp)->deferred); \
+ &(bevp)->deferred)) \
+ bufferevent_incref_(&(bevp)->bev); \
} while (0)
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->readcb_pending = 1;
- if (!p->deferred.queued)
- SCHEDULE_DEFERRED(p);
+ SCHEDULE_DEFERRED(p);
} else {
bufev->readcb(bufev, bufev->cbarg);
}
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->writecb_pending = 1;
- if (!p->deferred.queued)
- SCHEDULE_DEFERRED(p);
+ SCHEDULE_DEFERRED(p);
} else {
bufev->writecb(bufev, bufev->cbarg);
}
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->eventcb_pending |= what;
p->errno_pending = EVUTIL_SOCKET_ERROR();
- if (!p->deferred.queued)
- SCHEDULE_DEFERRED(p);
+ SCHEDULE_DEFERRED(p);
} else {
bufev->errorcb(bufev, what, bufev->cbarg);
}
}
if (options & BEV_OPT_DEFER_CALLBACKS) {
if (options & BEV_OPT_UNLOCK_CALLBACKS)
- event_deferred_cb_init_(&bufev_private->deferred,
+ event_deferred_cb_init_(
+ bufev->ev_base,
+ &bufev_private->deferred,
bufferevent_run_deferred_callbacks_unlocked,
bufev_private);
else
- event_deferred_cb_init_(&bufev_private->deferred,
+ event_deferred_cb_init_(
+ bufev->ev_base,
+ &bufev_private->deferred,
bufferevent_run_deferred_callbacks_locked,
bufev_private);
}
#include <sys/queue.h>
-struct deferred_cb;
+#define deferred_cb event_callback
+#define deferred_cb_queue event_base
+struct event_callback;
-typedef void (*deferred_cb_fn)(struct deferred_cb *, void *);
-
-/** A deferred_cb is a callback that can be scheduled to run as part of
- * an event_base's event_loop, rather than running immediately. */
-struct deferred_cb {
- /** Links to the adjacent active (pending) deferred_cb objects. */
- TAILQ_ENTRY (deferred_cb) cb_next;
- /** True iff this deferred_cb is pending in an event_base. */
- unsigned queued : 1;
- /** The function to execute when the callback runs. */
- deferred_cb_fn cb;
- /** The function's second argument. */
- void *arg;
-};
-
-/** A deferred_cb_queue is a list of deferred_cb that we can add to and run. */
-struct deferred_cb_queue {
- /** Lock used to protect the queue. */
- void *lock;
-
- /** Which event_base does this queue associate itself with?
- * (Used for timing) */
- struct event_base *base;
-
- /** How many entries are in the queue? */
- int active_count;
-
- /** Function called when adding to the queue from another thread. */
- void (*notify_fn)(struct deferred_cb_queue *, void *);
- void *notify_arg;
-
- /** Deferred callback management: a list of deferred callbacks to
- * run active the active events. */
- TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list;
-};
+typedef void (*deferred_cb_fn)(struct event_callback *, void *);
/**
Initialize an empty, non-pending deferred_cb.
@param cb The function to run when the deferred_cb executes.
@param arg The function's second argument.
*/
-void event_deferred_cb_init_(struct deferred_cb *, deferred_cb_fn, void *);
+void event_deferred_cb_init_(struct event_base *base, struct event_callback *, deferred_cb_fn, void *);
/**
Cancel a deferred_cb if it is currently scheduled in an event_base.
*/
-void event_deferred_cb_cancel_(struct deferred_cb_queue *, struct deferred_cb *);
+void event_deferred_cb_cancel_(struct event_base *, struct event_callback *);
/**
Activate a deferred_cb if it is not currently scheduled in an event_base.
- */
-void event_deferred_cb_schedule_(struct deferred_cb_queue *, struct deferred_cb *);
-#define LOCK_DEFERRED_QUEUE(q) \
- EVLOCK_LOCK((q)->lock, 0)
-#define UNLOCK_DEFERRED_QUEUE(q) \
- EVLOCK_UNLOCK((q)->lock, 0)
+ Return true iff it was not previously scheduled.
+ */
+int event_deferred_cb_schedule_(struct event_base *, struct event_callback *);
#ifdef __cplusplus
}
#endif
-void event_deferred_cb_queue_init_(struct deferred_cb_queue *);
-struct deferred_cb_queue *event_base_get_deferred_cb_queue_(struct event_base *);
+#define event_base_get_deferred_cb_queue_(x) (x)
#endif /* EVENT_INTERNAL_H_INCLUDED_ */
#include "event2/event-config.h"
#include "evconfig-private.h"
#include "event2/util.h"
+#include "event2/event_struct.h"
#include "util-internal.h"
#include "defer-internal.h"
d->handle = req->handle;
}
- event_deferred_cb_init_(&d->deferred, reply_run_callback,
+ event_deferred_cb_init_(req->base->event_base,
+ &d->deferred, reply_run_callback,
req->user_pointer);
event_deferred_cb_schedule_(
event_base_get_deferred_cb_queue_(req->base->event_base),
#define ev_pri ev_evcallback.evcb_pri
#define ev_flags ev_evcallback.evcb_flags
#define ev_closure ev_evcallback.evcb_closure
-#define ev_callback ev_evcallback.evcb_callback
+#define ev_callback ev_evcallback.evcb_cb_union.evcb_callback
#define ev_arg ev_evcallback.evcb_arg
/* Possible values for evcb_closure in struct event_callback */
#define EV_CLOSURE_EVENT 0
#define EV_CLOSURE_EVENT_SIGNAL 1
#define EV_CLOSURE_EVENT_PERSIST 2
+#define EV_CLOSURE_CB_SELF 3
/** Structure to define the backend of a given event_base. */
struct eventop {
/** The total size of common_timeout_queues. */
int n_common_timeouts_allocated;
- /** List of defered_cb that are active. We run these after the active
- * events. */
- struct deferred_cb_queue defer_queue;
-
/** Mapping from file descriptors to enabled (added) events */
struct event_io_map io;
#endif /* TAILQ_FOREACH */
#define N_ACTIVE_CALLBACKS(base) \
- ((base)->event_count_active + (base)->defer_queue.active_count)
+ ((base)->event_count_active)
int evsig_set_handler_(struct event_base *base, int evsignal,
void (*fn)(int));
void event_active_nolock_(struct event *ev, int res, short count);
-void event_callback_activate_nolock_(struct event_base *, struct event_callback *);
+int event_callback_activate_(struct event_base *, struct event_callback *);
+int event_callback_activate_nolock_(struct event_base *, struct event_callback *);
int event_callback_cancel_(struct event_base *base,
struct event_callback *evcb);
return base->evsel->features;
}
-void
-event_deferred_cb_queue_init_(struct deferred_cb_queue *cb)
-{
- memset(cb, 0, sizeof(struct deferred_cb_queue));
- TAILQ_INIT(&cb->deferred_cb_list);
-}
-
-/** Helper for the deferred_cb queue: wake up the event base. */
-static void
-notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr)
-{
- struct event_base *base = baseptr;
- if (EVBASE_NEED_NOTIFY(base))
- evthread_notify_base(base);
-}
-
-struct deferred_cb_queue *
-event_base_get_deferred_cb_queue_(struct event_base *base)
-{
- return base ? &base->defer_queue : NULL;
-}
-
void
event_enable_debug_mode(void)
{
base->th_notify_fd[0] = -1;
base->th_notify_fd[1] = -1;
- event_deferred_cb_queue_init_(&base->defer_queue);
- base->defer_queue.base = base;
- base->defer_queue.notify_fn = notify_base_cbq_callback;
- base->defer_queue.notify_arg = base;
-
TAILQ_INIT(&base->active_later_queue);
evmap_io_initmap_(&base->io);
int r;
EVTHREAD_ALLOC_LOCK(base->th_base_lock,
EVTHREAD_LOCKTYPE_RECURSIVE);
- base->defer_queue.lock = base->th_base_lock;
EVTHREAD_ALLOC_COND(base->current_event_cond);
r = evthread_make_base_notifiable(base);
if (r<0) {
event_queue_remove_active(base, evcb);
event_debug(("event_process_active: event_callback %p, "
"closure %d, call %p",
- evcb, evcb->evcb_closure, evcb->evcb_callback));
+ evcb, evcb->evcb_closure, evcb->evcb_cb_union.evcb_callback));
}
if (!(evcb->evcb_flags & EVLIST_INTERNAL))
(*ev->ev_callback)(
ev->ev_fd, ev->ev_res, ev->ev_arg);
break;
+ case EV_CLOSURE_CB_SELF:
+ EVBASE_RELEASE_LOCK(base, th_base_lock);
+ evcb->evcb_cb_union.evcb_selfcb(evcb, evcb->evcb_arg);
+ break;
default:
EVUTIL_ASSERT(0);
}
return count;
}
-/*
- Process up to MAX_DEFERRED of the defered_cb entries in 'queue'. If
- *breakptr becomes set to 1, stop. Requires that we start out holding
- the lock on 'queue'; releases the lock around 'queue' for each deferred_cb
- we process.
- */
-static int
-event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr,
- int max_to_process, const struct timeval *endtime)
-{
- int count = 0;
- struct deferred_cb *cb;
-#define MAX_DEFERRED 16
- if (max_to_process > MAX_DEFERRED)
- max_to_process = MAX_DEFERRED;
-#undef MAX_DEFERRED
-
- while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
- cb->queued = 0;
- TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
- --queue->active_count;
- UNLOCK_DEFERRED_QUEUE(queue);
-
- cb->cb(cb, cb->arg);
-
- LOCK_DEFERRED_QUEUE(queue);
- if (*breakptr)
- return -1;
- if (++count >= max_to_process)
- break;
- if (endtime) {
- struct timeval now;
- update_time_cache(queue->base);
- gettime(queue->base, &now);
- if (evutil_timercmp(&now, endtime, >=))
- return count;
- }
- }
- return count;
-}
-
/*
* Active events are stored in priority queues. Lower priorities are always
* process before higher priorities. Low priority events can starve high
}
}
- event_process_deferred_callbacks(&base->defer_queue,&base->event_break,
- maxcb-c, endtime);
base->event_running_priority = -1;
return c;
}
event_callback_activate_later_nolock_(base, event_to_event_callback(ev));
}
-void
+int
+event_callback_activate_(struct event_base *base,
+ struct event_callback *evcb)
+{
+ int r;
+ EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+ r = event_callback_activate_nolock_(base, evcb);
+ EVBASE_RELEASE_LOCK(base, th_base_lock);
+ return r;
+}
+
+int
event_callback_activate_nolock_(struct event_base *base,
struct event_callback *evcb)
{
- if (evcb->evcb_flags & EVLIST_ACTIVE_LATER)
+ int r = 1;
+
+ switch (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) {
+ default:
+ EVUTIL_ASSERT(0);
+ case EVLIST_ACTIVE_LATER:
event_queue_remove_active_later(base, evcb);
+ r = 0;
+ break;
+ case EVLIST_ACTIVE:
+ return 0;
+ case 0:
+ break;
+ }
event_queue_insert_active(base, evcb);
if (EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
+
+ return r;
}
void
}
void
-event_deferred_cb_init_(struct deferred_cb *cb, deferred_cb_fn fn, void *arg)
+event_deferred_cb_init_(struct event_base *base, struct event_callback *cb, deferred_cb_fn fn, void *arg)
{
- memset(cb, 0, sizeof(struct deferred_cb));
- cb->cb = fn;
- cb->arg = arg;
+ if (!base)
+ base = current_base;
+ memset(cb, 0, sizeof(*cb));
+ cb->evcb_cb_union.evcb_selfcb = fn;
+ cb->evcb_arg = arg;
+ cb->evcb_pri = base->nactivequeues - 1;
+ cb->evcb_closure = EV_CLOSURE_CB_SELF;
}
void
-event_deferred_cb_cancel_(struct deferred_cb_queue *queue,
- struct deferred_cb *cb)
+event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb)
{
- if (!queue) {
- if (current_base)
- queue = ¤t_base->defer_queue;
- else
- return;
- }
-
- LOCK_DEFERRED_QUEUE(queue);
- if (cb->queued) {
- TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
- --queue->active_count;
- cb->queued = 0;
- }
- UNLOCK_DEFERRED_QUEUE(queue);
+ if (!base)
+ base = current_base;
+ event_callback_cancel_(base, cb);
}
-void
-event_deferred_cb_schedule_(struct deferred_cb_queue *queue,
- struct deferred_cb *cb)
+int
+event_deferred_cb_schedule_(struct event_base *base, struct event_callback *cb)
{
- if (!queue) {
- if (current_base)
- queue = ¤t_base->defer_queue;
- else
- return;
- }
-
- LOCK_DEFERRED_QUEUE(queue);
- if (!cb->queued) {
- cb->queued = 1;
- TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next);
- ++queue->active_count;
- if (queue->notify_fn)
- queue->notify_fn(queue, queue->notify_arg);
- }
- UNLOCK_DEFERRED_QUEUE(queue);
+ if (!base)
+ base = current_base;
+ return event_callback_activate_(base, cb);
}
static int
bufferevent_base_set(base, evcon->bufev);
}
- event_deferred_cb_init_(&evcon->read_more_deferred_cb,
+ event_deferred_cb_init_(evcon->base,
+ &evcon->read_more_deferred_cb,
evhttp_deferred_read_cb, evcon);
evcon->dns_base = dnsbase;
ev_uint8_t evcb_pri; /* smaller numbers are higher priority */
ev_uint8_t evcb_closure;
/* allows us to adopt for different types of events */
- void (*evcb_callback)(evutil_socket_t, short, void *arg);
+ union {
+ void (*evcb_callback)(evutil_socket_t, short, void *arg);
+ void (*evcb_selfcb)(struct event_callback *, void *arg);
+ } evcb_cb_union;
void *evcb_arg;
};
#include "event2/buffer_compat.h"
#include "event2/util.h"
+#include "defer-internal.h"
#include "evbuffer-internal.h"
#include "log-internal.h"
size_t i;
for (i = 0; i < CB_COUNT; ++i) {
- event_deferred_cb_init_(&data->cbs[i], deferred_callback, NULL);
+ event_deferred_cb_init_(data->queue, &data->cbs[i], deferred_callback, NULL);
event_deferred_cb_schedule_(data->queue, &data->cbs[i]);
SLEEP_MS(1);
}