]> granicus.if.org Git - libevent/commitdiff
Use finalization feature so bufferevents can avoid deadlocks
authorNick Mathewson <nickm@torproject.org>
Tue, 9 Apr 2013 22:16:13 +0000 (18:16 -0400)
committerNick Mathewson <nickm@torproject.org>
Fri, 26 Apr 2013 16:18:07 +0000 (12:18 -0400)
Since the bufferevents' events are now EV_FINALIZE (name pending),
they won't deadlock.  To clean up properly, though, we must use the
finalization feature.

This patch also split bufferevent deallocation into an "unlink" step
that happens fast, and a "destruct" step that happens after
finalization.

More work is needed: there needs to be a way to specify a finalizer
for the bufferevent's argument itself.  Also, this finalizer business
makes lots of the reference counting we were doing unnecessary.

Also, more testing is needed.

buffer.c
bufferevent-internal.h
bufferevent.c
bufferevent_async.c
bufferevent_filter.c
bufferevent_openssl.c
bufferevent_pair.c
bufferevent_ratelim.c
bufferevent_sock.c
evbuffer-internal.h

index 7c35a69beb4c2fda050ccf2fe7f46f4ece10d078..860ba0dcd13287b712aa104da9b90036e1855acf 100644 (file)
--- a/buffer.c
+++ b/buffer.c
@@ -3345,3 +3345,21 @@ evbuffer_cb_unsuspend(struct evbuffer *buffer, struct evbuffer_cb_entry *cb)
 }
 #endif
 
+int
+evbuffer_get_callbacks_(struct evbuffer *buffer, struct event_callback **cbs,
+    int max_cbs)
+{
+       int r = 0;
+       EVBUFFER_LOCK(buffer);
+       if (buffer->deferred_cbs) {
+               if (max_cbs < 1) {
+                       r = -1;
+                       goto done;
+               }
+               cbs[0] = &buffer->deferred;
+               r = 1;
+       }
+done:
+       EVBUFFER_UNLOCK(buffer);
+       return r;
+}
index 63bf470844c502b91fdf2f1ca4da2d12b8c81fe0..ccfc70459f15487d4874d78cdea0c2d5c164a19c 100644 (file)
@@ -252,8 +252,11 @@ struct bufferevent_ops {
         */
        int (*disable)(struct bufferevent *, short);
 
+       /** DOCUMENT */
+       void (*unlink)(struct bufferevent *);
+
        /** Free any storage and deallocate any extra data or structures used
-           in this implementation.
+           in this implementation. DOCUMENT
         */
        void (*destruct)(struct bufferevent *);
 
index 7c03ce90799150f3ad8a2040369c62dd98beb1a8..b2bb0ac33c5154e1dadd845616872a43f5921268 100644 (file)
@@ -54,6 +54,7 @@
 #include "event2/bufferevent_struct.h"
 #include "event2/bufferevent_compat.h"
 #include "event2/event.h"
+#include "event-internal.h"
 #include "log-internal.h"
 #include "mm-internal.h"
 #include "bufferevent-internal.h"
@@ -61,7 +62,7 @@
 #include "util-internal.h"
 
 static void bufferevent_cancel_all_(struct bufferevent *bev);
-
+static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_);
 
 void
 bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
@@ -640,7 +641,9 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev)
 {
        struct bufferevent_private *bufev_private =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
-       struct bufferevent *underlying;
+       int n_cbs = 0;
+#define MAX_CBS 16
+       struct event_callback *cbs[MAX_CBS];
 
        EVUTIL_ASSERT(bufev_private->refcnt > 0);
 
@@ -649,6 +652,41 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev)
                return 0;
        }
 
+       if (bufev->be_ops->unlink)
+               bufev->be_ops->unlink(bufev);
+
+       /* Okay, we're out of references. Let's finalize this once all the
+        * callbacks are done running. */
+       cbs[0] = &bufev->ev_read.ev_evcallback;
+       cbs[1] = &bufev->ev_write.ev_evcallback;
+       cbs[2] = &bufev_private->deferred;
+       n_cbs = 3;
+       if (bufev_private->rate_limiting) {
+               struct event *e = &bufev_private->rate_limiting->refill_bucket_event;
+               if (event_initialized(e))
+                       cbs[n_cbs++] = &e->ev_evcallback;
+       }
+       n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs);
+       n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs);
+
+       event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs,
+           bufferevent_finalize_cb_);
+
+#undef MAX_CBS
+       BEV_UNLOCK(bufev);
+
+       return 1;
+}
+
+static void
+bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_)
+{
+       struct bufferevent *bufev = arg_;
+       struct bufferevent *underlying;
+       struct bufferevent_private *bufev_private =
+           EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+
+       BEV_LOCK(bufev);
        underlying = bufferevent_get_underlying(bufev);
 
        /* Clean up the shared info */
@@ -665,17 +703,13 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev)
        if (bufev_private->rate_limiting) {
                if (bufev_private->rate_limiting->group)
                        bufferevent_remove_from_rate_limit_group_internal_(bufev,0);
-               if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event))
-                       event_del(&bufev_private->rate_limiting->refill_bucket_event);
-               event_debug_unassign(&bufev_private->rate_limiting->refill_bucket_event);
                mm_free(bufev_private->rate_limiting);
                bufev_private->rate_limiting = NULL;
        }
 
-       event_debug_unassign(&bufev->ev_read);
-       event_debug_unassign(&bufev->ev_write);
 
        BEV_UNLOCK(bufev);
+
        if (bufev_private->own_lock)
                EVTHREAD_FREE_LOCK(bufev_private->lock,
                    EVTHREAD_LOCKTYPE_RECURSIVE);
@@ -695,8 +729,6 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev)
         */
        if (underlying)
                bufferevent_decref_(underlying);
-
-       return 1;
 }
 
 int
@@ -844,9 +876,9 @@ bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
 void
 bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev)
 {
-       evtimer_assign(&bev->ev_read, bev->ev_base,
+       event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE,
            bufferevent_generic_read_timeout_cb, bev);
-       evtimer_assign(&bev->ev_write, bev->ev_base,
+       event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE,
            bufferevent_generic_write_timeout_cb, bev);
 }
 
index 83b5c14191a8096bd28c79060097ec98d1197731..0152fd164714487578ee793aafdfc5bdbfcc5a83 100644 (file)
@@ -93,6 +93,7 @@ const struct bufferevent_ops bufferevent_ops_async = {
        evutil_offsetof(struct bufferevent_async, bev.bev),
        be_async_enable,
        be_async_disable,
+       NULL, /* Unlink */
        be_async_destruct,
        bufferevent_generic_adj_timeouts_,
        be_async_flush,
@@ -384,11 +385,6 @@ be_async_destruct(struct bufferevent *bev)
                /* XXXX possible double-close */
                evutil_closesocket(fd);
        }
-       /* delete this in case non-blocking connect was used */
-       if (event_initialized(&bev->ev_write)) {
-               event_del(&bev->ev_write);
-               bufferevent_del_generic_timeout_cbs_(bev);
-       }
 }
 
 /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
index 8a74f808afad7ae59406ce9b5d66a1ef13918fe0..cc02230cf52d629037073d374eb7ba00a2d4e013 100644 (file)
@@ -61,6 +61,7 @@
 /* prototypes */
 static int be_filter_enable(struct bufferevent *, short);
 static int be_filter_disable(struct bufferevent *, short);
+static void be_filter_unlink(struct bufferevent *);
 static void be_filter_destruct(struct bufferevent *);
 
 static void be_filter_readcb(struct bufferevent *, void *);
@@ -99,6 +100,7 @@ const struct bufferevent_ops bufferevent_ops_filter = {
        evutil_offsetof(struct bufferevent_filtered, bev.bev),
        be_filter_enable,
        be_filter_disable,
+       be_filter_unlink,
        be_filter_destruct,
        bufferevent_generic_adj_timeouts_,
        be_filter_flush,
@@ -214,12 +216,10 @@ bufferevent_filter_new(struct bufferevent *underlying,
 }
 
 static void
-be_filter_destruct(struct bufferevent *bev)
+be_filter_unlink(struct bufferevent *bev)
 {
        struct bufferevent_filtered *bevf = upcast(bev);
        EVUTIL_ASSERT(bevf);
-       if (bevf->free_context)
-               bevf->free_context(bevf->context);
 
        if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
                /* Yes, there is also a decref in bufferevent_decref_.
@@ -242,8 +242,15 @@ be_filter_destruct(struct bufferevent *bev)
                            BEV_SUSPEND_FILT_READ);
                }
        }
+}
 
-       bufferevent_del_generic_timeout_cbs_(bev);
+static void
+be_filter_destruct(struct bufferevent *bev)
+{
+       struct bufferevent_filtered *bevf = upcast(bev);
+       EVUTIL_ASSERT(bevf);
+       if (bevf->free_context)
+               bevf->free_context(bevf->context);
 }
 
 static int
index 99ed5f8d68e308159ef3b7fdd62e2f9b621a5116..48c61c086ad0c40978dcd58315a85d06f4ab401d 100644 (file)
@@ -326,6 +326,7 @@ struct bufferevent_openssl {
 
 static int be_openssl_enable(struct bufferevent *, short);
 static int be_openssl_disable(struct bufferevent *, short);
+static void be_openssl_unlink(struct bufferevent *);
 static void be_openssl_destruct(struct bufferevent *);
 static int be_openssl_adj_timeouts(struct bufferevent *);
 static int be_openssl_flush(struct bufferevent *bufev,
@@ -337,6 +338,7 @@ const struct bufferevent_ops bufferevent_ops_openssl = {
        evutil_offsetof(struct bufferevent_openssl, bev.bev),
        be_openssl_enable,
        be_openssl_disable,
+       be_openssl_unlink,
        be_openssl_destruct,
        be_openssl_adj_timeouts,
        be_openssl_flush,
@@ -977,9 +979,11 @@ set_open_callbacks(struct bufferevent_openssl *bev_ssl, evutil_socket_t fd)
                        event_del(&bev->ev_write);
                }
                event_assign(&bev->ev_read, bev->ev_base, fd,
-                   EV_READ|EV_PERSIST, be_openssl_readeventcb, bev_ssl);
+                   EV_READ|EV_PERSIST|EV_FINALIZE,
+                   be_openssl_readeventcb, bev_ssl);
                event_assign(&bev->ev_write, bev->ev_base, fd,
-                   EV_WRITE|EV_PERSIST, be_openssl_writeeventcb, bev_ssl);
+                   EV_WRITE|EV_PERSIST|EV_FINALIZE,
+                   be_openssl_writeeventcb, bev_ssl);
                if (rpending)
                        r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read);
                if (wpending)
@@ -1079,9 +1083,11 @@ set_handshake_callbacks(struct bufferevent_openssl *bev_ssl, evutil_socket_t fd)
                        event_del(&bev->ev_write);
                }
                event_assign(&bev->ev_read, bev->ev_base, fd,
-                   EV_READ|EV_PERSIST, be_openssl_handshakeeventcb, bev_ssl);
+                   EV_READ|EV_PERSIST|EV_FINALIZE,
+                   be_openssl_handshakeeventcb, bev_ssl);
                event_assign(&bev->ev_write, bev->ev_base, fd,
-                   EV_WRITE|EV_PERSIST, be_openssl_handshakeeventcb, bev_ssl);
+                   EV_WRITE|EV_PERSIST|EV_FINALIZE,
+                   be_openssl_handshakeeventcb, bev_ssl);
                if (fd >= 0) {
                        r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read);
                        r2 = bufferevent_add_event_(&bev->ev_write, &bev->timeout_write);
@@ -1176,17 +1182,10 @@ be_openssl_disable(struct bufferevent *bev, short events)
 }
 
 static void
-be_openssl_destruct(struct bufferevent *bev)
+be_openssl_unlink(struct bufferevent *bev)
 {
        struct bufferevent_openssl *bev_ssl = upcast(bev);
 
-       if (bev_ssl->underlying) {
-               bufferevent_del_generic_timeout_cbs_(bev);
-       } else {
-               event_del(&bev->ev_read);
-               event_del(&bev->ev_write);
-       }
-
        if (bev_ssl->bev.options & BEV_OPT_CLOSE_ON_FREE) {
                if (bev_ssl->underlying) {
                        if (BEV_UPCAST(bev_ssl->underlying)->refcnt < 2) {
@@ -1194,17 +1193,11 @@ be_openssl_destruct(struct bufferevent *bev)
                                    "bufferevent with too few references");
                        } else {
                                bufferevent_free(bev_ssl->underlying);
-                               bev_ssl->underlying = NULL;
+                               /* We still have a reference to it, since DOCUMENT. So we don't
+                                * drop this. */
+                               // bev_ssl->underlying = NULL;
                        }
-               } else {
-                       evutil_socket_t fd = -1;
-                       BIO *bio = SSL_get_wbio(bev_ssl->ssl);
-                       if (bio)
-                               fd = BIO_get_fd(bio, NULL);
-                       if (fd >= 0)
-                               evutil_closesocket(fd);
                }
-               SSL_free(bev_ssl->ssl);
        } else {
                if (bev_ssl->underlying) {
                        if (bev_ssl->underlying->errorcb == be_openssl_eventcb)
@@ -1216,6 +1209,24 @@ be_openssl_destruct(struct bufferevent *bev)
        }
 }
 
+static void
+be_openssl_destruct(struct bufferevent *bev)
+{
+       struct bufferevent_openssl *bev_ssl = upcast(bev);
+
+       if (bev_ssl->bev.options & BEV_OPT_CLOSE_ON_FREE) {
+               if (! bev_ssl->underlying) {
+                       evutil_socket_t fd = -1;
+                       BIO *bio = SSL_get_wbio(bev_ssl->ssl);
+                       if (bio)
+                               fd = BIO_get_fd(bio, NULL);
+                       if (fd >= 0)
+                               evutil_closesocket(fd);
+               }
+               SSL_free(bev_ssl->ssl);
+       }
+}
+
 static int
 be_openssl_adj_timeouts(struct bufferevent *bev)
 {
index 16edad3dd7eee44a0d38889f57d862b6d50b51aa..4d467260b5bfddea9a29f112ada6f52aa8db3b99 100644 (file)
@@ -267,7 +267,7 @@ be_pair_disable(struct bufferevent *bev, short events)
 }
 
 static void
-be_pair_destruct(struct bufferevent *bev)
+be_pair_unlink(struct bufferevent *bev)
 {
        struct bufferevent_pair *bev_p = upcast(bev);
 
@@ -275,8 +275,6 @@ be_pair_destruct(struct bufferevent *bev)
                bev_p->partner->partner = NULL;
                bev_p->partner = NULL;
        }
-
-       bufferevent_del_generic_timeout_cbs_(bev);
 }
 
 static int
@@ -327,7 +325,8 @@ const struct bufferevent_ops bufferevent_ops_pair = {
        evutil_offsetof(struct bufferevent_pair, bev.bev),
        be_pair_enable,
        be_pair_disable,
-       be_pair_destruct,
+       be_pair_unlink,
+       NULL, /* be_pair_destruct, */
        bufferevent_generic_adj_timeouts_,
        be_pair_flush,
        NULL, /* ctrl */
index f7de86a9309ece739cfca1007562a7e75dad9630..28fc0356bc4a540c4372da4eb691709b7edbf81c 100644 (file)
@@ -609,8 +609,8 @@ bufferevent_set_rate_limit(struct bufferevent *bev,
                EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
                event_del(&rlim->refill_bucket_event);
        }
-       evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
-           bev_refill_callback_, bevp);
+       event_assign(&rlim->refill_bucket_event, bev->ev_base,
+           -1, EV_FINALIZE, bev_refill_callback_, bevp);
 
        if (rlim->limit.read_limit > 0) {
                bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
@@ -654,7 +654,7 @@ bufferevent_rate_limit_group_new(struct event_base *base,
 
        ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
 
-       event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
+       event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
            bev_group_refill_callback_, g);
        /*XXXX handle event_add failure */
        event_add(&g->master_refill_event, &cfg->tick_timeout);
@@ -748,8 +748,8 @@ bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
                        BEV_UNLOCK(bev);
                        return -1;
                }
-               evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
-                   bev_refill_callback_, bevp);
+               event_assign(&rlim->refill_bucket_event, bev->ev_base,
+                   -1, EV_FINALIZE, bev_refill_callback_, bevp);
                bevp->rate_limiting = rlim;
        }
 
index 592be3a8d1a7983d28524dcac370b21e44655942..5ce4953b8dfa4cae25fb13d0cab9dd58c1529840 100644 (file)
@@ -90,6 +90,7 @@ const struct bufferevent_ops bufferevent_ops_socket = {
        evutil_offsetof(struct bufferevent_private, bev),
        be_socket_enable,
        be_socket_disable,
+       NULL, /* unlink */
        be_socket_destruct,
        be_socket_adj_timeouts,
        be_socket_flush,
@@ -338,9 +339,9 @@ bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
        evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
 
        event_assign(&bufev->ev_read, bufev->ev_base, fd,
-           EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
+           EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);
        event_assign(&bufev->ev_write, bufev->ev_base, fd,
-           EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
+           EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);
 
        evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
 
@@ -399,7 +400,7 @@ bufferevent_socket_connect(struct bufferevent *bev,
         * on a non-blocking connect() when ConnectEx() is unavailable. */
        if (BEV_IS_ASYNC(bev)) {
                event_assign(&bev->ev_write, bev->ev_base, fd,
-                   EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);
+                   EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bev);
        }
 #endif
        bufferevent_setfd(bev, fd);
@@ -589,9 +590,6 @@ be_socket_destruct(struct bufferevent *bufev)
 
        fd = event_get_fd(&bufev->ev_read);
 
-       event_del(&bufev->ev_read);
-       event_del(&bufev->ev_write);
-
        if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0)
                EVUTIL_CLOSESOCKET(fd);
 }
@@ -637,9 +635,9 @@ be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd)
        event_del(&bufev->ev_write);
 
        event_assign(&bufev->ev_read, bufev->ev_base, fd,
-           EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
+           EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);
        event_assign(&bufev->ev_write, bufev->ev_base, fd,
-           EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
+           EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);
 
        if (fd >= 0)
                bufferevent_enable(bufev, bufev->enabled);
index 911243384aa7db81cb2391500b204cddf4947d41..fb67ec0957257603433b9418d3f971f0ab3ed41c 100644 (file)
@@ -327,6 +327,11 @@ void evbuffer_set_parent_(struct evbuffer *buf, struct bufferevent *bev);
 
 void evbuffer_invoke_callbacks_(struct evbuffer *buf);
 
+
+int evbuffer_get_callbacks_(struct evbuffer *buffer,
+    struct event_callback **cbs,
+    int max_cbs);
+
 #ifdef __cplusplus
 }
 #endif