if (evutil_timerisset(&(bev)->timeout_write)) \
event_add(&(bev)->ev_write, &(bev)->timeout_write); \
} while (0)
+#define BEV_DEL_GENERIC_READ_TIMEOUT(bev) \
+ event_del(&(bev)->ev_read)
+#define BEV_DEL_GENERIC_WRITE_TIMEOUT(bev) \
+ event_del(&(bev)->ev_write)
+
/** Internal: Given a bufferevent, return its corresponding
* bufferevent_private. */
{
struct bufferevent *bev = ctx;
_bufferevent_incref_and_lock(bev);
+ bufferevent_disable(bev, EV_READ);
_bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING);
_bufferevent_decref_and_unlock(bev);
}
{
struct bufferevent *bev = ctx;
_bufferevent_incref_and_lock(bev);
+ bufferevent_disable(bev, EV_WRITE);
_bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING);
_bufferevent_decref_and_unlock(bev);
}
_bufferevent_generic_adj_timeouts(struct bufferevent *bev)
{
const short enabled = bev->enabled;
+ struct bufferevent_private *bev_p =
+ EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
int r1=0, r2=0;
- if ((enabled & EV_READ) && evutil_timerisset(&bev->timeout_read))
+ if ((enabled & EV_READ) && !bev_p->read_suspended &&
+ evutil_timerisset(&bev->timeout_read))
r1 = event_add(&bev->ev_read, &bev->timeout_read);
else
r1 = event_del(&bev->ev_read);
- if ((enabled & EV_WRITE) && evutil_timerisset(&bev->timeout_write))
+ if ((enabled & EV_WRITE) && !bev_p->write_suspended &&
+ evutil_timerisset(&bev->timeout_write) &&
+ evbuffer_get_length(bev->output))
r2 = event_add(&bev->ev_write, &bev->timeout_write);
else
r2 = event_del(&bev->ev_write);
return -1;
/* NOTE: This interferes with non-blocking connect */
- if (_bufferevent_generic_adj_timeouts(buf) < 0)
- return -1;
+ if (event & EV_READ)
+ BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+ if (event & EV_WRITE)
+ BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
/* If we newly enable reading or writing, and we aren't reading or
writing already, consider launching a new read or write. */
* canceling any in-progress read or write operation, though it might
* not work. */
- _bufferevent_generic_adj_timeouts(bev);
+ if (event & EV_READ)
+ BEV_DEL_GENERIC_READ_TIMEOUT(bev);
+ if (event & EV_WRITE)
+ BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
return 0;
}
be_filter_enable(struct bufferevent *bev, short event)
{
struct bufferevent_filtered *bevf = upcast(bev);
- _bufferevent_generic_adj_timeouts(bev);
+ if (event & EV_READ)
+ BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+ if (event & EV_WRITE)
+ BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
return bufferevent_enable(bevf->underlying, event);
}
be_filter_disable(struct bufferevent *bev, short event)
{
struct bufferevent_filtered *bevf = upcast(bev);
- _bufferevent_generic_adj_timeouts(bev);
+ if (event & EV_READ)
+ BEV_DEL_GENERIC_READ_TIMEOUT(bev);
+ if (event & EV_WRITE)
+ BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
return bufferevent_disable(bevf->underlying, event);
}
r2 = start_writing(bev_ssl);
if (bev_ssl->underlying) {
- _bufferevent_generic_adj_timeouts(bev);
+ if (events & EV_READ)
+ BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+ if (events & EV_WRITE)
+ BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
if (events & EV_READ)
consider_reading(bev_ssl);
if (events & EV_WRITE)
stop_writing(bev_ssl);
- if (bev_ssl->underlying)
- _bufferevent_generic_adj_timeouts(bev);
+ if (bev_ssl->underlying) {
+ if (events & EV_READ)
+ BEV_DEL_GENERIC_READ_TIMEOUT(bev);
+ if (events & EV_WRITE)
+ BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
+ }
return 0;
}
} else {
if (!ignore_wm)
goto done;
+ n = evbuffer_get_length(src->output);
evbuffer_add_buffer(dst->input, src->output);
}
} else {
+ n = evbuffer_get_length(src->output);
evbuffer_add_buffer(dst->input, src->output);
}
- BEV_RESET_GENERIC_READ_TIMEOUT(dst);
- BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
+ if (n) {
+ BEV_RESET_GENERIC_READ_TIMEOUT(dst);
+
+ if (evbuffer_get_length(dst->output))
+ BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
+ else
+ BEV_DEL_GENERIC_WRITE_TIMEOUT(dst);
+ }
src_size = evbuffer_get_length(src->output);
dst_size = evbuffer_get_length(dst->input);
incref_and_lock(bufev);
- if (_bufferevent_generic_adj_timeouts(bufev) < 0)
- return -1;
+ if (events & EV_READ) {
+ BEV_RESET_GENERIC_READ_TIMEOUT(bufev);
+ }
+ if ((events & EV_WRITE) && evbuffer_get_length(bufev->output))
+ BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
/* We're starting to read! Does the other side have anything to write?*/
if ((events & EV_READ) && partner &&
static int
be_pair_disable(struct bufferevent *bev, short events)
{
- return _bufferevent_generic_adj_timeouts(bev);
+ if (events & EV_READ) {
+ BEV_DEL_GENERIC_READ_TIMEOUT(bev);
+ }
+ if (events & EV_WRITE)
+ BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
+ return 0;
}
static void
void *arg)
{
struct bufferevent *bufev = arg;
+ struct bufferevent_private *bufev_p =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (cbinfo->n_added &&
(bufev->enabled & EV_WRITE) &&
- !event_pending(&bufev->ev_write, EV_WRITE, NULL)) {
+ !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
+ !bufev_p->write_suspended) {
/* Somebody added data to the buffer, and we would like to
* write, and we were not writing. So, start writing. */
be_socket_add(&bufev->ev_write, &bufev->timeout_write);
goto done;
error:
- event_del(&bufev->ev_read);
+ bufferevent_disable(bufev, EV_READ);
_bufferevent_run_eventcb(bufev, what);
done:
_bufferevent_decrement_write_buckets(bufev_p, res);
}
- if (evbuffer_get_length(bufev->output) == 0)
+ if (evbuffer_get_length(bufev->output) == 0) {
event_del(&bufev->ev_write);
+ }
/*
* Invoke the user callback if our buffer is drained or below the
goto done;
reschedule:
- if (evbuffer_get_length(bufev->output) == 0)
+ if (evbuffer_get_length(bufev->output) == 0) {
event_del(&bufev->ev_write);
+ }
goto done;
error:
- event_del(&bufev->ev_write);
+ bufferevent_disable(bufev, EV_WRITE);
_bufferevent_run_eventcb(bufev, what);
done:
if (event_pending(&bufev->ev_read, EV_READ, NULL))
if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0)
r = -1;
- if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
+ if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) {
if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0)
r = -1;
+ }
return r;
}
/**
Set the read and write timeout for a buffered event.
+ A bufferevent's timeout will fire the first time that the indicated
+ amount of time has elapsed since a successful read or write operation,
+ during which the bufferevent was trying to read or write.
+
+ (In other words, if reading or writing is disabled, or if the
+ bufferevent's read or write operation has been suspended because
+ there's no data to write, or not enough banwidth, or so on, the
+ timeout isn't active. The timeout only becomes active when we we're
+ willing to actually read or write.)
+
+ Calling bufferevent_enable or setting a timeout for a bufferevent
+ whose timeout is already pending resets its timeout.
+
+ If the timeout elapses, the corresponding operation (EV_READ or
+ EV_WRITE) becomes disabled until you re-enable it again. The
+ bufferevent's event callback is called with the
+ BEV_EVENT_TIMEOUT|BEV_EVENT_READING or
+ BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING.
+
@param bufev the bufferevent to be modified
@param timeout_read the read timeout, or NULL
@param timeout_write the write timeout, or NULL
event_del(&close_listener_event);
}
+struct timeout_cb_result {
+ struct timeval read_timeout_at;
+ struct timeval write_timeout_at;
+ struct timeval last_wrote_at;
+ int n_read_timeouts;
+ int n_write_timeouts;
+ int total_calls;
+};
+
+static long
+msec_diff(const struct timeval *start, const struct timeval *end)
+{
+ long ms = end->tv_sec - start->tv_sec;
+ ms *= 1000;
+ ms += ((end->tv_usec - start->tv_usec)+500) / 1000;
+ return ms;
+}
+
+static void
+bev_timeout_write_cb(struct bufferevent *bev, void *arg)
+{
+ struct timeout_cb_result *res = arg;
+ evutil_gettimeofday(&res->last_wrote_at, NULL);
+}
+
+static void
+bev_timeout_event_cb(struct bufferevent *bev, short what, void *arg)
+{
+ struct timeout_cb_result *res = arg;
+ ++res->total_calls;
+
+ if ((what & (BEV_EVENT_READING|BEV_EVENT_TIMEOUT))
+ == (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) {
+ evutil_gettimeofday(&res->read_timeout_at, NULL);
+ ++res->n_read_timeouts;
+ }
+ if ((what & (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT))
+ == (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) {
+ evutil_gettimeofday(&res->write_timeout_at, NULL);
+ ++res->n_write_timeouts;
+ }
+}
+
+static void
+test_bufferevent_timeouts(void *arg)
+{
+ /* "arg" is a string containing "pair" and/or "nodata" */
+ struct bufferevent *bev1 = NULL, *bev2 = NULL;
+ struct basic_test_data *data = arg;
+ int use_pair = 0;
+ struct timeval tv_w, tv_r, started_at;
+ struct timeout_cb_result res1, res2;
+ char buf[1024];
+
+ memset(&res1, 0, sizeof(res1));
+ memset(&res2, 0, sizeof(res2));
+
+ if (strstr((char*)data->setup_data, "pair"))
+ use_pair = 1;
+
+ if (use_pair) {
+ struct bufferevent *p[2];
+ tt_int_op(0, ==, bufferevent_pair_new(data->base, 0, p));
+ bev1 = p[0];
+ bev2 = p[1];
+ } else {
+ bev1 = bufferevent_socket_new(data->base, data->pair[0], 0);
+ bev2 = bufferevent_socket_new(data->base, data->pair[1], 0);
+ }
+
+ /* Do this nice and early. */
+ bufferevent_disable(bev2, EV_READ);
+
+ /* bev1 will try to write and read. Both will time out. */
+ evutil_gettimeofday(&started_at, NULL);
+ tv_w.tv_sec = tv_r.tv_sec = 0;
+ tv_w.tv_usec = 100*1000;
+ tv_r.tv_usec = 150*1000;
+ bufferevent_setcb(bev1, NULL, bev_timeout_write_cb,
+ bev_timeout_event_cb, &res1);
+ bufferevent_setwatermark(bev1, EV_WRITE, 1024*1024+10, 0);
+ bufferevent_set_timeouts(bev1, &tv_r, &tv_w);
+ if (use_pair) {
+ /* For a pair, the fact that the other side isn't reading
+ * makes the writer stall */
+ bufferevent_write(bev1, "ABCDEFG", 7);
+ } else {
+ /* For a real socket, the kernel's TCP buffers can eat a
+ * fair number of bytes; make sure that at some point we
+ * have some bytes that will stall. */
+ struct evbuffer *output = bufferevent_get_output(bev1);
+ int i;
+ memset(buf, 0xbb, sizeof(buf));
+ for (i=0;i<1024;++i) {
+ evbuffer_add_reference(output, buf, sizeof(buf),
+ NULL, NULL);
+ }
+ }
+ bufferevent_enable(bev1, EV_READ|EV_WRITE);
+
+ /* bev2 has nothing to say, and isn't listening. */
+ bufferevent_setcb(bev2, NULL, bev_timeout_write_cb,
+ bev_timeout_event_cb, &res2);
+ tv_w.tv_sec = tv_r.tv_sec = 0;
+ tv_w.tv_usec = 200*1000;
+ tv_r.tv_usec = 100*1000;
+ bufferevent_set_timeouts(bev2, &tv_r, &tv_w);
+ bufferevent_enable(bev2, EV_WRITE);
+
+ tv_r.tv_sec = 1;
+ tv_r.tv_usec = 0;
+
+ event_base_loopexit(data->base, &tv_r);
+ event_base_dispatch(data->base);
+
+ /* XXXX Test that actually reading or writing a little resets the
+ * timeouts. */
+
+ /* Each buf1 timeout happens, and happens only once. */
+ tt_want(res1.n_read_timeouts);
+ tt_want(res1.n_write_timeouts);
+ tt_want(res1.n_read_timeouts == 1);
+ tt_want(res1.n_write_timeouts == 1);
+
+ tt_int_op(abs(msec_diff(&started_at, &res1.read_timeout_at)-150),
+ <=, 40);
+ tt_int_op(abs(msec_diff(&started_at, &res1.write_timeout_at)-100),
+ <=, 30);
+
+end:
+ if (bev1)
+ bufferevent_free(bev1);
+ if (bev2)
+ bufferevent_free(bev2);
+}
+
struct testcase_t bufferevent_testcases[] = {
LEGACY(bufferevent, TT_ISOLATED),
(void*)"defer lock" },
{ "bufferevent_connect_fail", test_bufferevent_connect_fail,
TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
+ { "bufferevent_timeouts", test_bufferevent_timeouts,
+ TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR, &basic_setup, (void*)"" },
+ { "bufferevent_pair_timeouts", test_bufferevent_timeouts,
+ TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"pair" },
#ifdef _EVENT_HAVE_LIBZ
LEGACY(bufferevent_zlib, TT_ISOLATED),
#else