]> granicus.if.org Git - libevent/commitdiff
Add locking to evbuffers.
authorNick Mathewson <nickm@torproject.org>
Sun, 5 Apr 2009 02:44:17 +0000 (02:44 +0000)
committerNick Mathewson <nickm@torproject.org>
Sun, 5 Apr 2009 02:44:17 +0000 (02:44 +0000)
svn:r1134

buffer.c
evbuffer-internal.h
include/event2/buffer.h

index 41fc71d156a35b975f5efe734b86f75582e94678..f5f8a2c0a583939ce47c65a6cf8693594fdb650d 100644 (file)
--- a/buffer.c
+++ b/buffer.c
 #include "event2/event.h"
 #include "event2/buffer.h"
 #include "event2/buffer_compat.h"
+#include "event2/thread.h"
 #include "event-config.h"
 #include "log-internal.h"
 #include "mm-internal.h"
 #include "util-internal.h"
+#include "evthread-internal.h"
 #include "evbuffer-internal.h"
 
 /* some systems do not have MAP_FAILED */
@@ -199,6 +201,7 @@ evbuffer_chain_free(struct evbuffer_chain *chain)
 static inline void
 evbuffer_chain_insert(struct evbuffer *buf, struct evbuffer_chain *chain)
 {
+        ASSERT_EVBUFFER_LOCKED(buf);
        if (buf->first == NULL) {
                buf->first = buf->last = chain;
                buf->previous_to_last = NULL;
@@ -230,6 +233,30 @@ evbuffer_new(void)
        return (buffer);
 }
 
+int
+evbuffer_enable_locking(struct evbuffer *buf, void *lock)
+{
+#ifdef _EVENT_DISABLE_THREAD_SUPPORT
+        return -1;
+#else
+        if (buf->lock)
+                return -1;
+
+        if (!lock) {
+                EVTHREAD_ALLOC_LOCK(lock);
+                if (!lock)
+                        return -1;
+                buf->lock = lock;
+                buf->own_lock = 1;
+        } else {
+                buf->lock = lock;
+                buf->own_lock = 0;
+        }
+
+        return 0;
+#endif
+}
+
 static inline void
 evbuffer_invoke_callbacks(struct evbuffer *buffer)
 {
@@ -237,6 +264,8 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer)
         struct evbuffer_cb_info info;
        size_t new_size;
 
+        ASSERT_EVBUFFER_LOCKED(buffer);
+
        if (TAILQ_EMPTY(&buffer->callbacks)) {
                 buffer->n_add_for_cb = buffer->n_del_for_cb = 0;
                return;
@@ -276,6 +305,7 @@ static void
 evbuffer_remove_all_callbacks(struct evbuffer *buffer)
 {
        struct evbuffer_cb_entry *cbent;
+
        while ((cbent = TAILQ_FIRST(&buffer->callbacks))) {
            TAILQ_REMOVE(&buffer->callbacks, cbent, next);
            mm_free(cbent);
@@ -286,57 +316,104 @@ void
 evbuffer_free(struct evbuffer *buffer)
 {
        struct evbuffer_chain *chain, *next;
+
+        ASSERT_EVBUFFER_UNLOCKED(buffer);
+
        for (chain = buffer->first; chain != NULL; chain = next) {
                next = chain->next;
                evbuffer_chain_free(chain);
        }
        evbuffer_remove_all_callbacks(buffer);
+        if (buffer->own_lock)
+                EVTHREAD_FREE_LOCK(buffer->lock);
        mm_free(buffer);
 }
 
+void
+evbuffer_lock(struct evbuffer *buf)
+{
+        EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+}
+
+void
+evbuffer_unlock(struct evbuffer *buf)
+{
+        EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
+}
+
 size_t
 evbuffer_get_length(const struct evbuffer *buffer)
 {
-       return (buffer->total_len);
+        size_t result;
+
+        EVBUFFER_LOCK(buffer, EVTHREAD_READ);
+
+       result = (buffer->total_len);
+
+        EVBUFFER_UNLOCK(buffer, EVTHREAD_READ);
+
+        return result;
 }
 
 size_t
 evbuffer_get_contiguous_space(const struct evbuffer *buf)
 {
-       struct evbuffer_chain *chain = buf->first;
+       struct evbuffer_chain *chain;
+        size_t result;
+
+        EVBUFFER_LOCK(buf, EVTHREAD_READ);
+        chain = buf->first;
+       result = (chain != NULL ? chain->off : 0);
+        EVBUFFER_UNLOCK(buf, EVTHREAD_READ);
 
-       return (chain != NULL ? chain->off : 0);
+        return result;
 }
 
 unsigned char *
 evbuffer_reserve_space(struct evbuffer *buf, size_t size)
 {
        struct evbuffer_chain *chain;
+        unsigned char *result = NULL;
+
+        EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
 
        if (evbuffer_expand(buf, size) == -1)
-               return (NULL);
+                goto done;
 
        chain = buf->last;
 
-       return (chain->buffer + chain->misalign + chain->off);
+       result = (chain->buffer + chain->misalign + chain->off);
+
+done:
+        EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
+
+        return result;
 }
 
 int
 evbuffer_commit_space(struct evbuffer *buf, size_t size)
 {
-       struct evbuffer_chain *chain = buf->last;
+       struct evbuffer_chain *chain;
+        int result = -1;
+
+        EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+        chain = buf->last;
 
        if (chain == NULL ||
            chain->buffer_len - chain->off - chain->misalign < size)
-               return (-1);
+               goto done;
 
        chain->off += size;
        buf->total_len += size;
 
-       return (0);
+        result = 0;
+done:
+        EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
+       return result;
 }
 
 #define ZERO_CHAIN(dst) do { \
+                ASSERT_EVBUFFER_LOCKED(dst);    \
                (dst)->first = NULL;            \
                (dst)->last = NULL;             \
                (dst)->previous_to_last = NULL; \
@@ -344,6 +421,8 @@ evbuffer_commit_space(struct evbuffer *buf, size_t size)
        } while (0)
 
 #define COPY_CHAIN(dst, src) do { \
+                ASSERT_EVBUFFER_LOCKED(dst);                       \
+                ASSERT_EVBUFFER_LOCKED(src);                       \
                (dst)->first = (src)->first;                       \
                (dst)->previous_to_last = (src)->previous_to_last; \
                (dst)->last = (src)->last;                         \
@@ -351,6 +430,8 @@ evbuffer_commit_space(struct evbuffer *buf, size_t size)
        } while (0)
 
 #define APPEND_CHAIN(dst, src) do {                                    \
+                ASSERT_EVBUFFER_LOCKED(dst);                            \
+                ASSERT_EVBUFFER_LOCKED(src);                            \
                (dst)->last->next = (src)->first;                       \
                (dst)->previous_to_last = (src)->previous_to_last ?     \
                    (src)->previous_to_last : (dst)->last;              \
@@ -358,23 +439,29 @@ evbuffer_commit_space(struct evbuffer *buf, size_t size)
                (dst)->total_len += (src)->total_len;                   \
        } while (0)
 
-#define PREPEND_CHAIN(dst, src) do {                           \
-               (src)->last->next = (dst)->first;               \
-               (dst)->first = (src)->first;                    \
-               (dst)->total_len += (src)->total_len;           \
-               if ((dst)->previous_to_last == NULL)            \
-                       (dst)->previous_to_last = (src)->last;  \
+#define PREPEND_CHAIN(dst, src) do {                               \
+                ASSERT_EVBUFFER_LOCKED(dst);                       \
+                ASSERT_EVBUFFER_LOCKED(src);                       \
+               (src)->last->next = (dst)->first;                  \
+               (dst)->first = (src)->first;                       \
+               (dst)->total_len += (src)->total_len;              \
+               if ((dst)->previous_to_last == NULL)               \
+                       (dst)->previous_to_last = (src)->last;     \
        } while (0)
 
 
 int
 evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
 {
-       size_t in_total_len = inbuf->total_len;
-       size_t out_total_len = outbuf->total_len;
+       size_t in_total_len, out_total_len;
+
+        EVBUFFER_LOCK2(inbuf, outbuf);
+
+        in_total_len = inbuf->total_len;
+       out_total_len = outbuf->total_len;
 
        if (in_total_len == 0 || outbuf == inbuf)
-               return (0);
+               goto done;
 
        if (out_total_len == 0) {
                COPY_CHAIN(outbuf, inbuf);
@@ -390,17 +477,23 @@ evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
        evbuffer_invoke_callbacks(inbuf);
        evbuffer_invoke_callbacks(outbuf);
 
+done:
+        EVBUFFER_UNLOCK2(inbuf, outbuf);
        return (0);
 }
 
 void
 evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
 {
-       size_t in_total_len = inbuf->total_len;
-       size_t out_total_len = outbuf->total_len;
+       size_t in_total_len, out_total_len;
+
+        EVBUFFER_LOCK2(inbuf, outbuf);
+
+        in_total_len = inbuf->total_len;
+       out_total_len = outbuf->total_len;
 
        if (!in_total_len || inbuf == outbuf)
-               return;
+               goto done;
 
        if (out_total_len == 0) {
                COPY_CHAIN(outbuf, inbuf);
@@ -415,16 +508,21 @@ evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
 
        evbuffer_invoke_callbacks(inbuf);
        evbuffer_invoke_callbacks(outbuf);
+done:
+        EVBUFFER_UNLOCK2(inbuf, outbuf);
 }
 
 void
 evbuffer_drain(struct evbuffer *buf, size_t len)
 {
        struct evbuffer_chain *chain, *next;
-        size_t old_len = buf->total_len;
+        size_t old_len;
+
+        EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+        old_len = buf->total_len;
 
        if (old_len == 0)
-               return;
+               goto done;
 
        if (len >= old_len) {
                 len = old_len;
@@ -455,6 +553,9 @@ evbuffer_drain(struct evbuffer *buf, size_t len)
         buf->n_del_for_cb += len;
        /* Tell someone about changes in this buffer */
        evbuffer_invoke_callbacks(buf);
+
+done:
+        EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
 }
 
 /* Reads data from an event buffer and drains the bytes read */
@@ -463,15 +564,20 @@ int
 evbuffer_remove(struct evbuffer *buf, void *data_out, size_t datlen)
 {
         /*XXX fails badly on sendfile case. */
-       struct evbuffer_chain *chain = buf->first, *tmp;
+       struct evbuffer_chain *chain, *tmp;
        char *data = data_out;
        size_t nread;
+        int result = 0;
+
+        EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+
+        chain = buf->first;
 
        if (datlen >= buf->total_len)
                datlen = buf->total_len;
 
        if (datlen == 0)
-               return (0);
+                goto done;
 
        nread = datlen;
 
@@ -503,7 +609,10 @@ evbuffer_remove(struct evbuffer *buf, void *data_out, size_t datlen)
        if (nread)
                evbuffer_invoke_callbacks(buf);
 
-       return (nread);
+       result = nread;
+done:
+        EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
+        return result;
 }
 
 /* reads data from the src buffer to the dst buffer, avoids memcpy as
@@ -515,18 +624,25 @@ evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst,
        /*XXX We should have an option to force this to be zero-copy.*/
 
        /*XXX can fail badly on sendfile case. */
-       struct evbuffer_chain *chain = src->first;
-       struct evbuffer_chain *previous = chain, *previous_to_previous = NULL;
+       struct evbuffer_chain *chain, *previous, *previous_to_previous = NULL;
        size_t nread = 0;
+        int result;
 
-       if (datlen == 0 || dst == src)
-               return (0);
+        EVBUFFER_LOCK2(src, dst);
+
+        chain = previous = src->first;
+
+       if (datlen == 0 || dst == src) {
+               result = 0;
+                goto done;
+        }
 
        /* short-cut if there is no more data buffered */
        if (datlen >= src->total_len) {
                datlen = src->total_len;
                evbuffer_add_buffer(dst, src);
-               return (datlen);
+               result = datlen;
+                goto done;
        }
 
        /* removes chains if possible */
@@ -570,15 +686,22 @@ evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst,
                evbuffer_invoke_callbacks(dst);
                evbuffer_invoke_callbacks(src);
        }
+        result = nread;
 
-       return (nread);
+done:
+        EVBUFFER_UNLOCK2(src, dst);
+       return result;
 }
 
 unsigned char *
 evbuffer_pullup(struct evbuffer *buf, ssize_t size)
 {
-       struct evbuffer_chain *chain = buf->first, *next, *tmp;
-       unsigned char *buffer;
+       struct evbuffer_chain *chain, *next, *tmp;
+       unsigned char *buffer, *result = NULL;
+
+        EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+
+        chain = buf->first;
 
        if (size == -1)
                size = buf->total_len;
@@ -586,12 +709,14 @@ evbuffer_pullup(struct evbuffer *buf, ssize_t size)
         * is going to have a long enough buffer afterwards; so we return
         * NULL */
        if (size == 0 || size > buf->total_len)
-               return (NULL);
+                goto done;
 
        /* No need to pull up anything; the first size bytes are
         * already here. */
-       if (chain->off >= size)
-               return chain->buffer + chain->misalign;
+        if (chain->off >= size) {
+                result = chain->buffer + chain->misalign;
+                goto done;
+        }
 
        if (chain->buffer_len - chain->misalign >= size) {
                /* already have enough space in the first chain */
@@ -604,7 +729,7 @@ evbuffer_pullup(struct evbuffer *buf, ssize_t size)
        } else {
                if ((tmp = evbuffer_chain_new(size)) == NULL) {
                        event_warn("%s: out of memory\n", __func__);
-                       return (NULL);
+                       goto done;
                }
                buffer = tmp->buffer;
                tmp->off = size;
@@ -638,7 +763,11 @@ evbuffer_pullup(struct evbuffer *buf, ssize_t size)
 
        tmp->next = chain;
 
-       return (tmp->buffer + tmp->misalign);
+       result = (tmp->buffer + tmp->misalign);
+
+done:
+        EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
+        return result;
 }
 
 /*
@@ -754,6 +883,9 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,
        char *line, chr;
        unsigned int n_to_copy, extra_drain;
        int count = 0;
+        char *result = NULL;
+
+        EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
 
        it.chain = buffer->first;
        it.off = 0;
@@ -764,7 +896,7 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,
        case EVBUFFER_EOL_ANY:
                count = evbuffer_strpbrk(&it, "\r\n");
                if (count == -1)
-                       return (NULL);
+                       goto done;
 
                n_to_copy = count;
                extra_drain = evbuffer_strspn(it.chain, it.off, "\r\n");
@@ -775,7 +907,7 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,
                        count += tmp;
                        ++it.off;
                        if (evbuffer_getchr(&it, &chr) == -1)
-                               return (NULL);
+                               goto done;
                        if (chr == '\n') {
                                n_to_copy = count;
                                break;
@@ -783,7 +915,7 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,
                        ++count;
                }
                if (tmp == -1)
-                       return (NULL);
+                       goto done;
                extra_drain = 2;
                break;
        }
@@ -791,18 +923,18 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,
                /* we might strip a preceding '\r' */
        case EVBUFFER_EOL_LF:
                if ((count = evbuffer_strchr(&it, '\n')) == -1)
-                       return (NULL);
+                       goto done;
                n_to_copy = count;
                extra_drain = 1;
                break;
        default:
-               return (NULL);
+               goto done;
        }
 
        if ((line = mm_malloc(n_to_copy+1)) == NULL) {
                event_warn("%s: out of memory\n", __func__);
                evbuffer_drain(buffer, n_to_copy + extra_drain);
-               return (NULL);
+               goto done;
        }
 
        evbuffer_remove(buffer, line, n_to_copy);
@@ -815,7 +947,10 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,
        if (n_read_out)
                *n_read_out = (size_t)n_to_copy;
 
-       return (line);
+        result = line;
+done:
+        EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE);
+        return result;
 }
 
 #define EVBUFFER_CHAIN_MAX_AUTO_SIZE 4096
@@ -825,15 +960,20 @@ evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,
 int
 evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
 {
-       struct evbuffer_chain *chain = buf->last, *tmp;
+       struct evbuffer_chain *chain, *tmp;
        const unsigned char *data = data_in;
        size_t remain, to_alloc;
+        int result = -1;
+
+        EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+
+        chain = buf->last;
 
        /* If there are no chains allocated for this buffer, allocate one
         * big enough to hold all the data. */
        if (chain == NULL) {
                if (evbuffer_expand(buf, datlen) == -1)
-                       return (-1);
+                       goto done;
                chain = buf->last;
        }
 
@@ -874,7 +1014,7 @@ evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
                to_alloc = datlen;
        tmp = evbuffer_chain_new(to_alloc);
        if (tmp == NULL)
-               return (-1);
+               goto done;
 
        if (remain) {
                memcpy(chain->buffer + chain->misalign + chain->off,
@@ -893,18 +1033,24 @@ evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
 
 out:
        evbuffer_invoke_callbacks(buf);
-
-       return (0);
+        result = 0;
+done:
+        EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
+       return result;
 }
 
 int
 evbuffer_prepend(struct evbuffer *buf, const void *data, size_t datlen)
 {
-       struct evbuffer_chain *chain = buf->first, *tmp;
+       struct evbuffer_chain *chain, *tmp;
+        int result = -1;
+
+        EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+        chain = buf->first;
 
        if (chain == NULL) {
                if (evbuffer_expand(buf, datlen) == -1)
-                       return (-1);
+                       goto done;
                chain = buf->first;
                chain->misalign = chain->buffer_len;
        }
@@ -934,7 +1080,7 @@ evbuffer_prepend(struct evbuffer *buf, const void *data, size_t datlen)
 
        /* we need to add another chain */
        if ((tmp = evbuffer_chain_new(datlen)) == NULL)
-               return (-1);
+               goto done;
        buf->first = tmp;
        if (buf->previous_to_last == NULL)
                buf->previous_to_last = tmp;
@@ -949,8 +1095,10 @@ evbuffer_prepend(struct evbuffer *buf, const void *data, size_t datlen)
 
 out:
        evbuffer_invoke_callbacks(buf);
-
-       return (0);
+        result = 0;
+done:
+        EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
+       return result;
 }
 
 /** Helper: realigns the memory in chain->buffer so that misalign is 0. */
@@ -969,23 +1117,28 @@ evbuffer_expand(struct evbuffer *buf, size_t datlen)
 {
        /* XXX we should either make this function less costly, or call it
         * less often.  */
-       struct evbuffer_chain *chain = buf->last, *tmp;
+       struct evbuffer_chain *chain, *tmp;
        size_t need, length;
+        int result = -1;
+
+        EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+
+        chain = buf->last;
 
        if (chain == NULL || (chain->flags & EVBUFFER_IMMUTABLE)) {
                chain = evbuffer_chain_new(datlen);
                if (chain == NULL)
-                       return (-1);
+                       goto err;
 
                evbuffer_chain_insert(buf, chain);
-               return (0);
+                goto ok;
        }
 
        need = chain->misalign + chain->off + datlen;
 
        /* If we can fit all the data, then we don't have to do anything */
        if (chain->buffer_len >= need)
-               return (0);
+                goto ok;
 
        /* If the misalignment plus the remaining space fulfils our
         * data needs, we just force an alignment to happen.
@@ -993,14 +1146,14 @@ evbuffer_expand(struct evbuffer *buf, size_t datlen)
         */
        if (chain->buffer_len - chain->off >= datlen) {
                evbuffer_chain_align(chain);
-               return (0);
+               goto ok;
        }
 
        /* figure out how much space we need */
        length = chain->buffer_len - chain->misalign + datlen;
        tmp = evbuffer_chain_new(length);
        if (tmp == NULL)
-               return (-1);
+               goto err;
        /* copy the data over that we had so far */
        tmp->off = chain->off;
        tmp->misalign = 0;
@@ -1015,7 +1168,11 @@ evbuffer_expand(struct evbuffer *buf, size_t datlen)
 
        evbuffer_chain_free(chain);
 
-       return (0);
+ok:
+        result = 0;
+err:
+        EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
+       return result;
 }
 
 /* Make sure that datlen bytes are available for writing in the last two
@@ -1026,6 +1183,8 @@ _evbuffer_expand_fast(struct evbuffer *buf, size_t datlen)
        struct evbuffer_chain *chain = buf->last, *tmp;
        size_t avail, avail_in_prev = 0;
 
+        ASSERT_EVBUFFER_LOCKED(buf);
+
        if (chain == NULL || (chain->flags & EVBUFFER_IMMUTABLE)) {
                chain = evbuffer_chain_new(datlen);
                if (chain == NULL)
@@ -1121,15 +1280,21 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
 {
        struct evbuffer_chain *chain = buf->last;
        int n = EVBUFFER_MAX_READ;
+        int result;
+
 #ifdef USE_IOVEC_IMPL
        int nvecs;
 #else
        unsigned char *p;
 #endif
+#if defined(FIONREAD) && defined(WIN32)
+       long lng = n;
+#endif
+
+        EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
 
 #if defined(FIONREAD)
 #ifdef WIN32
-       long lng = n;
        if (ioctlsocket(fd, FIONREAD, &lng) == -1 || (n=lng) == 0) {
 #else
        if (ioctl(fd, FIONREAD, &n) == -1 || n == 0) {
@@ -1156,7 +1321,8 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
        /* Since we can use iovecs, we're willing to use the last
         * _two_ chains. */
        if (_evbuffer_expand_fast(buf, howmuch) == -1) {
-               return(-1);
+                result = -1;
+                goto done;
        } else {
                IOV_TYPE vecs[2];
                chain = buf->last;
@@ -1214,8 +1380,10 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
        /* If we don't have FIONREAD, we might waste some space here */
        /* XXX we _will_ waste some space here if there is any space left
         * over on buf->last. */
-       if (evbuffer_expand(buf, howmuch) == -1)
-               return (-1);
+       if (evbuffer_expand(buf, howmuch) == -1) {
+               result = -1;
+                goto done;
+        }
 
        chain = buf->last;
 
@@ -1229,15 +1397,19 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
 #endif
 #endif /* USE_IOVEC_IMPL */
 
-       if (n == -1)
-               return (-1);
-       if (n == 0)
-               return (0);
+       if (n == -1) {
+               result = -1;
+                goto done;
+        }
+       if (n == 0) {
+               result = 0;
+                goto done;
+        }
 
 #ifdef USE_IOVEC_IMPL
        if (nvecs == 2) {
                size_t space = CHAIN_SPACE_LEN(buf->previous_to_last);
-               if (space < n) {
+                       if (space < n) {
                        buf->previous_to_last->off += space;
                        chain->off += n-space;
                } else {
@@ -1254,8 +1426,10 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
 
        /* Tell someone about changes in this buffer */
        evbuffer_invoke_callbacks(buf);
-
-       return (n);
+        result = n;
+done:
+        EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
+       return result;
 }
 
 #ifdef USE_IOVEC_IMPL
@@ -1266,6 +1440,8 @@ ssize_t howmuch)
        IOV_TYPE iov[NUM_IOVEC];
        struct evbuffer_chain *chain = buffer->first;
        int n, i = 0;
+
+        ASSERT_EVBUFFER_LOCKED(buffer);
        /* XXX make this top out at some maximal data length?  if the
         * buffer has (say) 1MB in it, split over 128 chains, there's
         * no way it all gets written in one go. */
@@ -1311,6 +1487,14 @@ evbuffer_write_sendfile(struct evbuffer *buffer, evutil_socket_t fd,
 #ifdef SENDFILE_IS_FREEBSD
        int res;
        off_t len = chain->off;
+#elif SENDFILE_IS_LINUX
+       ssize_t res;
+       off_t offset = chain->misalign;
+#endif
+
+        ASSERT_EVBUFFER_LOCKED(buffer);
+
+#ifdef SENDFILE_IS_FREEBSD
        res = sendfile(info->fd, fd, chain->misalign, &len, NULL, 0);
        if (res == -1 && !EVUTIL_ERR_RW_RETRIABLE(errno))
                return (-1);
@@ -1318,8 +1502,6 @@ evbuffer_write_sendfile(struct evbuffer *buffer, evutil_socket_t fd,
        return (len);
 #elif SENDFILE_IS_LINUX
        /* TODO(niels): implement splice */
-       ssize_t res;
-       off_t offset = chain->misalign;
        res = sendfile(fd, info->fd, &offset, chain->off);
        if (res == -1 && EVUTIL_ERR_RW_RETRIABLE(errno)) {
                /* if this is EGAIN or EINTR return 0; otherwise, -1 */
@@ -1336,6 +1518,8 @@ evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd,
 {
        int n;
 
+        EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
+
        if (howmuch < 0)
                howmuch = buffer->total_len;
 
@@ -1359,12 +1543,10 @@ evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd,
 #endif
        }
 
-       if (n == -1)
-               return (-1);
-       if (n == 0)
-               return (0);
-       evbuffer_drain(buffer, n);
+        if (n > 0)
+                evbuffer_drain(buffer, n);
 
+        EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE);
        return (n);
 }
 
@@ -1380,12 +1562,17 @@ evbuffer_find(struct evbuffer *buffer, const unsigned char *what, size_t len)
         unsigned char *search;
         struct evbuffer_ptr ptr;
 
-        ptr = evbuffer_search(buffer, (const char *)what, len, NULL);
-        if (ptr.pos < 0)
-                return (NULL);
+        EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
 
-        search = evbuffer_pullup(buffer, ptr.pos + len);
-        return search + ptr.pos;
+        ptr = evbuffer_search(buffer, (const char *)what, len, NULL);
+        if (ptr.pos < 0) {
+                search = NULL;
+        } else {
+                search = evbuffer_pullup(buffer, ptr.pos + len);
+                search += ptr.pos;
+        }
+        EVBUFFER_UNLOCK(buffer,EVTHREAD_WRITE);
+        return search;
 }
 
 int
@@ -1395,6 +1582,8 @@ evbuffer_ptr_set(struct evbuffer *buf, struct evbuffer_ptr *pos,
         size_t left = position;
        struct evbuffer_chain *chain = NULL;
 
+        EVBUFFER_LOCK(buf, EVTHREAD_READ);
+
        switch (how) {
        case EVBUFFER_PTR_SET:
                chain = buf->first;
@@ -1423,6 +1612,8 @@ evbuffer_ptr_set(struct evbuffer *buf, struct evbuffer_ptr *pos,
                pos->pos = -1;
        }
 
+        EVBUFFER_UNLOCK(buf, EVTHREAD_READ);
+
        return chain != NULL ? 0 : -1;
 }
 
@@ -1438,6 +1629,8 @@ evbuffer_ptr_memcmp(const struct evbuffer *buf, const struct evbuffer_ptr *pos,
         size_t position;
         int r;
 
+        ASSERT_EVBUFFER_LOCKED(buf);
+
         if (pos->pos + len > buf->total_len)
                 return -1;
 
@@ -1470,6 +1663,8 @@ evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const str
        const unsigned char *p;
         char first;
 
+        EVBUFFER_LOCK(buffer, EVTHREAD_READ);
+
         if (start) {
                 memcpy(&pos, start, sizeof(pos));
                 chain = pos._internal.chain;
@@ -1480,7 +1675,7 @@ evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const str
         }
 
         if (!len)
-                return pos;
+                goto done;
 
         first = what[0];
 
@@ -1494,7 +1689,7 @@ evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const str
                         pos.pos += p - start_at;
                         pos._internal.pos_in_chain += p - start_at;
                         if (!evbuffer_ptr_memcmp(buffer, &pos, what, len))
-                                return pos;
+                                goto done;
                         ++pos.pos;
                         ++pos._internal.pos_in_chain;
                         if (pos._internal.pos_in_chain == chain->off) {
@@ -1510,6 +1705,8 @@ evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const str
 
         pos.pos = -1;
         pos._internal.chain = NULL;
+done:
+        EVBUFFER_UNLOCK(buffer, EVTHREAD_READ);
         return pos;
 }
 
@@ -1519,12 +1716,14 @@ evbuffer_add_vprintf(struct evbuffer *buf, const char *fmt, va_list ap)
 {
        char *buffer;
        size_t space;
-       int sz;
+       int sz, result = -1;
        va_list aq;
 
+        EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+
        /* make sure that at least some space is available */
        if (evbuffer_expand(buf, 64) == -1)
-               return (-1);
+               goto done;
 
        for (;;) {
                struct evbuffer_chain *chain = buf->last;
@@ -1543,20 +1742,24 @@ evbuffer_add_vprintf(struct evbuffer *buf, const char *fmt, va_list ap)
                va_end(aq);
 
                if (sz < 0)
-                       return (-1);
+                       goto done;
                if (sz < space) {
                        chain->off += sz;
                        buf->total_len += sz;
                         buf->n_add_for_cb += sz;
 
                        evbuffer_invoke_callbacks(buf);
-                       return (sz);
+                       result = sz;
+                        goto done;
                }
                if (evbuffer_expand(buf, sz + 1) == -1)
-                       return (-1);
-
-       }
+                        goto done;
+        }
        /* NOTREACHED */
+
+done:
+        EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
+        return result;
 }
 
 int
@@ -1592,10 +1795,12 @@ evbuffer_add_reference(struct evbuffer *outbuf,
        info->cleanupfn = cleanupfn;
        info->extra = extra;
 
+        EVBUFFER_LOCK(outbuf, EVTHREAD_WRITE);
        evbuffer_chain_insert(outbuf, chain);
         outbuf->n_add_for_cb += datlen;
 
        evbuffer_invoke_callbacks(outbuf);
+        EVBUFFER_UNLOCK(outbuf, EVTHREAD_WRITE);
 
        return (0);
 }
@@ -1633,6 +1838,7 @@ evbuffer_add_file(struct evbuffer *outbuf, int fd,
                info = EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd, chain);
                info->fd = fd;
 
+                EVBUFFER_LOCK(outbuf, EVTHREAD_WRITE);
                 outbuf->n_add_for_cb += length;
                evbuffer_chain_insert(outbuf, chain);
        } else
@@ -1671,6 +1877,7 @@ evbuffer_add_file(struct evbuffer *outbuf, int fd,
                info = EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd, chain);
                info->fd = fd;
 
+                EVBUFFER_LOCK(outbuf, EVTHREAD_WRITE);
                 outbuf->n_add_for_cb += length;
 
                evbuffer_chain_insert(outbuf, chain);
@@ -1705,6 +1912,7 @@ evbuffer_add_file(struct evbuffer *outbuf, int fd,
                        length -= read;
                }
 
+                EVBUFFER_LOCK(outbuf, EVTHREAD_WRITE);
                evbuffer_add_buffer(outbuf, tmp);
                evbuffer_free(tmp);
 
@@ -1712,6 +1920,7 @@ evbuffer_add_file(struct evbuffer *outbuf, int fd,
        }
 
        evbuffer_invoke_callbacks(outbuf);
+        EVBUFFER_UNLOCK(outbuf, EVTHREAD_WRITE);
 
        return (0);
 }
@@ -1720,6 +1929,8 @@ evbuffer_add_file(struct evbuffer *outbuf, int fd,
 void
 evbuffer_setcb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg)
 {
+        EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
+
        if (!TAILQ_EMPTY(&buffer->callbacks))
                evbuffer_remove_all_callbacks(buffer);
 
@@ -1729,6 +1940,7 @@ evbuffer_setcb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg)
                 ent->cb.cb_obsolete = cb;
                 ent->flags |= EVBUFFER_CB_OBSOLETE;
         }
+        EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE);
 }
 
 struct evbuffer_cb_entry *
@@ -1737,10 +1949,12 @@ evbuffer_add_cb(struct evbuffer *buffer, evbuffer_cb_func cb, void *cbarg)
        struct evbuffer_cb_entry *e;
        if (! (e = mm_calloc(1, sizeof(struct evbuffer_cb_entry))))
                return NULL;
+        EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
        e->cb.cb_func = cb;
        e->cbarg = cbarg;
        e->flags = EVBUFFER_CB_ENABLED;
        TAILQ_INSERT_HEAD(&buffer->callbacks, e, next);
+        EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE);
        return e;
 }
 
@@ -1748,7 +1962,9 @@ int
 evbuffer_remove_cb_entry(struct evbuffer *buffer,
                         struct evbuffer_cb_entry *ent)
 {
+        EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
        TAILQ_REMOVE(&buffer->callbacks, ent, next);
+        EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE);
        mm_free(ent);
        return 0;
 }
@@ -1757,20 +1973,26 @@ int
 evbuffer_remove_cb(struct evbuffer *buffer, evbuffer_cb_func cb, void *cbarg)
 {
        struct evbuffer_cb_entry *cbent;
+        int result = -1;
+        EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
        TAILQ_FOREACH(cbent, &buffer->callbacks, next) {
                if (cb == cbent->cb.cb_func && cbarg == cbent->cbarg) {
-                       return evbuffer_remove_cb_entry(buffer, cbent);
+                       result = evbuffer_remove_cb_entry(buffer, cbent);
+                        goto done;
                }
        }
-       return -1;
+done:
+        EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE);
+       return result;
 }
 
 int
 evbuffer_cb_set_flags(struct evbuffer *buffer,
                      struct evbuffer_cb_entry *cb, ev_uint32_t flags)
 {
-       (void)buffer; /* unused */
+        EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
        cb->flags = (cb->flags & EVBUFFER_CB_INTERNAL_FLAGS) | flags;
+        EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE);
        return 0;
 }
 
index 2751a4720b7fc2ece77495d9f7dfd5c4e10a4d64..41128cfc264f4a0eecddf9558c767a3b90ca284d 100644 (file)
@@ -72,6 +72,12 @@ struct evbuffer {
         size_t n_add_for_cb;
         size_t n_del_for_cb;
 
+#ifndef _EVENT_DISABLE_THREAD_SUPPORT
+        void *lock;
+#endif
+       unsigned own_lock : 1;
+        int lock_count : 31;
+
        TAILQ_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks;
 };
 
@@ -125,6 +131,50 @@ struct evbuffer_chain_reference {
 #define EVBUFFER_CHAIN_SIZE sizeof(struct evbuffer_chain)
 #define EVBUFFER_CHAIN_EXTRA(t, c) (t *)((struct evbuffer_chain *)(c) + 1)
 
+#define ASSERT_EVBUFFER_LOCKED(buffer)                  \
+       do {                                            \
+               assert((buffer)->lock_count > 0);       \
+       } while (0)
+#define ASSERT_EVBUFFER_UNLOCKED(buffer)                  \
+       do {                                            \
+               assert((buffer)->lock_count == 0);      \
+       } while (0)
+#define _EVBUFFER_INCREMENT_LOCK_COUNT(buffer)                 \
+       do {                                                   \
+               ((struct evbuffer*)(buffer))->lock_count++;    \
+       } while (0)
+#define _EVBUFFER_DECREMENT_LOCK_COUNT(buffer)               \
+       do {                                                  \
+               ASSERT_EVBUFFER_LOCKED(buffer);               \
+               ((struct evbuffer*)(buffer))->lock_count--;   \
+       } while (0)
+
+#define EVBUFFER_LOCK(buffer, mode)                                    \
+       do {                                                            \
+               EVLOCK_LOCK((buffer)->lock, (mode));                    \
+               _EVBUFFER_INCREMENT_LOCK_COUNT(buffer);                 \
+       } while(0)
+#define EVBUFFER_UNLOCK(buffer, mode)                                  \
+       do {                                                            \
+               _EVBUFFER_DECREMENT_LOCK_COUNT(buffer);                 \
+               EVLOCK_UNLOCK((buffer)->lock, (mode));                  \
+       } while(0)
+
+#define EVBUFFER_LOCK2(buffer1, buffer2)                               \
+       do {                                                            \
+               EVLOCK_LOCK2((buffer1)->lock, (buffer2)->lock,          \
+                   EVTHREAD_WRITE, EVTHREAD_WRITE);                    \
+               _EVBUFFER_INCREMENT_LOCK_COUNT(buffer1);                \
+               _EVBUFFER_INCREMENT_LOCK_COUNT(buffer2);                \
+       } while(0)
+#define EVBUFFER_UNLOCK2(buffer1, buffer2)                             \
+       do {                                                            \
+               _EVBUFFER_DECREMENT_LOCK_COUNT(buffer1);                \
+               _EVBUFFER_DECREMENT_LOCK_COUNT(buffer2);                \
+               EVLOCK_UNLOCK2((buffer1)->lock, (buffer2)->lock,        \
+                   EVTHREAD_WRITE, EVTHREAD_WRITE);                    \
+       } while(0)
+
 #ifdef __cplusplus
 }
 #endif
index b10d15359e5c5ae0b1af125ca52e866cb4bb4885..8e94acf14414d95e2f77bf5ec7a279310e71feec 100644 (file)
@@ -101,6 +101,25 @@ struct evbuffer *evbuffer_new(void);
  */
 void evbuffer_free(struct evbuffer *buf);
 
+/**
+   Enable locking on an evbuffer so that it can safely be used by multiple
+   threads at the same time.
+
+   NOTE: when locking is enabled, the lock will be held when callbacks are
+   invoked.  This could result in deadlock if you aren't careful.  Plan
+   accordingly!
+
+   @param buf An evbuffer to make lockable.
+   @param lock A lock object, or NULL if we should allocate our own.
+   @return 0 on success, -1 on failure.
+ */
+int evbuffer_enable_locking(struct evbuffer *buf, void *lock);
+
+/* DOCDOC */
+void evbuffer_lock(struct evbuffer *buf);
+void evbuffer_unlock(struct evbuffer *buf);
+
+
 /**
   Returns the total number of bytes stored in the event buffer
 
@@ -421,7 +440,7 @@ struct evbuffer_cb_info {
     one: watch out!
 
     @param buffer the buffer whose size has changed
-    @param info a structure describing how the buffer changed
+    @param info a structure describing how the buffer changed.
     @param arg a pointer to user data
 */
 typedef void (*evbuffer_cb_func)(struct evbuffer *buffer, const struct evbuffer_cb_info *info, void *arg);