int bufferevent_enable_locking(struct bufferevent *bufev, void *lock);
/** Internal: Increment the reference count on bufev. */
void bufferevent_incref(struct bufferevent *bufev);
+/** Internal: Lock bufev and increase its reference count.
+ * unlocking it otherwise. */
+void _bufferevent_incref_and_lock(struct bufferevent *bufev);
/** Internal: Drop the reference count on bufev, freeing as necessary, and
* unlocking it otherwise. */
void _bufferevent_decref_and_unlock(struct bufferevent *bufev);
void
_bufferevent_run_readcb(struct bufferevent *bufev)
{
- /* Requires lock. */
+ /* Requires that we hold the lock and a reference */
struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
void
_bufferevent_run_writecb(struct bufferevent *bufev)
{
- /* Requires lock. */
+ /* Requires that we hold the lock and a reference */
struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
void
_bufferevent_run_eventcb(struct bufferevent *bufev, short what)
{
- /* Requires lock. */
+ /* Requires that we hold the lock and a reference */
struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
return r;
}
+void
+_bufferevent_incref_and_lock(struct bufferevent *bufev)
+{
+ struct bufferevent_private *bufev_private =
+ BEV_UPCAST(bufev);
+ BEV_LOCK(bufev);
+ ++bufev_private->refcnt;
+}
+
void
_bufferevent_decref_and_unlock(struct bufferevent *bufev)
{
bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
{
struct bufferevent *bev = ctx;
- BEV_LOCK(bev);
+ _bufferevent_incref_and_lock(bev);
_bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING);
- BEV_UNLOCK(bev);
+ _bufferevent_decref_and_unlock(bev);
}
static void
bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
{
struct bufferevent *bev = ctx;
- BEV_LOCK(bev);
+ _bufferevent_incref_and_lock(bev);
_bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING);
- BEV_UNLOCK(bev);
+ _bufferevent_decref_and_unlock(bev);
}
void
/* If we successfully wrote from the outbuf, or we added data to the
* outbuf and were not writing before, we may want to write now. */
- BEV_LOCK(bev);
+ _bufferevent_incref_and_lock(bev);
if (cbinfo->n_deleted) {
/* XXXX can't detect 0-length write completion */
bev_async->write_in_progress = 0;
_bufferevent_run_writecb(bev);
}
- BEV_UNLOCK(bev);
+ _bufferevent_decref_and_unlock(bev);
}
static void
/* If we successfully read into the inbuf, or we drained data from
* the inbuf and were not reading before, we may want to read now */
- BEV_LOCK(bev);
+ _bufferevent_incref_and_lock(bev);
if (cbinfo->n_added) {
/* XXXX can't detect 0-length read completion */
bev_async->read_in_progress = 0;
_bufferevent_run_readcb(bev);
}
- BEV_UNLOCK(bev);
+ _bufferevent_decref_and_unlock(bev);
}
static int
enum bufferevent_flush_mode state,
int *processed_out)
{
+ /* Requires references and lock: might call writecb */
enum bufferevent_filter_result res = BEV_OK;
struct bufferevent *bufev = downcast(bevf);
int again = 0;
const struct evbuffer_cb_info *cbinfo, void *arg)
{
struct bufferevent_filtered *bevf = arg;
+ struct bufferevent *bev = downcast(bevf);
if (cbinfo->n_added) {
int processed_any = 0;
/* Somebody added more data to the output buffer. Try to
* process it, if we should. */
+ _bufferevent_incref_and_lock(bev);
be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
+ _bufferevent_decref_and_unlock(bev);
}
}
struct bufferevent *bufev = downcast(bevf);
int processed_any = 0;
+ _bufferevent_incref_and_lock(bufev);
+
if (bevf->got_eof)
state = BEV_FINISHED;
else
res = be_filter_process_input(bevf, state, &processed_any);
+ /* XXX This should be in process_input, not here. There are
+ * other places that can call process-input, and they should
+ * force readcb calls as needed. */
if (processed_any &&
evbuffer_get_length(bufev->input) >= bufev->wm_read.low &&
bufev->readcb != NULL)
_bufferevent_run_readcb(bufev);
+
+ _bufferevent_decref_and_unlock(bufev);
}
/* Called when the underlying socket has drained enough that we can write to
be_filter_writecb(struct bufferevent *underlying, void *_me)
{
struct bufferevent_filtered *bevf = _me;
+ struct bufferevent *bev = downcast(bevf);
int processed_any = 0;
+ _bufferevent_incref_and_lock(bev);
be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
+ _bufferevent_decref_and_unlock(bev);
}
/* Called when the underlying socket has given us an error */
struct bufferevent_filtered *bevf = _me;
struct bufferevent *bev = downcast(bevf);
+ _bufferevent_incref_and_lock(bev);
/* All we can really to is tell our own eventcb. */
if (bev->errorcb)
_bufferevent_run_eventcb(bev, what);
+ _bufferevent_decref_and_unlock(bev);
}
static int
int processed_any = 0;
assert(bevf);
+ _bufferevent_incref_and_lock(bufev);
+
if (iotype & EV_READ) {
be_filter_process_input(bevf, mode, &processed_any);
}
/* XXX does this want to recursively call lower-level flushes? */
bufferevent_flush(bevf->underlying, iotype, mode);
+ _bufferevent_decref_and_unlock(bufev);
+
return processed_any;
}
#define downcast(bev_pair) (&(bev_pair)->bev.bev)
+static inline void
+incref_and_lock(struct bufferevent *b)
+{
+ struct bufferevent_pair *bevp;
+ _bufferevent_incref_and_lock(b);
+ bevp = upcast(b);
+ if (bevp->partner)
+ _bufferevent_incref_and_lock(downcast(bevp->partner));
+}
+
+static inline void
+decref_and_unlock(struct bufferevent *b)
+{
+ struct bufferevent_pair *bevp = upcast(b);
+ if (bevp->partner)
+ _bufferevent_decref_and_unlock(downcast(bevp->partner));
+ _bufferevent_decref_and_unlock(b);
+}
+
/* XXX Handle close */
static void be_pair_outbuf_cb(struct evbuffer *,
struct bufferevent_pair *bev_pair = arg;
struct bufferevent_pair *partner = bev_pair->partner;
+ incref_and_lock(downcast(bev_pair));
+
if (info->n_added > info->n_deleted && partner) {
/* We got more data. If the other side's reading, then
hand it over. */
be_pair_transfer(downcast(bev_pair), downcast(partner), 0);
}
}
+
+ decref_and_unlock(downcast(bev_pair));
}
static int
struct bufferevent_pair *bev_p = upcast(bufev);
struct bufferevent_pair *partner = bev_p->partner;
+ incref_and_lock(bufev);
+
_bufferevent_generic_adj_timeouts(bufev);
/* We're starting to read! Does the other side have anything to write?*/
be_pair_wants_to_talk(bev_p, partner)) {
be_pair_transfer(bufev, downcast(partner), 0);
}
+ decref_and_unlock(bufev);
return 0;
}
{
struct bufferevent_pair *bev_p = upcast(bev);
struct bufferevent *partner;
+ incref_and_lock(bev);
if (!bev_p->partner)
return -1;
if (partner->errorcb)
_bufferevent_run_eventcb(partner, iotype|BEV_EVENT_EOF);
}
+ decref_and_unlock(bev);
return 0;
}
short what = BEV_EVENT_READING;
int howmuch = -1;
- BEV_LOCK(arg);
+ _bufferevent_incref_and_lock(bufev);
if (event == EV_TIMEOUT) {
what |= BEV_EVENT_TIMEOUT;
_bufferevent_run_eventcb(bufev, what);
done:
- BEV_UNLOCK(bufev);
+ _bufferevent_decref_and_unlock(bufev);
}
static void
int res = 0;
short what = BEV_EVENT_WRITING;
- BEV_LOCK(bufev);
+ _bufferevent_incref_and_lock(bufev);
if (event == EV_TIMEOUT) {
what |= BEV_EVENT_TIMEOUT;
_bufferevent_run_eventcb(bufev, what);
done:
- BEV_UNLOCK(bufev);
+ _bufferevent_decref_and_unlock(bufev);
}
struct bufferevent *
int family = sa->sa_family;
evutil_socket_t fd;
int made_socket = 0;
+ int result = -1;
+
+ _bufferevent_incref_and_lock(bev);
if (!bufev_p)
- return -1;
+ goto done;
fd = event_get_fd(&bev->ev_read);
if (fd < 0) {
made_socket = 1;
if ((fd = socket(family, SOCK_STREAM, 0)) < 0)
- return -1;
+ goto done;
if (evutil_make_socket_nonblocking(fd) < 0) {
EVUTIL_CLOSESOCKET(fd);
- return -1;
+ goto done;
}
be_socket_setfd(bev, fd);
}
if (EVUTIL_ERR_CONNECT_RETRIABLE(e)) {
if (! be_socket_enable(bev, EV_WRITE)) {
bufev_p->connecting = 1;
- return 0;
+ result = 0;
+ goto done;
}
}
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
_bufferevent_run_eventcb(bev, BEV_EVENT_CONNECTED);
}
- return 0;
+ result = 0;
+done:
+ _bufferevent_decref_and_unlock(bev);
+ return result;
}
/*