enum event_base_config_flag flags;
/* Notify main thread to wake up break, etc. */
+ /** True if the base already has a pending notify, and we don't need
+ * to add any more. */
+ int is_notify_pending;
/** A socketpair used by some th_notify functions to wake up the main
* thread. */
int th_notify_fd[2];
static int
evthread_notify_base(struct event_base *base)
{
+ EVENT_BASE_ASSERT_LOCKED(base);
if (!base->th_notify_fn)
return -1;
+ if (base->is_notify_pending)
+ return 0;
+ base->is_notify_pending = 1;
return base->th_notify_fn(base);
}
cb->queued = 1;
TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next);
++queue->active_count;
- /* XXXX Can we get away with doing this only when adding
- * the first active deferred_cb to the queue? */
if (queue->notify_fn)
queue->notify_fn(queue, queue->notify_arg);
}
{
ev_uint64_t msg;
ev_ssize_t r;
+ struct event_base *base = arg;
r = read(fd, (void*) &msg, sizeof(msg));
if (r<0 && errno != EAGAIN) {
event_sock_warn(fd, "Error reading from eventfd");
}
+ EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+ base->is_notify_pending = 0;
+ EVBASE_RELEASE_LOCK(base, th_base_lock);
}
#endif
evthread_notify_drain_default(evutil_socket_t fd, short what, void *arg)
{
unsigned char buf[1024];
+ struct event_base *base = arg;
#ifdef WIN32
while (recv(fd, (char*)buf, sizeof(buf), 0) > 0)
;
while (read(fd, (char*)buf, sizeof(buf)) > 0)
;
#endif
+
+ EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+ base->is_notify_pending = 0;
+ EVBASE_RELEASE_LOCK(base, th_base_lock);
}
int