]> granicus.if.org Git - libevent/commitdiff
Locking support for bufferevents.
authorNick Mathewson <nickm@torproject.org>
Mon, 13 Apr 2009 03:17:19 +0000 (03:17 +0000)
committerNick Mathewson <nickm@torproject.org>
Mon, 13 Apr 2009 03:17:19 +0000 (03:17 +0000)
svn:r1170

bufferevent-internal.h
bufferevent.c
bufferevent_filter.c
bufferevent_pair.c
bufferevent_sock.c
include/event2/bufferevent.h

index f369659ffa606fcd0f734f2239f1deca59f0bf85..b1026f681370a679522f896cf3885ecf7bb5f423 100644 (file)
@@ -33,6 +33,8 @@ extern "C" {
 #include "event-config.h"
 #include "evutil.h"
 #include "defer-internal.h"
+#include "evthread-internal.h"
+#include "event2/thread.h"
 
 struct bufferevent_private {
        struct bufferevent bev;
@@ -42,6 +44,7 @@ struct bufferevent_private {
 
        /** If set, read is suspended until evbuffer some. */
        unsigned read_suspended : 1;
+       unsigned own_lock : 1;
 
        enum bufferevent_options options;
 
@@ -106,6 +109,22 @@ void bufferevent_wm_suspend_read(struct bufferevent *bufev);
  * read buffer is too full. */
 void bufferevent_wm_unsuspend_read(struct bufferevent *bufev);
 
+int bufferevent_enable_locking(struct bufferevent *bufev, void *lock);
+
+#define BEV_UPCAST(b) EVUTIL_UPCAST((b), struct bufferevent_private, bev)
+
+#define BEV_LOCK(b) do {                                               \
+               struct bufferevent_private *locking =  BEV_UPCAST(b);   \
+               if (locking->lock)                                      \
+                       EVLOCK_LOCK(locking->lock, EVTHREAD_WRITE);     \
+       } while(0)
+
+#define BEV_UNLOCK(b) do {                                             \
+               struct bufferevent_private *locking =  BEV_UPCAST(b);   \
+               if (locking->lock)                                      \
+                       EVLOCK_UNLOCK(locking->lock, EVTHREAD_WRITE);   \
+       } while(0)
+
 #ifdef __cplusplus
 }
 #endif
index 69b8670243b6c6326664a5e6368f4133975edc32..f92a52bcd92f67d935b4cae75313913fadfb0b36 100644 (file)
 #include "bufferevent-internal.h"
 #include "util-internal.h"
 
-
 void
 bufferevent_wm_suspend_read(struct bufferevent *bufev)
 {
        struct bufferevent_private *bufev_private =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+       BEV_LOCK(bufev);
        if (!bufev_private->read_suspended) {
                bufev->be_ops->disable(bufev, EV_READ);
                bufev_private->read_suspended = 1;
        }
+       BEV_LOCK(bufev);
 }
 
 void
@@ -76,11 +77,14 @@ bufferevent_wm_unsuspend_read(struct bufferevent *bufev)
 {
        struct bufferevent_private *bufev_private =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+
+       BEV_LOCK(bufev);
        if (bufev_private->read_suspended) {
                bufev_private->read_suspended = 0;
                if (bufev->enabled & EV_READ)
                        bufev->be_ops->enable(bufev, EV_READ);
        }
+       BEV_LOCK(bufev);
 }
 
 /* Callback to implement watermarks on the input buffer.  Only enabled
@@ -91,7 +95,9 @@ bufferevent_inbuf_wm_cb(struct evbuffer *buf,
     void *arg)
 {
        struct bufferevent *bufev = arg;
-        size_t size = evbuffer_get_length(buf);
+        size_t size;
+
+       size = evbuffer_get_length(buf);
 
        if (cbinfo->n_added > cbinfo->n_deleted) {
                /* Data got added.  If it put us over the watermark, stop
@@ -137,6 +143,15 @@ bufferevent_init_common(struct bufferevent_private *bufev_private,
         */
        bufev->enabled = EV_WRITE;
 
+#ifndef _EVENT_DISABLE_THREAD_SUPPORT
+       if (options & BEV_OPT_THREADSAFE) {
+               if (bufferevent_enable_locking(bufev, NULL) < 0) {
+                       /* cleanup */
+                       return -1;
+               }
+       }
+#endif
+
        bufev_private->options = options;
 
        return 0;
@@ -146,11 +161,14 @@ void
 bufferevent_setcb(struct bufferevent *bufev,
     evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
 {
+       BEV_LOCK(bufev);
+
        bufev->readcb = readcb;
        bufev->writecb = writecb;
        bufev->errorcb = errorcb;
 
        bufev->cbarg = cbarg;
+       BEV_UNLOCK(bufev);
 }
 
 struct evbuffer *
@@ -206,15 +224,19 @@ bufferevent_enable(struct bufferevent *bufev, short event)
        struct bufferevent_private *bufev_private =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        short impl_events = event;
+       int r = 0;
+
+       BEV_LOCK(bufev);
        if (bufev_private->read_suspended)
                impl_events &= ~EV_READ;
 
        bufev->enabled |= event;
 
        if (bufev->be_ops->enable(bufev, impl_events) < 0)
-               return -1;
+               r = -1;
 
-       return (0);
+       BEV_UNLOCK(bufev);
+       return r;
 }
 
 void
@@ -222,6 +244,7 @@ bufferevent_set_timeouts(struct bufferevent *bufev,
                         const struct timeval *tv_read,
                         const struct timeval *tv_write)
 {
+       BEV_LOCK(bufev);
        if (tv_read) {
                bufev->timeout_read = *tv_read;
        } else {
@@ -235,6 +258,7 @@ bufferevent_set_timeouts(struct bufferevent *bufev,
 
        if (bufev->be_ops->adj_timeouts)
                bufev->be_ops->adj_timeouts(bufev);
+       BEV_UNLOCK(bufev);
 }
 
 
@@ -265,12 +289,16 @@ bufferevent_settimeout(struct bufferevent *bufev,
 int
 bufferevent_disable(struct bufferevent *bufev, short event)
 {
+       int r = 0;
+
+       BEV_LOCK(bufev);
        bufev->enabled &= ~event;
 
        if (bufev->be_ops->disable(bufev, event) < 0)
-               return (-1);
+               r = -1;
 
-       return (0);
+       BEV_UNLOCK(bufev);
+       return r;
 }
 
 /*
@@ -284,6 +312,7 @@ bufferevent_setwatermark(struct bufferevent *bufev, short events,
        struct bufferevent_private *bufev_private =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 
+       BEV_LOCK(bufev);
        if (events & EV_WRITE) {
                bufev->wm_write.low = lowmark;
                bufev->wm_write.high = highmark;
@@ -321,6 +350,7 @@ bufferevent_setwatermark(struct bufferevent *bufev, short events,
                        bufferevent_wm_unsuspend_read(bufev);
                }
        }
+       BEV_UNLOCK(bufev);
 }
 
 int
@@ -328,10 +358,12 @@ bufferevent_flush(struct bufferevent *bufev,
     short iotype,
     enum bufferevent_flush_mode mode)
 {
+       int r = -1;
+       BEV_LOCK(bufev);
         if (bufev->be_ops->flush)
-                return bufev->be_ops->flush(bufev, iotype, mode);
-        else
-                return -1;
+                r = bufev->be_ops->flush(bufev, iotype, mode);
+       BEV_UNLOCK(bufev);
+       return r;
 }
 
 void
@@ -347,5 +379,32 @@ bufferevent_free(struct bufferevent *bufev)
 
        /* Free the actual allocated memory. */
        mm_free(bufev - bufev->be_ops->mem_offset);
+       /* Free lock XXX */
+}
+
+int
+bufferevent_enable_locking(struct bufferevent *bufev, void *lock)
+{
+#ifdef _EVENT_DISABLE_THREAD_SUPPORT
+       return -1;
+#else
+       if (BEV_UPCAST(bufev)->lock)
+               return -1;
+
+       if (!lock) {
+               EVTHREAD_ALLOC_LOCK(lock);
+               if (!lock)
+                       return -1;
+               BEV_UPCAST(bufev)->lock = lock;
+               BEV_UPCAST(bufev)->own_lock = 1;
+       } else {
+               BEV_UPCAST(bufev)->lock = lock;
+               BEV_UPCAST(bufev)->own_lock = 0;
+       }
+       evbuffer_enable_locking(bufev->input, lock);
+       evbuffer_enable_locking(bufev->output, lock);
+
+       return 0;
+#endif
 }
 
index 54fd23c828b732fbfb37399a4a0ededd155bfc99..b427cfb207456e1d9b5d30f69417e23e67942534 100644 (file)
@@ -170,6 +170,7 @@ bufferevent_filter_new(struct bufferevent *underlying,
                       void *ctx)
 {
        struct bufferevent_filtered *bufev_f;
+       enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE;
 
        if (!input_filter)
                input_filter = be_null_filter;
@@ -181,10 +182,18 @@ bufferevent_filter_new(struct bufferevent *underlying,
                return NULL;
 
        if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base,
-                                   &bufferevent_ops_filter, options) < 0) {
+                                   &bufferevent_ops_filter, tmp_options) < 0) {
                mm_free(bufev_f);
                return NULL;
        }
+       if (options & BEV_OPT_THREADSAFE) {
+               void *lock = BEV_UPCAST(underlying)->lock;
+               if (!lock) {
+                       bufferevent_enable_locking(underlying, NULL);
+                       lock = BEV_UPCAST(underlying)->lock;
+               }
+               bufferevent_enable_locking(downcast(bufev_f), lock);
+       }
 
        bufev_f->underlying = underlying;
        bufev_f->process_in = input_filter;
index 6fed0e0331ad5b6b59e85db1ebd7f1209a7488b6..6080421c0cee7ba233a0c4df0b851637faff660b 100644 (file)
@@ -75,6 +75,7 @@ run_callback(struct deferred_cb *cb, void *arg)
        struct bufferevent_pair *bufev = arg;
        struct bufferevent *bev = downcast(bufev);
 
+       BEV_LOCK(bev);
        if (cb == &bufev->deferred_read_cb) {
                if (bev->readcb) {
                        bev->readcb(bev, bev->cbarg);
@@ -84,6 +85,7 @@ run_callback(struct deferred_cb *cb, void *arg)
                        bev->writecb(bev, bev->cbarg);
                }
        }
+       BEV_UNLOCK(bev);
 }
 
 static struct bufferevent_pair *
@@ -115,16 +117,22 @@ bufferevent_pair_new(struct event_base *base, enum bufferevent_options options,
     struct bufferevent *pair[2])
 {
         struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
+       enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE;
 
        bufev1 = bufferevent_pair_elt_new(base, options);
        if (!bufev1)
                return -1;
-       bufev2 = bufferevent_pair_elt_new(base, options);
+       bufev2 = bufferevent_pair_elt_new(base, tmp_options);
        if (!bufev2) {
                bufferevent_free(downcast(bufev1));
                return -1;
        }
 
+       if (options & BEV_OPT_THREADSAFE) {
+               /*XXXX check return */
+               bufferevent_enable_locking(downcast(bufev2), bufev1->bev.lock);
+       }
+
        bufev1->partner = bufev2;
        bufev2->partner = bufev1;
 
index 0e5e3a7b779269d5094182a153dbd0add582b88b..5d6b92be27fd90f24d0f666e298f76077a0f9967 100644 (file)
@@ -342,6 +342,7 @@ be_socket_flush(struct bufferevent *bev, short iotype,
 void
 bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
 {
+       BEV_LOCK(bufev);
        assert(bufev->be_ops == &bufferevent_ops_socket);
 
        event_del(&bufev->ev_read);
@@ -351,37 +352,48 @@ bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
            EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
        event_assign(&bufev->ev_write, bufev->ev_base, fd,
            EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
+       BEV_UNLOCK(bufev);
 }
 
 /* XXXX Should non-socket buffferevents support this? */
 int
 bufferevent_priority_set(struct bufferevent *bufev, int priority)
 {
+       int r = -1;
+
+       BEV_LOCK(bufev);
        if (bufev->be_ops != &bufferevent_ops_socket)
-               return -1;
+               goto done;
 
        if (event_priority_set(&bufev->ev_read, priority) == -1)
-               return (-1);
+               goto done;
        if (event_priority_set(&bufev->ev_write, priority) == -1)
-               return (-1);
+               goto done;
 
-       return (0);
+       r = 0;
+done:
+       BEV_UNLOCK(bufev);
+       return r;
 }
 
 /* XXXX Should non-socket buffferevents support this? */
 int
 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
 {
-       int res;
+       int res = -1;
+
+       BEV_LOCK(bufev);
        if (bufev->be_ops != &bufferevent_ops_socket)
-               return -1;
+               goto done;
 
        bufev->ev_base = base;
 
        res = event_base_set(base, &bufev->ev_read);
        if (res == -1)
-               return (res);
+               goto done;
 
        res = event_base_set(base, &bufev->ev_write);
-       return (res);
+done:
+       BEV_UNLOCK(bufev);
+       return res;
 }
index bbb70ad86d76e9e4b795540107c3fea578cce397..41f976b68dff6e3d9d3f5a7dbeb920c8dd3cdab1 100644 (file)
@@ -118,6 +118,7 @@ typedef void (*everrorcb)(struct bufferevent *bev, short what, void *ctx);
 
 enum bufferevent_options {
        BEV_OPT_CLOSE_ON_FREE = (1<<0),
+       BEV_OPT_THREADSAFE = (1<<1),
 };
 
 /**