From: Nick Mathewson Date: Tue, 9 Apr 2013 22:16:13 +0000 (-0400) Subject: Use finalization feature so bufferevents can avoid deadlocks X-Git-Tag: release-2.1.3-alpha~19^2~5 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=02fbf68770d3dcb864c867124e159b3680036206;p=libevent Use finalization feature so bufferevents can avoid deadlocks Since the bufferevents' events are now EV_FINALIZE (name pending), they won't deadlock. To clean up properly, though, we must use the finalization feature. This patch also split bufferevent deallocation into an "unlink" step that happens fast, and a "destruct" step that happens after finalization. More work is needed: there needs to be a way to specify a finalizer for the bufferevent's argument itself. Also, this finalizer business makes lots of the reference counting we were doing unnecessary. Also, more testing is needed. --- diff --git a/buffer.c b/buffer.c index 7c35a69b..860ba0dc 100644 --- a/buffer.c +++ b/buffer.c @@ -3345,3 +3345,21 @@ evbuffer_cb_unsuspend(struct evbuffer *buffer, struct evbuffer_cb_entry *cb) } #endif +int +evbuffer_get_callbacks_(struct evbuffer *buffer, struct event_callback **cbs, + int max_cbs) +{ + int r = 0; + EVBUFFER_LOCK(buffer); + if (buffer->deferred_cbs) { + if (max_cbs < 1) { + r = -1; + goto done; + } + cbs[0] = &buffer->deferred; + r = 1; + } +done: + EVBUFFER_UNLOCK(buffer); + return r; +} diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 63bf4708..ccfc7045 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -252,8 +252,11 @@ struct bufferevent_ops { */ int (*disable)(struct bufferevent *, short); + /** DOCUMENT */ + void (*unlink)(struct bufferevent *); + /** Free any storage and deallocate any extra data or structures used - in this implementation. + in this implementation. DOCUMENT */ void (*destruct)(struct bufferevent *); diff --git a/bufferevent.c b/bufferevent.c index 7c03ce90..b2bb0ac3 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -54,6 +54,7 @@ #include "event2/bufferevent_struct.h" #include "event2/bufferevent_compat.h" #include "event2/event.h" +#include "event-internal.h" #include "log-internal.h" #include "mm-internal.h" #include "bufferevent-internal.h" @@ -61,7 +62,7 @@ #include "util-internal.h" static void bufferevent_cancel_all_(struct bufferevent *bev); - +static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_); void bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what) @@ -640,7 +641,9 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev) { struct bufferevent_private *bufev_private = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); - struct bufferevent *underlying; + int n_cbs = 0; +#define MAX_CBS 16 + struct event_callback *cbs[MAX_CBS]; EVUTIL_ASSERT(bufev_private->refcnt > 0); @@ -649,6 +652,41 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev) return 0; } + if (bufev->be_ops->unlink) + bufev->be_ops->unlink(bufev); + + /* Okay, we're out of references. Let's finalize this once all the + * callbacks are done running. */ + cbs[0] = &bufev->ev_read.ev_evcallback; + cbs[1] = &bufev->ev_write.ev_evcallback; + cbs[2] = &bufev_private->deferred; + n_cbs = 3; + if (bufev_private->rate_limiting) { + struct event *e = &bufev_private->rate_limiting->refill_bucket_event; + if (event_initialized(e)) + cbs[n_cbs++] = &e->ev_evcallback; + } + n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs); + n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs); + + event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs, + bufferevent_finalize_cb_); + +#undef MAX_CBS + BEV_UNLOCK(bufev); + + return 1; +} + +static void +bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_) +{ + struct bufferevent *bufev = arg_; + struct bufferevent *underlying; + struct bufferevent_private *bufev_private = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + + BEV_LOCK(bufev); underlying = bufferevent_get_underlying(bufev); /* Clean up the shared info */ @@ -665,17 +703,13 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev) if (bufev_private->rate_limiting) { if (bufev_private->rate_limiting->group) bufferevent_remove_from_rate_limit_group_internal_(bufev,0); - if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event)) - event_del(&bufev_private->rate_limiting->refill_bucket_event); - event_debug_unassign(&bufev_private->rate_limiting->refill_bucket_event); mm_free(bufev_private->rate_limiting); bufev_private->rate_limiting = NULL; } - event_debug_unassign(&bufev->ev_read); - event_debug_unassign(&bufev->ev_write); BEV_UNLOCK(bufev); + if (bufev_private->own_lock) EVTHREAD_FREE_LOCK(bufev_private->lock, EVTHREAD_LOCKTYPE_RECURSIVE); @@ -695,8 +729,6 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev) */ if (underlying) bufferevent_decref_(underlying); - - return 1; } int @@ -844,9 +876,9 @@ bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx) void bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev) { - evtimer_assign(&bev->ev_read, bev->ev_base, + event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE, bufferevent_generic_read_timeout_cb, bev); - evtimer_assign(&bev->ev_write, bev->ev_base, + event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE, bufferevent_generic_write_timeout_cb, bev); } diff --git a/bufferevent_async.c b/bufferevent_async.c index 83b5c141..0152fd16 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -93,6 +93,7 @@ const struct bufferevent_ops bufferevent_ops_async = { evutil_offsetof(struct bufferevent_async, bev.bev), be_async_enable, be_async_disable, + NULL, /* Unlink */ be_async_destruct, bufferevent_generic_adj_timeouts_, be_async_flush, @@ -384,11 +385,6 @@ be_async_destruct(struct bufferevent *bev) /* XXXX possible double-close */ evutil_closesocket(fd); } - /* delete this in case non-blocking connect was used */ - if (event_initialized(&bev->ev_write)) { - event_del(&bev->ev_write); - bufferevent_del_generic_timeout_cbs_(bev); - } } /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so diff --git a/bufferevent_filter.c b/bufferevent_filter.c index 8a74f808..cc02230c 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -61,6 +61,7 @@ /* prototypes */ static int be_filter_enable(struct bufferevent *, short); static int be_filter_disable(struct bufferevent *, short); +static void be_filter_unlink(struct bufferevent *); static void be_filter_destruct(struct bufferevent *); static void be_filter_readcb(struct bufferevent *, void *); @@ -99,6 +100,7 @@ const struct bufferevent_ops bufferevent_ops_filter = { evutil_offsetof(struct bufferevent_filtered, bev.bev), be_filter_enable, be_filter_disable, + be_filter_unlink, be_filter_destruct, bufferevent_generic_adj_timeouts_, be_filter_flush, @@ -214,12 +216,10 @@ bufferevent_filter_new(struct bufferevent *underlying, } static void -be_filter_destruct(struct bufferevent *bev) +be_filter_unlink(struct bufferevent *bev) { struct bufferevent_filtered *bevf = upcast(bev); EVUTIL_ASSERT(bevf); - if (bevf->free_context) - bevf->free_context(bevf->context); if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) { /* Yes, there is also a decref in bufferevent_decref_. @@ -242,8 +242,15 @@ be_filter_destruct(struct bufferevent *bev) BEV_SUSPEND_FILT_READ); } } +} - bufferevent_del_generic_timeout_cbs_(bev); +static void +be_filter_destruct(struct bufferevent *bev) +{ + struct bufferevent_filtered *bevf = upcast(bev); + EVUTIL_ASSERT(bevf); + if (bevf->free_context) + bevf->free_context(bevf->context); } static int diff --git a/bufferevent_openssl.c b/bufferevent_openssl.c index 99ed5f8d..48c61c08 100644 --- a/bufferevent_openssl.c +++ b/bufferevent_openssl.c @@ -326,6 +326,7 @@ struct bufferevent_openssl { static int be_openssl_enable(struct bufferevent *, short); static int be_openssl_disable(struct bufferevent *, short); +static void be_openssl_unlink(struct bufferevent *); static void be_openssl_destruct(struct bufferevent *); static int be_openssl_adj_timeouts(struct bufferevent *); static int be_openssl_flush(struct bufferevent *bufev, @@ -337,6 +338,7 @@ const struct bufferevent_ops bufferevent_ops_openssl = { evutil_offsetof(struct bufferevent_openssl, bev.bev), be_openssl_enable, be_openssl_disable, + be_openssl_unlink, be_openssl_destruct, be_openssl_adj_timeouts, be_openssl_flush, @@ -977,9 +979,11 @@ set_open_callbacks(struct bufferevent_openssl *bev_ssl, evutil_socket_t fd) event_del(&bev->ev_write); } event_assign(&bev->ev_read, bev->ev_base, fd, - EV_READ|EV_PERSIST, be_openssl_readeventcb, bev_ssl); + EV_READ|EV_PERSIST|EV_FINALIZE, + be_openssl_readeventcb, bev_ssl); event_assign(&bev->ev_write, bev->ev_base, fd, - EV_WRITE|EV_PERSIST, be_openssl_writeeventcb, bev_ssl); + EV_WRITE|EV_PERSIST|EV_FINALIZE, + be_openssl_writeeventcb, bev_ssl); if (rpending) r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read); if (wpending) @@ -1079,9 +1083,11 @@ set_handshake_callbacks(struct bufferevent_openssl *bev_ssl, evutil_socket_t fd) event_del(&bev->ev_write); } event_assign(&bev->ev_read, bev->ev_base, fd, - EV_READ|EV_PERSIST, be_openssl_handshakeeventcb, bev_ssl); + EV_READ|EV_PERSIST|EV_FINALIZE, + be_openssl_handshakeeventcb, bev_ssl); event_assign(&bev->ev_write, bev->ev_base, fd, - EV_WRITE|EV_PERSIST, be_openssl_handshakeeventcb, bev_ssl); + EV_WRITE|EV_PERSIST|EV_FINALIZE, + be_openssl_handshakeeventcb, bev_ssl); if (fd >= 0) { r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read); r2 = bufferevent_add_event_(&bev->ev_write, &bev->timeout_write); @@ -1176,17 +1182,10 @@ be_openssl_disable(struct bufferevent *bev, short events) } static void -be_openssl_destruct(struct bufferevent *bev) +be_openssl_unlink(struct bufferevent *bev) { struct bufferevent_openssl *bev_ssl = upcast(bev); - if (bev_ssl->underlying) { - bufferevent_del_generic_timeout_cbs_(bev); - } else { - event_del(&bev->ev_read); - event_del(&bev->ev_write); - } - if (bev_ssl->bev.options & BEV_OPT_CLOSE_ON_FREE) { if (bev_ssl->underlying) { if (BEV_UPCAST(bev_ssl->underlying)->refcnt < 2) { @@ -1194,17 +1193,11 @@ be_openssl_destruct(struct bufferevent *bev) "bufferevent with too few references"); } else { bufferevent_free(bev_ssl->underlying); - bev_ssl->underlying = NULL; + /* We still have a reference to it, since DOCUMENT. So we don't + * drop this. */ + // bev_ssl->underlying = NULL; } - } else { - evutil_socket_t fd = -1; - BIO *bio = SSL_get_wbio(bev_ssl->ssl); - if (bio) - fd = BIO_get_fd(bio, NULL); - if (fd >= 0) - evutil_closesocket(fd); } - SSL_free(bev_ssl->ssl); } else { if (bev_ssl->underlying) { if (bev_ssl->underlying->errorcb == be_openssl_eventcb) @@ -1216,6 +1209,24 @@ be_openssl_destruct(struct bufferevent *bev) } } +static void +be_openssl_destruct(struct bufferevent *bev) +{ + struct bufferevent_openssl *bev_ssl = upcast(bev); + + if (bev_ssl->bev.options & BEV_OPT_CLOSE_ON_FREE) { + if (! bev_ssl->underlying) { + evutil_socket_t fd = -1; + BIO *bio = SSL_get_wbio(bev_ssl->ssl); + if (bio) + fd = BIO_get_fd(bio, NULL); + if (fd >= 0) + evutil_closesocket(fd); + } + SSL_free(bev_ssl->ssl); + } +} + static int be_openssl_adj_timeouts(struct bufferevent *bev) { diff --git a/bufferevent_pair.c b/bufferevent_pair.c index 16edad3d..4d467260 100644 --- a/bufferevent_pair.c +++ b/bufferevent_pair.c @@ -267,7 +267,7 @@ be_pair_disable(struct bufferevent *bev, short events) } static void -be_pair_destruct(struct bufferevent *bev) +be_pair_unlink(struct bufferevent *bev) { struct bufferevent_pair *bev_p = upcast(bev); @@ -275,8 +275,6 @@ be_pair_destruct(struct bufferevent *bev) bev_p->partner->partner = NULL; bev_p->partner = NULL; } - - bufferevent_del_generic_timeout_cbs_(bev); } static int @@ -327,7 +325,8 @@ const struct bufferevent_ops bufferevent_ops_pair = { evutil_offsetof(struct bufferevent_pair, bev.bev), be_pair_enable, be_pair_disable, - be_pair_destruct, + be_pair_unlink, + NULL, /* be_pair_destruct, */ bufferevent_generic_adj_timeouts_, be_pair_flush, NULL, /* ctrl */ diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c index f7de86a9..28fc0356 100644 --- a/bufferevent_ratelim.c +++ b/bufferevent_ratelim.c @@ -609,8 +609,8 @@ bufferevent_set_rate_limit(struct bufferevent *bev, EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event)); event_del(&rlim->refill_bucket_event); } - evtimer_assign(&rlim->refill_bucket_event, bev->ev_base, - bev_refill_callback_, bevp); + event_assign(&rlim->refill_bucket_event, bev->ev_base, + -1, EV_FINALIZE, bev_refill_callback_, bevp); if (rlim->limit.read_limit > 0) { bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); @@ -654,7 +654,7 @@ bufferevent_rate_limit_group_new(struct event_base *base, ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0); - event_assign(&g->master_refill_event, base, -1, EV_PERSIST, + event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE, bev_group_refill_callback_, g); /*XXXX handle event_add failure */ event_add(&g->master_refill_event, &cfg->tick_timeout); @@ -748,8 +748,8 @@ bufferevent_add_to_rate_limit_group(struct bufferevent *bev, BEV_UNLOCK(bev); return -1; } - evtimer_assign(&rlim->refill_bucket_event, bev->ev_base, - bev_refill_callback_, bevp); + event_assign(&rlim->refill_bucket_event, bev->ev_base, + -1, EV_FINALIZE, bev_refill_callback_, bevp); bevp->rate_limiting = rlim; } diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 592be3a8..5ce4953b 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -90,6 +90,7 @@ const struct bufferevent_ops bufferevent_ops_socket = { evutil_offsetof(struct bufferevent_private, bev), be_socket_enable, be_socket_disable, + NULL, /* unlink */ be_socket_destruct, be_socket_adj_timeouts, be_socket_flush, @@ -338,9 +339,9 @@ bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD); event_assign(&bufev->ev_read, bufev->ev_base, fd, - EV_READ|EV_PERSIST, bufferevent_readcb, bufev); + EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev); event_assign(&bufev->ev_write, bufev->ev_base, fd, - EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); + EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev); evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev); @@ -399,7 +400,7 @@ bufferevent_socket_connect(struct bufferevent *bev, * on a non-blocking connect() when ConnectEx() is unavailable. */ if (BEV_IS_ASYNC(bev)) { event_assign(&bev->ev_write, bev->ev_base, fd, - EV_WRITE|EV_PERSIST, bufferevent_writecb, bev); + EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bev); } #endif bufferevent_setfd(bev, fd); @@ -589,9 +590,6 @@ be_socket_destruct(struct bufferevent *bufev) fd = event_get_fd(&bufev->ev_read); - event_del(&bufev->ev_read); - event_del(&bufev->ev_write); - if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0) EVUTIL_CLOSESOCKET(fd); } @@ -637,9 +635,9 @@ be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd) event_del(&bufev->ev_write); event_assign(&bufev->ev_read, bufev->ev_base, fd, - EV_READ|EV_PERSIST, bufferevent_readcb, bufev); + EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev); event_assign(&bufev->ev_write, bufev->ev_base, fd, - EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); + EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev); if (fd >= 0) bufferevent_enable(bufev, bufev->enabled); diff --git a/evbuffer-internal.h b/evbuffer-internal.h index 91124338..fb67ec09 100644 --- a/evbuffer-internal.h +++ b/evbuffer-internal.h @@ -327,6 +327,11 @@ void evbuffer_set_parent_(struct evbuffer *buf, struct bufferevent *bev); void evbuffer_invoke_callbacks_(struct evbuffer *buf); + +int evbuffer_get_callbacks_(struct evbuffer *buffer, + struct event_callback **cbs, + int max_cbs); + #ifdef __cplusplus } #endif