void _bufferevent_run_writecb(struct bufferevent *bufev);
void _bufferevent_run_errorcb(struct bufferevent *bufev, short what);
+/* =========
+ * These next functions implement timeouts for bufferevents that aren't doing
+ * anything else with ev_read and ev_write, to handle timeouts.
+ * ========= */
+/** Internal use: Set up the ev_read and ev_write callbacks so that
+ * the other "generic_timeout" functions will work on it. Call this from
+ * the constuctor function. */
+void _bufferevent_init_generic_timeout_cbs(struct bufferevent *bev);
+/** Internal use: Delete the ev_read and ev_write callbacks if they're pending.
+ * Call thiss from the destructor function. */
+void _bufferevent_del_generic_timeout_cbs(struct bufferevent *bev);
+/** Internal use: Add or delete the generic timeout events as appropriate.
+ * (If an event is enabled and a timeout is set, we add the event. Otherwise
+ * we delete it.) Call this from anything that changes the timeout values,
+ * that enabled EV_READ or EV_WRITE, or that disables EV_READ or EV_WRITE. */
+void _bufferevent_generic_adj_timeouts(struct bufferevent *bev);
+
+/** Internal use: We have just successfully read data into an inbuf, so
+ * reset the read timout (if any). */
+#define BEV_RESET_GENERIC_READ_TIMEOUT(bev) \
+ do { \
+ if (evutil_timerisset(&(bev)->timeout_read)) \
+ event_add(&(bev)->ev_read, &(bev)->timeout_read); \
+ } while (0)
+/** Internal use: We have just successfully written data from an inbuf, so
+ * reset the read timout (if any). */
+#define BEV_RESET_GENERIC_WRITE_TIMEOUT(bev) \
+ do { \
+ if (evutil_timerisset(&(bev)->timeout_write)) \
+ event_add(&(bev)->ev_write, &(bev)->timeout_write); \
+ } while (0)
+
#define BEV_UPCAST(b) EVUTIL_UPCAST((b), struct bufferevent_private, bev)
#define BEV_LOCK(b) do { \
BEV_UNLOCK(bev);
return (res<0) ? NULL : d.ptr;
}
+
+static void
+bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
+{
+ struct bufferevent *bev = ctx;
+ _bufferevent_run_errorcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING);
+}
+static void
+bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
+{
+ struct bufferevent *bev = ctx;
+ _bufferevent_run_errorcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING);
+}
+
+void
+_bufferevent_init_generic_timeout_cbs(struct bufferevent *bev)
+{
+ evtimer_assign(&bev->ev_read, bev->ev_base,
+ bufferevent_generic_read_timeout_cb, bev);
+ evtimer_assign(&bev->ev_read, bev->ev_base,
+ bufferevent_generic_write_timeout_cb, bev);
+}
+
+void
+_bufferevent_del_generic_timeout_cbs(struct bufferevent *bev)
+{
+ event_del(&bev->ev_read);
+ event_del(&bev->ev_write);
+}
+
+void
+_bufferevent_generic_adj_timeouts(struct bufferevent *bev)
+{
+ const short enabled = bev->enabled;
+ if ((enabled & EV_READ) && evutil_timerisset(&bev->timeout_read))
+ event_add(&bev->ev_read, &bev->timeout_read);
+ else
+ event_del(&bev->ev_read);
+
+ if ((enabled & EV_WRITE) && evutil_timerisset(&bev->timeout_write))
+ event_add(&bev->ev_write, &bev->timeout_write);
+ else
+ event_del(&bev->ev_write);
+}
+
static int be_async_enable(struct bufferevent *, short);
static int be_async_disable(struct bufferevent *, short);
static void be_async_destruct(struct bufferevent *);
-static void be_async_adj_timeouts(struct bufferevent *);
static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
be_async_enable,
be_async_disable,
be_async_destruct,
- be_async_adj_timeouts,
+ _bufferevent_generic_adj_timeouts,
be_async_flush,
be_async_ctrl,
};
if (cbinfo->n_added || cbinfo->n_deleted)
bev_async_consider_writing(bev_async);
- if (cbinfo->n_deleted &&
- bev->writecb != NULL &&
- evbuffer_get_length(bev->output) <= bev->wm_write.low)
- _bufferevent_run_writecb(bev);
+ if (cbinfo->n_deleted) {
+ BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
+
+ if (bev->writecb != NULL &&
+ evbuffer_get_length(bev->output) <= bev->wm_write.low)
+ _bufferevent_run_writecb(bev);
+ }
BEV_UNLOCK(bev);
}
if (cbinfo->n_added || cbinfo->n_deleted)
bev_async_consider_reading(bev_async);
- if (cbinfo->n_added &&
- evbuffer_get_length(bev->input) >= bev->wm_read.low &&
- bev->readcb != NULL)
- _bufferevent_run_readcb(bev);
+ if (cbinfo->n_added) {
+ BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+
+ if (evbuffer_get_length(bev->input) >= bev->wm_read.low &&
+ bev->readcb != NULL)
+ _bufferevent_run_readcb(bev);
+ }
BEV_UNLOCK(bev);
}
{
struct bufferevent_async *bev_async = upcast(buf);
+ _bufferevent_generic_adj_timeouts(buf);
+
/* If we newly enable reading or writing, and we aren't reading or
writing already, consider launching a new read or write. */
/* XXXX If we disable reading or writing, we may want to consider
* canceling any in-progress read or write operation, though it might
* not work. */
+
+ _bufferevent_generic_adj_timeouts(bev);
+
return 0;
}
static void
be_async_destruct(struct bufferevent *bev)
{
+ _bufferevent_del_generic_timeout_cbs(bev);
}
-static void
-be_async_adj_timeouts(struct bufferevent *bev)
-{
-}
+
static int
be_async_flush(struct bufferevent *bev, short what,
enum bufferevent_flush_mode mode)
evbuffer_defer_callbacks(bev->input, base);
evbuffer_defer_callbacks(bev->output, base);
+ _bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev);
+
return bev;
err:
bufferevent_free(&bev_a->bev.bev);
static int be_filter_enable(struct bufferevent *, short);
static int be_filter_disable(struct bufferevent *, short);
static void be_filter_destruct(struct bufferevent *);
-static void be_filter_adj_timeouts(struct bufferevent *);
static void be_filter_readcb(struct bufferevent *, void *);
static void be_filter_writecb(struct bufferevent *, void *);
be_filter_enable,
be_filter_disable,
be_filter_destruct,
- be_filter_adj_timeouts,
+ _bufferevent_generic_adj_timeouts,
be_filter_flush,
be_filter_ctrl,
};
bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
bufferevent_filtered_outbuf_cb, bufev_f);
+ _bufferevent_init_generic_timeout_cbs(downcast(bufev_f));
+
return downcast(bufev_f);
}
if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE)
bufferevent_free(bevf->underlying);
+
+ _bufferevent_del_generic_timeout_cbs(bev);
}
static int
be_filter_enable(struct bufferevent *bev, short event)
{
struct bufferevent_filtered *bevf = upcast(bev);
+ _bufferevent_generic_adj_timeouts(bev);
return bufferevent_enable(bevf->underlying, event);
}
be_filter_disable(struct bufferevent *bev, short event)
{
struct bufferevent_filtered *bevf = upcast(bev);
+ _bufferevent_generic_adj_timeouts(bev);
return bufferevent_disable(bevf->underlying, event);
}
-static void
-be_filter_adj_timeouts(struct bufferevent *bev)
-{
- struct bufferevent_filtered *bevf = upcast(bev);
- struct timeval *r = NULL, *w = NULL;
-
- if (bev->timeout_read.tv_sec >= 0)
- r = &bev->timeout_read;
- if (bev->timeout_write.tv_sec >= 0)
- w = &bev->timeout_write;
-
- bufferevent_set_timeouts(bevf->underlying, r, w);
-}
-
static enum bufferevent_filter_result
be_filter_process_input(struct bufferevent_filtered *bevf,
enum bufferevent_flush_mode state,
evbuffer_get_length(bevf->underlying->input) &&
!be_readbuf_full(bevf, state));
+ if (*processed_out)
+ BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+
return res;
}
evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
EVBUFFER_CB_ENABLED);
+ if (*processed_out)
+ BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
+
return res;
}
res = be_filter_process_input(bevf, state, &processed_any);
if (processed_any &&
- evbuffer_get_length(bufev->input) >= bufev->wm_read.low &&
- bufev->readcb != NULL)
+ evbuffer_get_length(bufev->input) >= bufev->wm_read.low &&
+ bufev->readcb != NULL)
_bufferevent_run_readcb(bufev);
}
mm_free(bufev);
return NULL;
}
- /* XXX set read timeout event */
- /* XXX set write timeout event */
if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) {
bufferevent_free(downcast(bufev));
return NULL;
}
+ _bufferevent_init_generic_timeout_cbs(&bufev->bev.bev);
+
return bufev;
}
evbuffer_add_buffer(dst->input, src->output);
}
+ BEV_RESET_GENERIC_READ_TIMEOUT(dst);
+ BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
+
src_size = evbuffer_get_length(src->output);
dst_size = evbuffer_get_length(dst->input);
struct bufferevent_pair *bev_p = upcast(bufev);
struct bufferevent_pair *partner = bev_p->partner;
+ _bufferevent_generic_adj_timeouts(bufev);
+
/* We're starting to read! Does the other side have anything to write?*/
if ((events & EV_READ) && partner &&
be_pair_wants_to_talk(partner, bev_p)) {
static int
be_pair_disable(struct bufferevent *bev, short events)
{
+ _bufferevent_generic_adj_timeouts(bev);
return 0;
}
bev_p->partner->partner = NULL;
bev_p->partner = NULL;
}
-}
-static void
-be_pair_adj_timeouts(struct bufferevent *bev)
-{
- /* TODO: implement. */
+ _bufferevent_del_generic_timeout_cbs(bev);
}
static int
be_pair_enable,
be_pair_disable,
be_pair_destruct,
- be_pair_adj_timeouts,
+ _bufferevent_generic_adj_timeouts,
be_pair_flush,
NULL, /* ctrl */
};