]> granicus.if.org Git - libevent/commitdiff
Provide consistent, tested semantics for bufferevent timeouts
authorNick Mathewson <nickm@torproject.org>
Sat, 20 Feb 2010 23:44:35 +0000 (18:44 -0500)
committerNick Mathewson <nickm@torproject.org>
Tue, 23 Feb 2010 20:20:33 +0000 (15:20 -0500)
The different bufferevent implementations had different behavior for
their timeouts.  Some of them kept re-triggering the timeouts
indefinitely; some disabled the event immediately the first time a
timeout triggered.  Some of them made the timeouts only count when
the bufferevent was actively trying to read or write; some did not.

The new behavior is modeled after old socket bufferevents, since
they were here first and their behavior is relatively sane.
Basically, each timeout disables the bufferevent's corresponding
read or write operation when it fires.  Timeouts are stopped
whenever we suspend writing or reading, and reset whenever we
unsuspend writing or reading.  Calling bufferevent_enable resets a
timeout, as does changing the timeout value.

bufferevent-internal.h
bufferevent.c
bufferevent_async.c
bufferevent_filter.c
bufferevent_openssl.c
bufferevent_pair.c
bufferevent_sock.c
include/event2/bufferevent.h
test/regress_bufferevent.c

index 7df5c23729c4a9421b75418125686ba14356ae44..81cadf1a65e4485ec31486c4b55b113cd4640991 100644 (file)
@@ -328,6 +328,11 @@ int _bufferevent_generic_adj_timeouts(struct bufferevent *bev);
                if (evutil_timerisset(&(bev)->timeout_write))           \
                        event_add(&(bev)->ev_write, &(bev)->timeout_write); \
        } while (0)
+#define BEV_DEL_GENERIC_READ_TIMEOUT(bev)      \
+               event_del(&(bev)->ev_read)
+#define BEV_DEL_GENERIC_WRITE_TIMEOUT(bev)     \
+               event_del(&(bev)->ev_write)
+
 
 /** Internal: Given a bufferevent, return its corresponding
  * bufferevent_private. */
index 3864cece8c58db4a10409c4e55d0ff983d9fec68..1a83ed28f96d567cf70e95897a62c070f7df35a7 100644 (file)
@@ -676,6 +676,7 @@ bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
 {
        struct bufferevent *bev = ctx;
        _bufferevent_incref_and_lock(bev);
+       bufferevent_disable(bev, EV_READ);
        _bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING);
        _bufferevent_decref_and_unlock(bev);
 }
@@ -684,6 +685,7 @@ bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
 {
        struct bufferevent *bev = ctx;
        _bufferevent_incref_and_lock(bev);
+       bufferevent_disable(bev, EV_WRITE);
        _bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING);
        _bufferevent_decref_and_unlock(bev);
 }
@@ -712,13 +714,18 @@ int
 _bufferevent_generic_adj_timeouts(struct bufferevent *bev)
 {
        const short enabled = bev->enabled;
+       struct bufferevent_private *bev_p =
+           EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
        int r1=0, r2=0;
-       if ((enabled & EV_READ) && evutil_timerisset(&bev->timeout_read))
+       if ((enabled & EV_READ) && !bev_p->read_suspended &&
+           evutil_timerisset(&bev->timeout_read))
                r1 = event_add(&bev->ev_read, &bev->timeout_read);
        else
                r1 = event_del(&bev->ev_read);
 
-       if ((enabled & EV_WRITE) && evutil_timerisset(&bev->timeout_write))
+       if ((enabled & EV_WRITE) && !bev_p->write_suspended &&
+           evutil_timerisset(&bev->timeout_write) &&
+           evbuffer_get_length(bev->output))
                r2 = event_add(&bev->ev_write, &bev->timeout_write);
        else
                r2 = event_del(&bev->ev_write);
index fc99696c670a4075304ad07189d665d489bfd9cc..7960001699222826a136b034d2ff35daf28c8bad 100644 (file)
@@ -242,8 +242,10 @@ be_async_enable(struct bufferevent *buf, short what)
                return -1;
 
        /* NOTE: This interferes with non-blocking connect */
-       if (_bufferevent_generic_adj_timeouts(buf) < 0)
-               return -1;
+       if (event & EV_READ)
+               BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+       if (event & EV_WRITE)
+               BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
 
        /* If we newly enable reading or writing, and we aren't reading or
           writing already, consider launching a new read or write. */
@@ -262,7 +264,10 @@ be_async_disable(struct bufferevent *bev, short what)
         * canceling any in-progress read or write operation, though it might
         * not work. */
 
-       _bufferevent_generic_adj_timeouts(bev);
+       if (event & EV_READ)
+               BEV_DEL_GENERIC_READ_TIMEOUT(bev);
+       if (event & EV_WRITE)
+               BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
 
        return 0;
 }
index 16f17b5aacb2d325e1151ba559eebb827074350c..80d2a1bf9dd7628246d05bdfb6c83c184509846a 100644 (file)
@@ -222,7 +222,10 @@ static int
 be_filter_enable(struct bufferevent *bev, short event)
 {
        struct bufferevent_filtered *bevf = upcast(bev);
-       _bufferevent_generic_adj_timeouts(bev);
+       if (event & EV_READ)
+               BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+       if (event & EV_WRITE)
+               BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
        return bufferevent_enable(bevf->underlying, event);
 }
 
@@ -230,7 +233,10 @@ static int
 be_filter_disable(struct bufferevent *bev, short event)
 {
        struct bufferevent_filtered *bevf = upcast(bev);
-       _bufferevent_generic_adj_timeouts(bev);
+       if (event & EV_READ)
+               BEV_DEL_GENERIC_READ_TIMEOUT(bev);
+       if (event & EV_WRITE)
+               BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
        return bufferevent_disable(bevf->underlying, event);
 }
 
index 550b9c3a7002c0a2f27fe639b37e07e944815bdf..9a99a308820a7942dbac56ca41faa400b3b15af4 100644 (file)
@@ -982,7 +982,10 @@ be_openssl_enable(struct bufferevent *bev, short events)
                r2 = start_writing(bev_ssl);
 
        if (bev_ssl->underlying) {
-               _bufferevent_generic_adj_timeouts(bev);
+               if (events & EV_READ)
+                       BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+               if (events & EV_WRITE)
+                       BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
 
                if (events & EV_READ)
                        consider_reading(bev_ssl);
@@ -1004,8 +1007,12 @@ be_openssl_disable(struct bufferevent *bev, short events)
        if (events & EV_WRITE)
                stop_writing(bev_ssl);
 
-       if (bev_ssl->underlying)
-               _bufferevent_generic_adj_timeouts(bev);
+       if (bev_ssl->underlying) {
+               if (events & EV_READ)
+                       BEV_DEL_GENERIC_READ_TIMEOUT(bev);
+               if (events & EV_WRITE)
+                       BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
+       }
        return 0;
 }
 
index 32280a6a2b78e769bdea872998e8c1424e8eb8c1..1025e7cd7173fd8ca4124c80442dab9270e1dd35 100644 (file)
@@ -165,14 +165,22 @@ be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
                } else {
                        if (!ignore_wm)
                                goto done;
+                       n = evbuffer_get_length(src->output);
                        evbuffer_add_buffer(dst->input, src->output);
                }
        } else {
+               n = evbuffer_get_length(src->output);
                evbuffer_add_buffer(dst->input, src->output);
        }
 
-       BEV_RESET_GENERIC_READ_TIMEOUT(dst);
-       BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
+       if (n) {
+               BEV_RESET_GENERIC_READ_TIMEOUT(dst);
+
+               if (evbuffer_get_length(dst->output))
+                       BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
+               else
+                       BEV_DEL_GENERIC_WRITE_TIMEOUT(dst);
+       }
 
        src_size = evbuffer_get_length(src->output);
        dst_size = evbuffer_get_length(dst->input);
@@ -226,8 +234,11 @@ be_pair_enable(struct bufferevent *bufev, short events)
 
        incref_and_lock(bufev);
 
-       if (_bufferevent_generic_adj_timeouts(bufev) < 0)
-               return -1;
+       if (events & EV_READ) {
+               BEV_RESET_GENERIC_READ_TIMEOUT(bufev);
+       }
+       if ((events & EV_WRITE) && evbuffer_get_length(bufev->output))
+               BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
 
        /* We're starting to read! Does the other side have anything to write?*/
        if ((events & EV_READ) && partner &&
@@ -246,7 +257,12 @@ be_pair_enable(struct bufferevent *bufev, short events)
 static int
 be_pair_disable(struct bufferevent *bev, short events)
 {
-       return _bufferevent_generic_adj_timeouts(bev);
+       if (events & EV_READ) {
+               BEV_DEL_GENERIC_READ_TIMEOUT(bev);
+       }
+       if (events & EV_WRITE)
+               BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
+       return 0;
 }
 
 static void
index df048de91556d8831c717fe58611e671d51b88c2..2225a2a4b26a27ed5a4a784a48b1f7699d0b5960 100644 (file)
@@ -104,10 +104,13 @@ bufferevent_socket_outbuf_cb(struct evbuffer *buf,
     void *arg)
 {
        struct bufferevent *bufev = arg;
+       struct bufferevent_private *bufev_p =
+           EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 
        if (cbinfo->n_added &&
            (bufev->enabled & EV_WRITE) &&
-           !event_pending(&bufev->ev_write, EV_WRITE, NULL)) {
+           !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
+           !bufev_p->write_suspended) {
                /* Somebody added data to the buffer, and we would like to
                 * write, and we were not writing.  So, start writing. */
                be_socket_add(&bufev->ev_write, &bufev->timeout_write);
@@ -184,7 +187,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
        goto done;
 
  error:
-       event_del(&bufev->ev_read);
+       bufferevent_disable(bufev, EV_READ);
        _bufferevent_run_eventcb(bufev, what);
 
  done:
@@ -268,8 +271,9 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
                _bufferevent_decrement_write_buckets(bufev_p, res);
        }
 
-       if (evbuffer_get_length(bufev->output) == 0)
+       if (evbuffer_get_length(bufev->output) == 0) {
                event_del(&bufev->ev_write);
+       }
 
        /*
         * Invoke the user callback if our buffer is drained or below the
@@ -283,12 +287,13 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
        goto done;
 
  reschedule:
-       if (evbuffer_get_length(bufev->output) == 0)
+       if (evbuffer_get_length(bufev->output) == 0) {
                event_del(&bufev->ev_write);
+       }
        goto done;
 
  error:
-       event_del(&bufev->ev_write);
+       bufferevent_disable(bufev, EV_WRITE);
        _bufferevent_run_eventcb(bufev, what);
 
  done:
@@ -553,9 +558,10 @@ be_socket_adj_timeouts(struct bufferevent *bufev)
        if (event_pending(&bufev->ev_read, EV_READ, NULL))
                if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0)
                        r = -1;
-       if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
+       if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) {
                if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0)
                        r = -1;
+       }
        return r;
 }
 
index 18114c20d5868deee5b3244bcaa646c08bb3fd39..ca3fda7a7cb10e0b700c2b8debdaa42ed087bf05 100644 (file)
@@ -360,6 +360,25 @@ short bufferevent_get_enabled(struct bufferevent *bufev);
 /**
   Set the read and write timeout for a buffered event.
 
+  A bufferevent's timeout will fire the first time that the indicated
+  amount of time has elapsed since a successful read or write operation,
+  during which the bufferevent was trying to read or write.
+
+  (In other words, if reading or writing is disabled, or if the
+  bufferevent's read or write operation has been suspended because
+  there's no data to write, or not enough banwidth, or so on, the
+  timeout isn't active.  The timeout only becomes active when we we're
+  willing to actually read or write.)
+
+  Calling bufferevent_enable or setting a timeout for a bufferevent
+  whose timeout is already pending resets its timeout.
+
+  If the timeout elapses, the corresponding operation (EV_READ or
+  EV_WRITE) becomes disabled until you re-enable it again.  The
+  bufferevent's event callback is called with the
+  BEV_EVENT_TIMEOUT|BEV_EVENT_READING or
+  BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING.
+
   @param bufev the bufferevent to be modified
   @param timeout_read the read timeout, or NULL
   @param timeout_write the write timeout, or NULL
index e2b21cd1ec2e5e99cb9f3555131f026a67c1e593..9f816971a9a61162f7a07efb9d484a8d8ef1e626 100644 (file)
@@ -613,6 +613,142 @@ end:
                event_del(&close_listener_event);
 }
 
+struct timeout_cb_result {
+       struct timeval read_timeout_at;
+       struct timeval write_timeout_at;
+       struct timeval last_wrote_at;
+       int n_read_timeouts;
+       int n_write_timeouts;
+       int total_calls;
+};
+
+static long
+msec_diff(const struct timeval *start, const struct timeval *end)
+{
+       long ms = end->tv_sec - start->tv_sec;
+       ms *= 1000;
+       ms += ((end->tv_usec - start->tv_usec)+500) / 1000;
+       return ms;
+}
+
+static void
+bev_timeout_write_cb(struct bufferevent *bev, void *arg)
+{
+       struct timeout_cb_result *res = arg;
+       evutil_gettimeofday(&res->last_wrote_at, NULL);
+}
+
+static void
+bev_timeout_event_cb(struct bufferevent *bev, short what, void *arg)
+{
+       struct timeout_cb_result *res = arg;
+       ++res->total_calls;
+
+       if ((what & (BEV_EVENT_READING|BEV_EVENT_TIMEOUT))
+           == (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) {
+               evutil_gettimeofday(&res->read_timeout_at, NULL);
+               ++res->n_read_timeouts;
+       }
+       if ((what & (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT))
+           == (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) {
+               evutil_gettimeofday(&res->write_timeout_at, NULL);
+               ++res->n_write_timeouts;
+       }
+}
+
+static void
+test_bufferevent_timeouts(void *arg)
+{
+       /* "arg" is a string containing "pair" and/or "nodata" */
+       struct bufferevent *bev1 = NULL, *bev2 = NULL;
+       struct basic_test_data *data = arg;
+       int use_pair = 0;
+       struct timeval tv_w, tv_r, started_at;
+       struct timeout_cb_result res1, res2;
+       char buf[1024];
+
+       memset(&res1, 0, sizeof(res1));
+       memset(&res2, 0, sizeof(res2));
+
+       if (strstr((char*)data->setup_data, "pair"))
+               use_pair = 1;
+
+       if (use_pair) {
+               struct bufferevent *p[2];
+               tt_int_op(0, ==, bufferevent_pair_new(data->base, 0, p));
+               bev1 = p[0];
+               bev2 = p[1];
+       } else {
+               bev1 = bufferevent_socket_new(data->base, data->pair[0], 0);
+               bev2 = bufferevent_socket_new(data->base, data->pair[1], 0);
+       }
+
+       /* Do this nice and early. */
+       bufferevent_disable(bev2, EV_READ);
+
+       /* bev1 will try to write and read.  Both will time out. */
+       evutil_gettimeofday(&started_at, NULL);
+       tv_w.tv_sec = tv_r.tv_sec = 0;
+       tv_w.tv_usec = 100*1000;
+       tv_r.tv_usec = 150*1000;
+       bufferevent_setcb(bev1, NULL, bev_timeout_write_cb,
+           bev_timeout_event_cb, &res1);
+       bufferevent_setwatermark(bev1, EV_WRITE, 1024*1024+10, 0);
+       bufferevent_set_timeouts(bev1, &tv_r, &tv_w);
+       if (use_pair) {
+               /* For a pair, the fact that the other side isn't reading
+                * makes the writer stall */
+               bufferevent_write(bev1, "ABCDEFG", 7);
+       } else {
+               /* For a real socket, the kernel's TCP buffers can eat a
+                * fair number of bytes; make sure that at some point we
+                * have some bytes that will stall. */
+               struct evbuffer *output = bufferevent_get_output(bev1);
+               int i;
+               memset(buf, 0xbb, sizeof(buf));
+               for (i=0;i<1024;++i) {
+                       evbuffer_add_reference(output, buf, sizeof(buf),
+                           NULL, NULL);
+               }
+       }
+       bufferevent_enable(bev1, EV_READ|EV_WRITE);
+
+       /* bev2 has nothing to say, and isn't listening. */
+       bufferevent_setcb(bev2, NULL,  bev_timeout_write_cb,
+           bev_timeout_event_cb, &res2);
+       tv_w.tv_sec = tv_r.tv_sec = 0;
+       tv_w.tv_usec = 200*1000;
+       tv_r.tv_usec = 100*1000;
+       bufferevent_set_timeouts(bev2, &tv_r, &tv_w);
+       bufferevent_enable(bev2, EV_WRITE);
+
+       tv_r.tv_sec = 1;
+       tv_r.tv_usec = 0;
+
+       event_base_loopexit(data->base, &tv_r);
+       event_base_dispatch(data->base);
+
+       /* XXXX Test that actually reading or writing a little resets the
+        * timeouts. */
+
+       /* Each buf1 timeout happens, and happens only once. */
+       tt_want(res1.n_read_timeouts);
+       tt_want(res1.n_write_timeouts);
+       tt_want(res1.n_read_timeouts == 1);
+       tt_want(res1.n_write_timeouts == 1);
+
+       tt_int_op(abs(msec_diff(&started_at, &res1.read_timeout_at)-150),
+           <=, 40);
+       tt_int_op(abs(msec_diff(&started_at, &res1.write_timeout_at)-100),
+           <=, 30);
+
+end:
+       if (bev1)
+               bufferevent_free(bev1);
+       if (bev2)
+               bufferevent_free(bev2);
+}
+
 struct testcase_t bufferevent_testcases[] = {
 
        LEGACY(bufferevent, TT_ISOLATED),
@@ -632,6 +768,10 @@ struct testcase_t bufferevent_testcases[] = {
          (void*)"defer lock" },
        { "bufferevent_connect_fail", test_bufferevent_connect_fail,
          TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
+       { "bufferevent_timeouts", test_bufferevent_timeouts,
+         TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR, &basic_setup, (void*)"" },
+       { "bufferevent_pair_timeouts", test_bufferevent_timeouts,
+         TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"pair" },
 #ifdef _EVENT_HAVE_LIBZ
        LEGACY(bufferevent_zlib, TT_ISOLATED),
 #else