}
#endif
+int
+evbuffer_get_callbacks_(struct evbuffer *buffer, struct event_callback **cbs,
+ int max_cbs)
+{
+ int r = 0;
+ EVBUFFER_LOCK(buffer);
+ if (buffer->deferred_cbs) {
+ if (max_cbs < 1) {
+ r = -1;
+ goto done;
+ }
+ cbs[0] = &buffer->deferred;
+ r = 1;
+ }
+done:
+ EVBUFFER_UNLOCK(buffer);
+ return r;
+}
*/
int (*disable)(struct bufferevent *, short);
+ /** Detach the bufferevent from any data structures that reference it.
+ * Called as soon as its reference count reaches zero, while pending
+ * callbacks may still be scheduled to run. */
+ void (*unlink)(struct bufferevent *);
+
/** Free any storage and deallocate any extra data or structures used
- in this implementation.
+ in this implementation. Called when the bufferevent is finalized,
+ after all of its pending callbacks have run or been canceled.
*/
void (*destruct)(struct bufferevent *);
#include "event2/bufferevent_struct.h"
#include "event2/bufferevent_compat.h"
#include "event2/event.h"
+#include "event-internal.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
#include "util-internal.h"
static void bufferevent_cancel_all_(struct bufferevent *bev);
-
+static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_);
void
bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
- struct bufferevent *underlying;
+ int n_cbs = 0;
+#define MAX_CBS 16
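+ /* Gather every callback that may still be pending for this bufferevent:
+ * the read and write events, the deferred callback, an optional rate-limit
+ * refill event, and one deferred callback from each evbuffer. That is at
+ * most six entries, so 16 slots is plenty. */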
+ struct event_callback *cbs[MAX_CBS];
EVUTIL_ASSERT(bufev_private->refcnt > 0);
return 0;
}
+ if (bufev->be_ops->unlink)
+ bufev->be_ops->unlink(bufev);
+
+ /* Okay, we're out of references. Let's finalize this once all the
+ * callbacks are done running. */
+ cbs[0] = &bufev->ev_read.ev_evcallback;
+ cbs[1] = &bufev->ev_write.ev_evcallback;
+ cbs[2] = &bufev_private->deferred;
+ n_cbs = 3;
+ if (bufev_private->rate_limiting) {
+ struct event *e = &bufev_private->rate_limiting->refill_bucket_event;
+ if (event_initialized(e))
+ cbs[n_cbs++] = &e->ev_evcallback;
+ }
+ n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs);
+ n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs);
+
+ event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs,
+ bufferevent_finalize_cb_);
+
+#undef MAX_CBS
+ BEV_UNLOCK(bufev);
+
+ return 1;
+}
+
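+/* Finalizer handed to event_callback_finalize_many_() above. The event
+ * loop runs it once every callback in the cbs[] array has finished or been
+ * canceled, so this is the first point at which it is safe to tear down the
+ * shared state and drop the reference on any underlying bufferevent. */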
+static void
+bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_)
+{
+ struct bufferevent *bufev = arg_;
+ struct bufferevent *underlying;
+ struct bufferevent_private *bufev_private =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+
+ BEV_LOCK(bufev);
underlying = bufferevent_get_underlying(bufev);
/* Clean up the shared info */
if (bufev_private->rate_limiting) {
if (bufev_private->rate_limiting->group)
bufferevent_remove_from_rate_limit_group_internal_(bufev,0);
- if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event))
- event_del(&bufev_private->rate_limiting->refill_bucket_event);
- event_debug_unassign(&bufev_private->rate_limiting->refill_bucket_event);
mm_free(bufev_private->rate_limiting);
bufev_private->rate_limiting = NULL;
}
- event_debug_unassign(&bufev->ev_read);
- event_debug_unassign(&bufev->ev_write);
BEV_UNLOCK(bufev);
+
if (bufev_private->own_lock)
EVTHREAD_FREE_LOCK(bufev_private->lock,
EVTHREAD_LOCKTYPE_RECURSIVE);
*/
if (underlying)
bufferevent_decref_(underlying);
-
- return 1;
}
int
void
bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev)
{
- evtimer_assign(&bev->ev_read, bev->ev_base,
+ event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE,
bufferevent_generic_read_timeout_cb, bev);
- evtimer_assign(&bev->ev_write, bev->ev_base,
+ event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE,
bufferevent_generic_write_timeout_cb, bev);
}
evutil_offsetof(struct bufferevent_async, bev.bev),
be_async_enable,
be_async_disable,
+ NULL, /* Unlink */
be_async_destruct,
bufferevent_generic_adj_timeouts_,
be_async_flush,
/* XXXX possible double-close */
evutil_closesocket(fd);
}
- /* delete this in case non-blocking connect was used */
- if (event_initialized(&bev->ev_write)) {
- event_del(&bev->ev_write);
- bufferevent_del_generic_timeout_cbs_(bev);
- }
}
/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
/* prototypes */
static int be_filter_enable(struct bufferevent *, short);
static int be_filter_disable(struct bufferevent *, short);
+static void be_filter_unlink(struct bufferevent *);
static void be_filter_destruct(struct bufferevent *);
static void be_filter_readcb(struct bufferevent *, void *);
evutil_offsetof(struct bufferevent_filtered, bev.bev),
be_filter_enable,
be_filter_disable,
+ be_filter_unlink,
be_filter_destruct,
bufferevent_generic_adj_timeouts_,
be_filter_flush,
}
static void
-be_filter_destruct(struct bufferevent *bev)
+be_filter_unlink(struct bufferevent *bev)
{
struct bufferevent_filtered *bevf = upcast(bev);
EVUTIL_ASSERT(bevf);
- if (bevf->free_context)
- bevf->free_context(bevf->context);
if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
/* Yes, there is also a decref in bufferevent_decref_.
BEV_SUSPEND_FILT_READ);
}
}
+}
- bufferevent_del_generic_timeout_cbs_(bev);
+static void
+be_filter_destruct(struct bufferevent *bev)
+{
+ struct bufferevent_filtered *bevf = upcast(bev);
+ EVUTIL_ASSERT(bevf);
+ if (bevf->free_context)
+ bevf->free_context(bevf->context);
}
static int
static int be_openssl_enable(struct bufferevent *, short);
static int be_openssl_disable(struct bufferevent *, short);
+static void be_openssl_unlink(struct bufferevent *);
static void be_openssl_destruct(struct bufferevent *);
static int be_openssl_adj_timeouts(struct bufferevent *);
static int be_openssl_flush(struct bufferevent *bufev,
evutil_offsetof(struct bufferevent_openssl, bev.bev),
be_openssl_enable,
be_openssl_disable,
+ be_openssl_unlink,
be_openssl_destruct,
be_openssl_adj_timeouts,
be_openssl_flush,
event_del(&bev->ev_write);
}
event_assign(&bev->ev_read, bev->ev_base, fd,
- EV_READ|EV_PERSIST, be_openssl_readeventcb, bev_ssl);
+ EV_READ|EV_PERSIST|EV_FINALIZE,
+ be_openssl_readeventcb, bev_ssl);
event_assign(&bev->ev_write, bev->ev_base, fd,
- EV_WRITE|EV_PERSIST, be_openssl_writeeventcb, bev_ssl);
+ EV_WRITE|EV_PERSIST|EV_FINALIZE,
+ be_openssl_writeeventcb, bev_ssl);
if (rpending)
r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read);
if (wpending)
event_del(&bev->ev_write);
}
event_assign(&bev->ev_read, bev->ev_base, fd,
- EV_READ|EV_PERSIST, be_openssl_handshakeeventcb, bev_ssl);
+ EV_READ|EV_PERSIST|EV_FINALIZE,
+ be_openssl_handshakeeventcb, bev_ssl);
event_assign(&bev->ev_write, bev->ev_base, fd,
- EV_WRITE|EV_PERSIST, be_openssl_handshakeeventcb, bev_ssl);
+ EV_WRITE|EV_PERSIST|EV_FINALIZE,
+ be_openssl_handshakeeventcb, bev_ssl);
if (fd >= 0) {
r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read);
r2 = bufferevent_add_event_(&bev->ev_write, &bev->timeout_write);
}
static void
-be_openssl_destruct(struct bufferevent *bev)
+be_openssl_unlink(struct bufferevent *bev)
{
struct bufferevent_openssl *bev_ssl = upcast(bev);
- if (bev_ssl->underlying) {
- bufferevent_del_generic_timeout_cbs_(bev);
- } else {
- event_del(&bev->ev_read);
- event_del(&bev->ev_write);
- }
-
if (bev_ssl->bev.options & BEV_OPT_CLOSE_ON_FREE) {
if (bev_ssl->underlying) {
if (BEV_UPCAST(bev_ssl->underlying)->refcnt < 2) {
"bufferevent with too few references");
} else {
bufferevent_free(bev_ssl->underlying);
- bev_ssl->underlying = NULL;
+ /* We still have a reference to it, via our
+ * BIO. So we don't drop this. */
+ // bev_ssl->underlying = NULL;
}
- } else {
- evutil_socket_t fd = -1;
- BIO *bio = SSL_get_wbio(bev_ssl->ssl);
- if (bio)
- fd = BIO_get_fd(bio, NULL);
- if (fd >= 0)
- evutil_closesocket(fd);
}
- SSL_free(bev_ssl->ssl);
} else {
if (bev_ssl->underlying) {
if (bev_ssl->underlying->errorcb == be_openssl_eventcb)
}
}
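+/* Final teardown, run once the bufferevent is finalized (after
+ * be_openssl_unlink() and after pending callbacks are done): with
+ * BEV_OPT_CLOSE_ON_FREE we close the fd reachable through the BIO when
+ * there is no underlying bufferevent, then free the SSL object. */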
+static void
+be_openssl_destruct(struct bufferevent *bev)
+{
+ struct bufferevent_openssl *bev_ssl = upcast(bev);
+
+ if (bev_ssl->bev.options & BEV_OPT_CLOSE_ON_FREE) {
+ if (! bev_ssl->underlying) {
+ evutil_socket_t fd = -1;
+ BIO *bio = SSL_get_wbio(bev_ssl->ssl);
+ if (bio)
+ fd = BIO_get_fd(bio, NULL);
+ if (fd >= 0)
+ evutil_closesocket(fd);
+ }
+ SSL_free(bev_ssl->ssl);
+ }
+}
+
static int
be_openssl_adj_timeouts(struct bufferevent *bev)
{
}
static void
-be_pair_destruct(struct bufferevent *bev)
+be_pair_unlink(struct bufferevent *bev)
{
struct bufferevent_pair *bev_p = upcast(bev);
bev_p->partner->partner = NULL;
bev_p->partner = NULL;
}
-
- bufferevent_del_generic_timeout_cbs_(bev);
}
static int
evutil_offsetof(struct bufferevent_pair, bev.bev),
be_pair_enable,
be_pair_disable,
- be_pair_destruct,
+ be_pair_unlink,
+ NULL, /* be_pair_destruct, */
bufferevent_generic_adj_timeouts_,
be_pair_flush,
NULL, /* ctrl */
EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
event_del(&rlim->refill_bucket_event);
}
- evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
- bev_refill_callback_, bevp);
+ event_assign(&rlim->refill_bucket_event, bev->ev_base,
+ -1, EV_FINALIZE, bev_refill_callback_, bevp);
if (rlim->limit.read_limit > 0) {
bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
- event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
+ event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
bev_group_refill_callback_, g);
/*XXXX handle event_add failure */
event_add(&g->master_refill_event, &cfg->tick_timeout);
BEV_UNLOCK(bev);
return -1;
}
- evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
- bev_refill_callback_, bevp);
+ event_assign(&rlim->refill_bucket_event, bev->ev_base,
+ -1, EV_FINALIZE, bev_refill_callback_, bevp);
bevp->rate_limiting = rlim;
}
evutil_offsetof(struct bufferevent_private, bev),
be_socket_enable,
be_socket_disable,
+ NULL, /* unlink */
be_socket_destruct,
be_socket_adj_timeouts,
be_socket_flush,
evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
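+ /* EV_FINALIZE: the bufferevent is now torn down via finalizers, so
+ * event_del() on these events must not block waiting for a callback
+ * that may still be running in another thread. */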
event_assign(&bufev->ev_read, bufev->ev_base, fd,
- EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
+ EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
- EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
+ EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
* on a non-blocking connect() when ConnectEx() is unavailable. */
if (BEV_IS_ASYNC(bev)) {
event_assign(&bev->ev_write, bev->ev_base, fd,
- EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);
+ EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bev);
}
#endif
bufferevent_setfd(bev, fd);
fd = event_get_fd(&bufev->ev_read);
- event_del(&bufev->ev_read);
- event_del(&bufev->ev_write);
-
if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0)
EVUTIL_CLOSESOCKET(fd);
}
event_del(&bufev->ev_write);
event_assign(&bufev->ev_read, bufev->ev_base, fd,
- EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
+ EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
- EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
+ EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);
if (fd >= 0)
bufferevent_enable(bufev, bufev->enabled);
void evbuffer_invoke_callbacks_(struct evbuffer *buf);
+
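+/** Copy (at most max_cbs) pointers to the event_callbacks that this buffer
+ * uses for its deferred callbacks into cbs. Return the number of pointers
+ * copied, or -1 if max_cbs is too small to hold them all. */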
+int evbuffer_get_callbacks_(struct evbuffer *buffer,
+ struct event_callback **cbs,
+ int max_cbs);
+
#ifdef __cplusplus
}
#endif