o Fix some bugs when using the old evdns interfaces to initialize the evdns module.
o Detect errors during bufferevent_connect(). Patch from Christopher Davis.
o Fix compilation for listener.h for C++ - missing extern "C". Patch from Ferenc Szalai.
+ o Make the event_base_loop() family of functions respect thread-safety better. This should clear up a few hard-to-debug race conditions.
+ o Fix a bug when using a specialized memory allocator on win32.
Changes in 2.0.2-alpha:
struct win32op {
int fd_setsz;
+ int resize_out_sets;
struct win_fd_set *readset_in;
struct win_fd_set *writeset_in;
struct win_fd_set *readset_out;
assert(new_size >= 1);
size = FD_SET_ALLOC_SIZE(new_size);
- if (!(op->readset_in = realloc(op->readset_in, size)))
+ if (!(op->readset_in = mm_realloc(op->readset_in, size)))
return (-1);
- if (!(op->writeset_in = realloc(op->writeset_in, size)))
- return (-1);
- if (!(op->readset_out = realloc(op->readset_out, size)))
- return (-1);
- if (!(op->exset_out = realloc(op->exset_out, size)))
- return (-1);
- if (!(op->writeset_out = realloc(op->writeset_out, size)))
+ if (!(op->writeset_in = mm_realloc(op->writeset_in, size)))
return (-1);
+ op->resize_out_sets = 1;
op->fd_setsz = new_size;
return (0);
}
int fd_count;
SOCKET s;
+ if (op->resize_out_sets) {
+ if (!(op->readset_out = mm_realloc(op->readset_out, size)))
+ return (-1);
+ if (!(op->exset_out = mm_realloc(op->exset_out, size)))
+ return (-1);
+ if (!(op->writeset_out = mm_realloc(op->writeset_out, size)))
+ return (-1);
+ op->resize_out_sets = 0;
+ }
+
fd_set_copy(win32op->readset_out, win32op->readset_in);
fd_set_copy(win32op->exset_out, win32op->readset_in);
fd_set_copy(win32op->writeset_out, win32op->writeset_in);
return (0);
}
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
res = select(fd_count,
(struct fd_set*)win32op->readset_out,
(struct fd_set*)win32op->writeset_out,
(struct fd_set*)win32op->exset_out, tv);
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
event_debug(("%s: select returned %d", __func__, res));
if(res <= 0) {
devpollop->dpfd = dpfd;
/* Initialize fields */
+ /* FIXME: allocating 'nfiles' worth of space here can be
+ * expensive and unnecessary. See how epoll.c does it instead. */
devpollop->events = mm_calloc(nfiles, sizeof(struct pollfd));
if (devpollop->events == NULL) {
mm_free(devpollop);
dvp.dp_nfds = devpollop->nevents;
dvp.dp_timeout = timeout;
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
res = ioctl(devpollop->dpfd, DP_POLL, &dvp);
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
if (res == -1) {
if (errno != EINTR) {
event_warn("ioctl: DP_POLL");
#include "event-internal.h"
#include "evsignal-internal.h"
+#include "event2/thread.h"
+#include "evthread-internal.h"
#include "log-internal.h"
#include "evmap-internal.h"
timeout = MAX_EPOLL_TIMEOUT_MSEC;
}
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
if (res == -1) {
if (errno != EINTR) {
event_warn("epoll_wait");
static int
event_haveevents(struct event_base *base)
{
+ /* Caller must hold th_base_lock */
return (base->event_count > 0);
}
static void
event_process_active(struct event_base *base)
{
+ /* Caller must hold th_base_lock */
struct event_list *activeq = NULL;
int i, c;
- EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
-
for (i = 0; i < base->nactivequeues; ++i) {
if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
activeq = base->activequeues[i];
c = event_process_active_single_queue(base, activeq);
if (c < 0)
- goto unlock;
+ return;
else if (c > 0)
break; /* Processed a real event; do not
* consider lower-priority events */
}
event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
-
-unlock:
- EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
/*
struct timeval *tv_p;
int res, done;
+ /* Grab the lock. We will release it inside evsel.dispatch, and again
+ * as we invoke user callbacks. */
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
/* clear time cache */
base->tv_cache.tv_sec = 0;
/* clear time cache */
base->tv_cache.tv_sec = 0;
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
event_debug(("%s: asked to terminate loop.", __func__));
return (0);
}
static int
timeout_next(struct event_base *base, struct timeval **tv_p)
{
+ /* Caller must hold th_base_lock */
struct timeval now;
struct event *ev;
struct timeval *tv = *tv_p;
int res = 0;
- EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
ev = min_heap_top(&base->timeheap);
if (ev == NULL) {
event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec));
out:
- EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return (res);
}
static void
timeout_correct(struct event_base *base, struct timeval *tv)
{
+ /* Caller must hold th_base_lock. */
struct event **pev;
unsigned int size;
struct timeval off;
/* Check if time is running backwards */
gettime(base, tv);
- EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (evutil_timercmp(tv, &base->event_tv, >=)) {
base->event_tv = *tv;
- EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return;
}
}
/* Now remember what the new time turned out to be. */
base->event_tv = *tv;
- EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
static void
timeout_process(struct event_base *base)
{
+ /* Caller must hold lock. */
struct timeval now;
struct event *ev;
- EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (min_heap_empty(&base->timeheap)) {
-		EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return;
ev->ev_callback));
event_active_internal(ev, EV_TIMEOUT, 1);
}
- EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
static void
}
}
- if ((res = port_getn(epdp->ed_port, pevtlist, EVENTS_PER_GETN,
- (unsigned int *) &nevents, ts_p)) == -1) {
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+ res = port_getn(epdp->ed_port, pevtlist, EVENTS_PER_GETN,
+ (unsigned int *) &nevents, ts_p);
+
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+ if (res == -1) {
if (errno == EINTR || errno == EAGAIN) {
evsig_process(base);
return (0);
struct kqop {
struct kevent *changes;
int nchanges;
+ int changes_size;
+ struct kevent *pend_changes;
+	int npend_changes;
+ int pend_changes_size;
+
struct kevent *events;
- int nevents;
+ int events_size;
int kq;
pid_t pid;
};
mm_free (kqueueop);
return (NULL);
}
+ kqueueop->pend_changes = mm_malloc(NEVENT * sizeof(struct kevent));
+	if (kqueueop->pend_changes == NULL) {
+ mm_free (kqueueop->changes);
+ mm_free (kqueueop);
+ return (NULL);
+ }
kqueueop->events = mm_malloc(NEVENT * sizeof(struct kevent));
if (kqueueop->events == NULL) {
mm_free (kqueueop->changes);
+ mm_free (kqueueop->pend_changes);
mm_free (kqueueop);
return (NULL);
}
- kqueueop->nevents = NEVENT;
+ kqueueop->events_size = kqueueop->changes_size =
+ kqueueop->pend_changes_size = NEVENT;
/* Check for Mac OS X kqueue bug. */
kqueueop->changes[0].ident = -1;
static int
kq_insert(struct kqop *kqop, struct kevent *kev)
{
- int nevents = kqop->nevents;
+ int size = kqop->changes_size;
- if (kqop->nchanges == nevents) {
+ if (kqop->nchanges == size) {
struct kevent *newchange;
- struct kevent *newresult;
- nevents *= 2;
+ size *= 2;
newchange = mm_realloc(kqop->changes,
- nevents * sizeof(struct kevent));
+ size * sizeof(struct kevent));
if (newchange == NULL) {
event_warn("%s: malloc", __func__);
return (-1);
}
kqop->changes = newchange;
-
- newresult = mm_realloc(kqop->events,
- nevents * sizeof(struct kevent));
-
- /*
- * If we fail, we don't have to worry about freeing,
- * the next realloc will pick it up.
- */
- if (newresult == NULL) {
- event_warn("%s: malloc", __func__);
- return (-1);
- }
- kqop->events = newresult;
-
- kqop->nevents = nevents;
+ kqop->changes_size = size;
}
memcpy(&kqop->changes[kqop->nchanges++], kev, sizeof(struct kevent));
/* Do nothing here */
}
+#define SWAP(tp,a,b)				\
+	do {					\
+		tp tmp_swap_var = (a);		\
+		(a) = (b);			\
+		(b) = tmp_swap_var;		\
+	} while (0)
+
static int
kq_dispatch(struct event_base *base, struct timeval *tv)
{
struct kqop *kqop = base->evbase;
- struct kevent *changes = kqop->changes;
struct kevent *events = kqop->events;
struct timespec ts, *ts_p = NULL;
int i, res;
ts_p = &ts;
}
- res = kevent(kqop->kq, changes, kqop->nchanges,
- events, kqop->nevents, ts_p);
- kqop->nchanges = 0;
+ /* We can't hold the lock while we're calling kqueue, so another
+ * thread might potentially mess with changes before the kernel has a
+ * chance to read it. Therefore, we need to keep the change list
+ * we're looking at in pend_changes, and let other threads mess with
+ * changes. */
+ SWAP(struct kevent *, kqop->changes, kqop->pend_changes);
+ SWAP(int, kqop->nchanges, kqop->npend_changes);
+ SWAP(int, kqop->changes_size, kqop->pend_changes_size);
+
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+ res = kevent(kqop->kq, kqop->pend_changes, kqop->npend_changes,
+ events, kqop->events_size, ts_p);
+
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+ kqop->npend_changes = 0;
if (res == -1) {
if (errno != EINTR) {
event_warn("kevent");
}
}
+	if (res == kqop->events_size) {
+ struct kevent *newresult;
+ int size = kqop->events_size;
+ /* We used all the events space that we have. Maybe we should
+ make it bigger. */
+ size *= 2;
+ newresult = mm_realloc(kqop->events,
+ size * sizeof(struct kevent));
+ if (newresult) {
+ kqop->events = newresult;
+ kqop->events_size = size;
+ }
+ }
+
return (0);
}
#include "evsignal-internal.h"
#include "log-internal.h"
#include "evmap-internal.h"
+#include "event2/thread.h"
+#include "evthread-internal.h"
struct pollidx {
int idxplus1;
struct pollop {
int event_count; /* Highest number alloc */
- int nfds; /* Size of event_* */
+ int nfds; /* Highest number used */
+ int realloc_copy; /* True iff we must realloc
+ * event_set_copy */
struct pollfd *event_set;
+ struct pollfd *event_set_copy;
};
static void *poll_init (struct event_base *);
{
int res, i, j, msec = -1, nfds;
struct pollop *pop = base->evbase;
+ struct pollfd *event_set;
poll_check_ok(pop);
+ nfds = pop->nfds;
+
+ if (base->th_base_lock) {
+ /* If we're using this backend in a multithreaded setting,
+ * then we need to work on a copy of event_set, so that we can
+ * let other threads modify the main event_set while we're
+ * polling. If we're not multithreaded, then we'll skip the
+ * copy step here to save memory and time. */
+ if (pop->realloc_copy) {
+ struct pollfd *tmp = mm_realloc(pop->event_set_copy,
+ pop->event_count * sizeof(struct pollfd));
+ if (tmp == NULL) {
+ event_warn("realloc");
+ return -1;
+ }
+ pop->event_set_copy = tmp;
+ pop->realloc_copy = 0;
+ }
+ memcpy(pop->event_set_copy, pop->event_set,
+ sizeof(struct pollfd)*nfds);
+ event_set = pop->event_set_copy;
+ } else {
+ event_set = pop->event_set;
+ }
+
if (tv != NULL)
msec = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000;
- nfds = pop->nfds;
- res = poll(pop->event_set, nfds, msec);
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+ res = poll(event_set, nfds, msec);
+
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (res == -1) {
if (errno != EINTR) {
int what;
if (++i == nfds)
i = 0;
- what = pop->event_set[i].revents;
+ what = event_set[i].revents;
if (!what)
continue;
if (res == 0)
continue;
- evmap_io_active(base, pop->event_set[i].fd, res);
+ evmap_io_active(base, event_set[i].fd, res);
}
return (0);
pop->event_set = tmp_event_set;
pop->event_count = tmp_event_count;
+ pop->realloc_copy = 1;
}
i = idx->idxplus1 - 1;
evsig_dealloc(base);
if (pop->event_set)
mm_free(pop->event_set);
+ if (pop->event_set_copy)
+ mm_free(pop->event_set_copy);
memset(pop, 0, sizeof(struct pollop));
mm_free(pop);
#include "event-internal.h"
#include "evsignal-internal.h"
+#include "event2/thread.h"
+#include "evthread-internal.h"
#include "log-internal.h"
#include "evmap-internal.h"
struct selectop {
int event_fds; /* Highest fd in fd set */
int event_fdsz;
+ int resize_out_sets;
fd_set *event_readset_in;
fd_set *event_writeset_in;
fd_set *event_readset_out;
static int
select_dispatch(struct event_base *base, struct timeval *tv)
{
- int res, i, j;
+ int res=0, i, j, nfds;
struct selectop *sop = base->evbase;
check_selectop(sop);
+ if (sop->resize_out_sets) {
+ fd_set *readset_out=NULL, *writeset_out=NULL;
+ size_t sz = sop->event_fdsz;
+ if (!(readset_out = mm_realloc(sop->event_readset_out, sz)))
+ return (-1);
+ if (!(writeset_out = mm_realloc(sop->event_writeset_out, sz))) {
+ mm_free(readset_out);
+ return (-1);
+ }
+ sop->event_readset_out = readset_out;
+ sop->event_writeset_out = writeset_out;
+ sop->resize_out_sets = 0;
+ }
memcpy(sop->event_readset_out, sop->event_readset_in,
sop->event_fdsz);
memcpy(sop->event_writeset_out, sop->event_writeset_in,
sop->event_fdsz);
- res = select(sop->event_fds + 1, sop->event_readset_out,
+ nfds = sop->event_fds+1;
+
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+ res = select(nfds, sop->event_readset_out,
sop->event_writeset_out, NULL, tv);
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
check_selectop(sop);
if (res == -1) {
event_debug(("%s: select reports %d", __func__, res));
check_selectop(sop);
- i = random() % (sop->event_fds+1);
- for (j = 0; j <= sop->event_fds; ++j) {
- if (++i >= sop->event_fds+1)
+		i = random() % nfds;
+		for (j = 0; j < nfds; ++j) {
+			if (++i >= nfds)
i = 0;
res = 0;
if (FD_ISSET(i, sop->event_readset_out))