granicus.if.org Git - libevent/commitdiff
Add locking to event_base_loop.
author Nick Mathewson <nickm@torproject.org>
Wed, 21 Oct 2009 03:54:00 +0000 (03:54 +0000)
committer Nick Mathewson <nickm@torproject.org>
Wed, 21 Oct 2009 03:54:00 +0000 (03:54 +0000)
This is harder than it sounds, since we need to make sure to
release the lock around the key call to the kernel (e.g.,
select, epoll_wait, kevent), AND we need to make sure that
none of the fields that are used in that call are touched by
anything that might be running concurrently in another
thread.  I managed to do this pretty well for everything but
poll().  With poll, I needed to introduce a copy of the
event_set structure.

This patch also fixes a bug in win32.c where we called
realloc() instead of mm_realloc().

svn:r1450

ChangeLog
WIN32-Code/win32.c
devpoll.c
epoll.c
event.c
evport.c
kqueue.c
poll.c
select.c

index b844de5c0851862e3908d19d7fd14834d2af6335..f896936a888872fc57a7dce625b65d82f18d3ede 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -28,6 +28,8 @@ Changes in 2.0.3-alpha:
  o Fix some bugs when using the old evdns interfaces to initialize the evdns module.
  o Detect errors during bufferevent_connect().  Patch from Christopher Davis.
  o Fix compilation for listener.h for C++ - missing extern "C".  Patch from Ferenc Szalai.
+ o Make the event_base_loop() family of functions respect thread-safety better.  This should clear up a few hard-to-debug race conditions.
+ o Fix a bug when using a specialized memory allocator on win32.
 
 
 Changes in 2.0.2-alpha:
index 470558858fd920820d41bdca077164d3a9657c1c..cb47ced4bde8351d4a934f20ec7aa0f84bf1f373 100644 (file)
@@ -65,6 +65,7 @@ struct idx_info {
 
 struct win32op {
        int fd_setsz;
+       int resize_out_sets;
        struct win_fd_set *readset_in;
        struct win_fd_set *writeset_in;
        struct win_fd_set *readset_out;
@@ -103,16 +104,11 @@ realloc_fd_sets(struct win32op *op, size_t new_size)
        assert(new_size >= 1);
 
        size = FD_SET_ALLOC_SIZE(new_size);
-       if (!(op->readset_in = realloc(op->readset_in, size)))
+       if (!(op->readset_in = mm_realloc(op->readset_in, size)))
                return (-1);
-       if (!(op->writeset_in = realloc(op->writeset_in, size)))
-               return (-1);
-       if (!(op->readset_out = realloc(op->readset_out, size)))
-               return (-1);
-       if (!(op->exset_out = realloc(op->exset_out, size)))
-               return (-1);
-       if (!(op->writeset_out = realloc(op->writeset_out, size)))
+       if (!(op->writeset_in = mm_realloc(op->writeset_in, size)))
                return (-1);
+       op->resize_out_sets = 1;
        op->fd_setsz = new_size;
        return (0);
 }
@@ -286,6 +282,16 @@ win32_dispatch(struct event_base *base, struct timeval *tv)
        int fd_count;
        SOCKET s;
 
+       if (win32op->resize_out_sets) {
+               size_t size = FD_SET_ALLOC_SIZE(win32op->fd_setsz); /* same sizing as realloc_fd_sets() */
+               if (!(win32op->readset_out = mm_realloc(win32op->readset_out, size)))
+                       return (-1);
+               if (!(win32op->exset_out = mm_realloc(win32op->exset_out, size)))
+                       return (-1);
+               if (!(win32op->writeset_out = mm_realloc(win32op->writeset_out, size)))
+                       return (-1);
+               win32op->resize_out_sets = 0;
+       }
        fd_set_copy(win32op->readset_out, win32op->readset_in);
        fd_set_copy(win32op->exset_out, win32op->readset_in);
        fd_set_copy(win32op->writeset_out, win32op->writeset_in);
@@ -301,11 +307,15 @@ win32_dispatch(struct event_base *base, struct timeval *tv)
                return (0);
        }
 
+       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
        res = select(fd_count,
                     (struct fd_set*)win32op->readset_out,
                     (struct fd_set*)win32op->writeset_out,
                     (struct fd_set*)win32op->exset_out, tv);
 
+       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
        event_debug(("%s: select returned %d", __func__, res));
 
        if(res <= 0) {
index eaaac4ac5e291968518143cf4e4513f454149240..cf8e5c6dcf9e702e8af46df830116b500c603f9d 100644 (file)
--- a/devpoll.c
+++ b/devpoll.c
@@ -140,6 +140,8 @@ devpoll_init(struct event_base *base)
        devpollop->dpfd = dpfd;
 
        /* Initialize fields */
+       /* FIXME: allocating 'nfiles' worth of space here can be
+        * expensive and unnecessary.  See how epoll.c does it instead. */
        devpollop->events = mm_calloc(nfiles, sizeof(struct pollfd));
        if (devpollop->events == NULL) {
                mm_free(devpollop);
@@ -179,8 +181,12 @@ devpoll_dispatch(struct event_base *base, struct timeval *tv)
        dvp.dp_nfds = devpollop->nevents;
        dvp.dp_timeout = timeout;
 
+       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
        res = ioctl(devpollop->dpfd, DP_POLL, &dvp);
 
+       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
        if (res == -1) {
                if (errno != EINTR) {
                        event_warn("ioctl: DP_POLL");
diff --git a/epoll.c b/epoll.c
index b6fa28ede935f4cb960cb929f30925141d255d4d..a01e501fa77fd1f0c96b5765459ebb94f24c99a0 100644 (file)
--- a/epoll.c
+++ b/epoll.c
@@ -51,6 +51,8 @@
 
 #include "event-internal.h"
 #include "evsignal-internal.h"
+#include "event2/thread.h"
+#include "evthread-internal.h"
 #include "log-internal.h"
 #include "evmap-internal.h"
 
@@ -148,8 +150,12 @@ epoll_dispatch(struct event_base *base, struct timeval *tv)
                timeout = MAX_EPOLL_TIMEOUT_MSEC;
        }
 
+       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
        res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);
 
+       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
        if (res == -1) {
                if (errno != EINTR) {
                        event_warn("epoll_wait");
diff --git a/event.c b/event.c
index 5f1cd909509bf221c3a81bf8bb41f220bb03ac27..3c907139a73437876e32767f405026d85162ec3a 100644 (file)
--- a/event.c
+++ b/event.c
@@ -615,6 +615,7 @@ event_base_priority_init(struct event_base *base, int npriorities)
 static int
 event_haveevents(struct event_base *base)
 {
+       /* Caller must hold th_base_lock */
        return (base->event_count > 0);
 }
 
@@ -737,17 +738,16 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
 static void
 event_process_active(struct event_base *base)
 {
+       /* Caller must hold th_base_lock */
        struct event_list *activeq = NULL;
        int i, c;
 
-       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
-
        for (i = 0; i < base->nactivequeues; ++i) {
                if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
                        activeq = base->activequeues[i];
                        c = event_process_active_single_queue(base, activeq);
                        if (c < 0)
-                               goto unlock;
+                               return;
                        else if (c > 0)
                                break; /* Processed a real event; do not
                                        * consider lower-priority events */
@@ -757,9 +757,6 @@ event_process_active(struct event_base *base)
        }
 
        event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
-
-unlock:
-       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
 }
 
 /*
@@ -866,6 +863,10 @@ event_base_loop(struct event_base *base, int flags)
        struct timeval *tv_p;
        int res, done;
 
+       /* Grab the lock.  We will release it inside evsel.dispatch, and again
+        * as we invoke user callbacks. */
+       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
        /* clear time cache */
        base->tv_cache.tv_sec = 0;
 
@@ -933,6 +934,8 @@ event_base_loop(struct event_base *base, int flags)
        /* clear time cache */
        base->tv_cache.tv_sec = 0;
 
+       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
        event_debug(("%s: asked to terminate loop.", __func__));
        return (0);
 }
@@ -1496,12 +1499,12 @@ event_deferred_cb_schedule(struct deferred_cb_queue *queue,
 static int
 timeout_next(struct event_base *base, struct timeval **tv_p)
 {
+       /* Caller must hold th_base_lock */
        struct timeval now;
        struct event *ev;
        struct timeval *tv = *tv_p;
        int res = 0;
 
-       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
        ev = min_heap_top(&base->timeheap);
 
        if (ev == NULL) {
@@ -1527,7 +1530,6 @@ timeout_next(struct event_base *base, struct timeval **tv_p)
        event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec));
 
 out:
-       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
        return (res);
 }
 
@@ -1540,6 +1542,7 @@ out:
 static void
 timeout_correct(struct event_base *base, struct timeval *tv)
 {
+       /* Caller must hold th_base_lock. */
        struct event **pev;
        unsigned int size;
        struct timeval off;
@@ -1549,11 +1552,9 @@ timeout_correct(struct event_base *base, struct timeval *tv)
 
        /* Check if time is running backwards */
        gettime(base, tv);
-       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
 
        if (evutil_timercmp(tv, &base->event_tv, >=)) {
                base->event_tv = *tv;
-               EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
                return;
        }
 
@@ -1573,16 +1574,14 @@ timeout_correct(struct event_base *base, struct timeval *tv)
        }
        /* Now remember what the new time turned out to be. */
        base->event_tv = *tv;
-       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
 }
 
 static void
 timeout_process(struct event_base *base)
 {
+       /* Caller must hold th_base_lock; it is never released in here. */
        struct timeval now;
        struct event *ev;
 
-       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
        if (min_heap_empty(&base->timeheap)) {
-               EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
                return;
@@ -1601,7 +1600,6 @@ timeout_process(struct event_base *base)
                         ev->ev_callback));
                event_active_internal(ev, EV_TIMEOUT, 1);
        }
-       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
 }
 
 static void
index 55bbe800ee9b398d432d4669a2c5680915d0f36c..32e373c0c884a65c03b5f0c8d44cd962dc7a9701 100644 (file)
--- a/evport.c
+++ b/evport.c
@@ -303,8 +303,14 @@ evport_dispatch(struct event_base *base, struct timeval *tv)
                }
        }
 
-       if ((res = port_getn(epdp->ed_port, pevtlist, EVENTS_PER_GETN,
-                   (unsigned int *) &nevents, ts_p)) == -1) {
+       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+       res = port_getn(epdp->ed_port, pevtlist, EVENTS_PER_GETN,
+           (unsigned int *) &nevents, ts_p);
+
+       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+       if (res == -1) {
                if (errno == EINTR || errno == EAGAIN) {
                        evsig_process(base);
                        return (0);
index be10dd6997c6c0ed117358703dcd5235ed718aca..aa9fc1c2b1bf05307ff36ab8328b8279482ee65b 100644 (file)
--- a/kqueue.c
+++ b/kqueue.c
 struct kqop {
        struct kevent *changes;
        int nchanges;
+       int changes_size;
+       struct kevent *pend_changes;
+       int n_pend_changes;
+       int pend_changes_size;
+
        struct kevent *events;
-       int nevents;
+       int events_size;
        int kq;
        pid_t pid;
 };
@@ -133,13 +138,21 @@ kq_init(struct event_base *base)
                mm_free (kqueueop);
                return (NULL);
        }
+       kqueueop->pend_changes = mm_malloc(NEVENT * sizeof(struct kevent));
+       if (kqueueop->pend_changes == NULL) {
+               mm_free (kqueueop->changes);
+               mm_free (kqueueop);
+               return (NULL);
+       }
        kqueueop->events = mm_malloc(NEVENT * sizeof(struct kevent));
        if (kqueueop->events == NULL) {
                mm_free (kqueueop->changes);
+               mm_free (kqueueop->pend_changes);
                mm_free (kqueueop);
                return (NULL);
        }
-       kqueueop->nevents = NEVENT;
+       kqueueop->events_size = kqueueop->changes_size =
+           kqueueop->pend_changes_size = NEVENT;
 
        /* Check for Mac OS X kqueue bug. */
        kqueueop->changes[0].ident = -1;
@@ -171,36 +184,21 @@ kq_init(struct event_base *base)
 static int
 kq_insert(struct kqop *kqop, struct kevent *kev)
 {
-       int nevents = kqop->nevents;
+       int size = kqop->changes_size;
 
-       if (kqop->nchanges == nevents) {
+       if (kqop->nchanges == size) {
                struct kevent *newchange;
-               struct kevent *newresult;
 
-               nevents *= 2;
+               size *= 2;
 
                newchange = mm_realloc(kqop->changes,
-                                   nevents * sizeof(struct kevent));
+                                   size * sizeof(struct kevent));
                if (newchange == NULL) {
                        event_warn("%s: malloc", __func__);
                        return (-1);
                }
                kqop->changes = newchange;
-
-               newresult = mm_realloc(kqop->events,
-                                   nevents * sizeof(struct kevent));
-
-               /*
-                * If we fail, we don't have to worry about freeing,
-                * the next realloc will pick it up.
-                */
-               if (newresult == NULL) {
-                       event_warn("%s: malloc", __func__);
-                       return (-1);
-               }
-               kqop->events = newresult;
-
-               kqop->nevents = nevents;
+               kqop->changes_size = size;
        }
 
        memcpy(&kqop->changes[kqop->nchanges++], kev, sizeof(struct kevent));
@@ -219,11 +217,17 @@ kq_sighandler(int sig)
        /* Do nothing here */
 }
 
+#define SWAP(tp,a,b)                           \
+       do {                                    \
+               tp tmp_swap_var = (a);          \
+               (a) = (b);                      \
+               (b) = tmp_swap_var;             \
+       } while (0) /* no ';' here: each SWAP(...) call site supplies it */
+
 static int
 kq_dispatch(struct event_base *base, struct timeval *tv)
 {
        struct kqop *kqop = base->evbase;
-       struct kevent *changes = kqop->changes;
        struct kevent *events = kqop->events;
        struct timespec ts, *ts_p = NULL;
        int i, res;
@@ -233,9 +237,23 @@ kq_dispatch(struct event_base *base, struct timeval *tv)
                ts_p = &ts;
        }
 
-       res = kevent(kqop->kq, changes, kqop->nchanges,
-           events, kqop->nevents, ts_p);
-       kqop->nchanges = 0;
+       /* We can't hold the lock while we're calling kqueue, so another
+        * thread might potentially mess with changes before the kernel has a
+        * chance to read it.  Therefore, we need to keep the change list
+        * we're looking at in pend_changes, and let other threads mess with
+        * changes. */
+       SWAP(struct kevent *, kqop->changes, kqop->pend_changes);
+       SWAP(int, kqop->nchanges, kqop->n_pend_changes);
+       SWAP(int, kqop->changes_size, kqop->pend_changes_size);
+
+       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+       res = kevent(kqop->kq, kqop->pend_changes, kqop->n_pend_changes,
+           events, kqop->events_size, ts_p);
+
+       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+       kqop->n_pend_changes = 0;
        if (res == -1) {
                if (errno != EINTR) {
                         event_warn("kevent");
@@ -289,6 +307,20 @@ kq_dispatch(struct event_base *base, struct timeval *tv)
                }
        }
 
+       if (res == kqop->events_size) {
+               struct kevent *newresult;
+               int size = kqop->events_size;
+               /* We used all the events space that we have. Maybe we should
+                  make it bigger. */
+               size *= 2;
+               newresult = mm_realloc(kqop->events,
+                   size * sizeof(struct kevent));
+               if (newresult) {
+                       kqop->events = newresult;
+                       kqop->events_size = size;
+               }
+       }
+
        return (0);
 }
 
diff --git a/poll.c b/poll.c
index 08e148cb277ca0e97bdde97f0199b96d7350ef42..e7b9941e82f3f58eacde4294d8b89996bc067ce6 100644 (file)
--- a/poll.c
+++ b/poll.c
@@ -50,6 +50,8 @@
 #include "evsignal-internal.h"
 #include "log-internal.h"
 #include "evmap-internal.h"
+#include "event2/thread.h"
+#include "evthread-internal.h"
 
 struct pollidx {
        int idxplus1;
@@ -57,8 +59,11 @@ struct pollidx {
 
 struct pollop {
        int event_count;                /* Highest number alloc */
-       int nfds;                       /* Size of event_* */
+       int nfds;                       /* Highest number used */
+       int realloc_copy;               /* True iff we must realloc
+                                        * event_set_copy */
        struct pollfd *event_set;
+       struct pollfd *event_set_copy;
 };
 
 static void *poll_init (struct event_base *);
@@ -119,14 +124,43 @@ poll_dispatch(struct event_base *base, struct timeval *tv)
 {
        int res, i, j, msec = -1, nfds;
        struct pollop *pop = base->evbase;
+       struct pollfd *event_set;
 
        poll_check_ok(pop);
 
+       nfds = pop->nfds;
+
+       if (base->th_base_lock) {
+               /* If we're using this backend in a multithreaded setting,
+                * then we need to work on a copy of event_set, so that we can
+                * let other threads modify the main event_set while we're
+                * polling. If we're not multithreaded, then we'll skip the
+                * copy step here to save memory and time. */
+               if (pop->realloc_copy) {
+                       struct pollfd *tmp = mm_realloc(pop->event_set_copy,
+                           pop->event_count * sizeof(struct pollfd));
+                       if (tmp == NULL) {
+                               event_warn("realloc");
+                               return -1;
+                       }
+                       pop->event_set_copy = tmp;
+                       pop->realloc_copy = 0;
+               }
+               memcpy(pop->event_set_copy, pop->event_set,
+                   sizeof(struct pollfd)*nfds);
+               event_set = pop->event_set_copy;
+       } else {
+               event_set = pop->event_set;
+       }
+
        if (tv != NULL)
                msec = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000;
 
-       nfds = pop->nfds;
-       res = poll(pop->event_set, nfds, msec);
+       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+       res = poll(event_set, nfds, msec);
+
+       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
 
        if (res == -1) {
                if (errno != EINTR) {
@@ -150,7 +184,7 @@ poll_dispatch(struct event_base *base, struct timeval *tv)
                int what;
                if (++i == nfds)
                        i = 0;
-               what = pop->event_set[i].revents;
+               what = event_set[i].revents;
                if (!what)
                        continue;
 
@@ -166,7 +200,7 @@ poll_dispatch(struct event_base *base, struct timeval *tv)
                if (res == 0)
                        continue;
 
-               evmap_io_active(base, pop->event_set[i].fd, res);
+               evmap_io_active(base, event_set[i].fd, res);
        }
 
        return (0);
@@ -204,6 +238,7 @@ poll_add(struct event_base *base, int fd, short old, short events, void *_idx)
                pop->event_set = tmp_event_set;
 
                pop->event_count = tmp_event_count;
+               pop->realloc_copy = 1;
        }
 
        i = idx->idxplus1 - 1;
@@ -289,6 +324,8 @@ poll_dealloc(struct event_base *base)
        evsig_dealloc(base);
        if (pop->event_set)
                mm_free(pop->event_set);
+       if (pop->event_set_copy)
+               mm_free(pop->event_set_copy);
 
        memset(pop, 0, sizeof(struct pollop));
        mm_free(pop);
index ace8037c80dc152e73c6eaa841a58da34f00701f..9164ac91eda2c7b3085ee880f23900f0d9e2b181 100644 (file)
--- a/select.c
+++ b/select.c
@@ -50,6 +50,8 @@
 
 #include "event-internal.h"
 #include "evsignal-internal.h"
+#include "event2/thread.h"
+#include "evthread-internal.h"
 #include "log-internal.h"
 #include "evmap-internal.h"
 
@@ -67,6 +69,7 @@ typedef unsigned long fd_mask;
 struct selectop {
        int event_fds;          /* Highest fd in fd set */
        int event_fdsz;
+       int resize_out_sets;
        fd_set *event_readset_in;
        fd_set *event_writeset_in;
        fd_set *event_readset_out;
@@ -121,19 +124,38 @@ check_selectop(struct selectop *sop)
 static int
 select_dispatch(struct event_base *base, struct timeval *tv)
 {
-       int res, i, j;
+       int res=0, i, j, nfds;
        struct selectop *sop = base->evbase;
 
        check_selectop(sop);
+       if (sop->resize_out_sets) {
+               fd_set *readset_out=NULL, *writeset_out=NULL;
+               size_t sz = sop->event_fdsz;
+               if (!(readset_out = mm_realloc(sop->event_readset_out, sz)))
+                       return (-1);
+               sop->event_readset_out = readset_out;
+               if (!(writeset_out = mm_realloc(sop->event_writeset_out, sz))) {
+                       /* Keep readset_out (sop owns it); flag stays set to retry. */
+                       return (-1);
+               }
+               sop->event_writeset_out = writeset_out;
+               sop->resize_out_sets = 0;
+       }
 
        memcpy(sop->event_readset_out, sop->event_readset_in,
               sop->event_fdsz);
        memcpy(sop->event_writeset_out, sop->event_writeset_in,
               sop->event_fdsz);
 
-       res = select(sop->event_fds + 1, sop->event_readset_out,
+       nfds = sop->event_fds+1;
+
+       EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+       res = select(nfds, sop->event_readset_out,
            sop->event_writeset_out, NULL, tv);
 
+       EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
        check_selectop(sop);
 
        if (res == -1) {
@@ -151,9 +173,9 @@ select_dispatch(struct event_base *base, struct timeval *tv)
        event_debug(("%s: select reports %d", __func__, res));
 
        check_selectop(sop);
-       i = random() % (sop->event_fds+1);
-       for (j = 0; j <= sop->event_fds; ++j) {
-               if (++i >= sop->event_fds+1)
+       i = random() % nfds; /* nfds is already event_fds+1 */
+       for (j = 0; j < nfds; ++j) {
+               if (++i >= nfds)
                        i = 0;
                res = 0;
                if (FD_ISSET(i, sop->event_readset_out))