]> granicus.if.org Git - libevent/commitdiff
Add a generic mechanism to implement timeouts in bufferevents.
authorNick Mathewson <nickm@torproject.org>
Mon, 25 May 2009 23:10:23 +0000 (23:10 +0000)
committerNick Mathewson <nickm@torproject.org>
Mon, 25 May 2009 23:10:23 +0000 (23:10 +0000)
Paired and asynchronous bufferevents didn't do timeouts, and filtering
bufferevents gave them funny semantics.  Now they all should all work
in a way consistent with what socket bufferevents do now: a [read/write]
timeout triggers if [reading/writing] is enabled, and if the timeout is
set, and the right amount of time passes without any data getting
[added to the input buffer/drained from the output buffer].

svn:r1314

bufferevent-internal.h
bufferevent.c
bufferevent_async.c
bufferevent_filter.c
bufferevent_pair.c

index 7ea9217213f04f58892fe324d06bdcf5dca8642a..547d313ce207053ddcd54c7b1bc89d31d182d4ac 100644 (file)
@@ -139,6 +139,38 @@ void _bufferevent_run_readcb(struct bufferevent *bufev);
 void _bufferevent_run_writecb(struct bufferevent *bufev);
 void _bufferevent_run_errorcb(struct bufferevent *bufev, short what);
 
+/* =========
+ * These next functions implement timeouts for bufferevents that aren't doing
+ * anything else with ev_read and ev_write, to handle timeouts.
+ * ========= */
+/** Internal use: Set up the ev_read and ev_write callbacks so that
+ * the other "generic_timeout" functions will work on it.  Call this from
+ * the constuctor function. */
+void _bufferevent_init_generic_timeout_cbs(struct bufferevent *bev);
+/** Internal use: Delete the ev_read and ev_write callbacks if they're pending.
+ * Call thiss from the destructor function. */
+void _bufferevent_del_generic_timeout_cbs(struct bufferevent *bev);
+/** Internal use: Add or delete the generic timeout events as appropriate.
+ * (If an event is enabled and a timeout is set, we add the event.  Otherwise
+ * we delete it.)  Call this from anything that changes the timeout values,
+ * that enabled EV_READ or EV_WRITE, or that disables EV_READ or EV_WRITE. */
+void _bufferevent_generic_adj_timeouts(struct bufferevent *bev);
+
+/** Internal use: We have just successfully read data into an inbuf, so
+ * reset the read timout (if any). */
+#define BEV_RESET_GENERIC_READ_TIMEOUT(bev)                            \
+       do {                                                            \
+               if (evutil_timerisset(&(bev)->timeout_read))            \
+                       event_add(&(bev)->ev_read, &(bev)->timeout_read); \
+       } while (0)
+/** Internal use: We have just successfully written data from an inbuf, so
+ * reset the read timout (if any). */
+#define BEV_RESET_GENERIC_WRITE_TIMEOUT(bev)                           \
+       do {                                                            \
+               if (evutil_timerisset(&(bev)->timeout_write))           \
+                       event_add(&(bev)->ev_write, &(bev)->timeout_write); \
+       } while (0)
+
 #define BEV_UPCAST(b) EVUTIL_UPCAST((b), struct bufferevent_private, bev)
 
 #define BEV_LOCK(b) do {                                               \
index 81c5e02c332f33305cc21a77d5d776d745d0d182..94a22f6f74d8b6c8d8bd3e95a5b8da4690f39b7e 100644 (file)
@@ -569,3 +569,48 @@ bufferevent_get_underlying(struct bufferevent *bev)
        BEV_UNLOCK(bev);
        return (res<0) ? NULL : d.ptr;
 }
+
+static void
+bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
+{
+       struct bufferevent *bev = ctx;
+       _bufferevent_run_errorcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING);
+}
+static void
+bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
+{
+       struct bufferevent *bev = ctx;
+       _bufferevent_run_errorcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING);
+}
+
+void
+_bufferevent_init_generic_timeout_cbs(struct bufferevent *bev)
+{
+       evtimer_assign(&bev->ev_read, bev->ev_base,
+           bufferevent_generic_read_timeout_cb, bev);
+       evtimer_assign(&bev->ev_read, bev->ev_base,
+           bufferevent_generic_write_timeout_cb, bev);
+}
+
+void
+_bufferevent_del_generic_timeout_cbs(struct bufferevent *bev)
+{
+       event_del(&bev->ev_read);
+       event_del(&bev->ev_write);
+}
+
+void
+_bufferevent_generic_adj_timeouts(struct bufferevent *bev)
+{
+       const short enabled = bev->enabled;
+       if ((enabled & EV_READ) && evutil_timerisset(&bev->timeout_read))
+               event_add(&bev->ev_read, &bev->timeout_read);
+       else
+               event_del(&bev->ev_read);
+
+       if ((enabled & EV_WRITE) && evutil_timerisset(&bev->timeout_write))
+               event_add(&bev->ev_write, &bev->timeout_write);
+       else
+               event_del(&bev->ev_write);
+}
+
index 878a8a0fb2291eacbf645bd1efb147b8145e0a29..ea7ec29e3355926c456cac238c4239e53d46a438 100644 (file)
@@ -63,7 +63,6 @@
 static int be_async_enable(struct bufferevent *, short);
 static int be_async_disable(struct bufferevent *, short);
 static void be_async_destruct(struct bufferevent *);
-static void be_async_adj_timeouts(struct bufferevent *);
 static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
 static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
 
@@ -73,7 +72,7 @@ const struct bufferevent_ops bufferevent_ops_async = {
        be_async_enable,
        be_async_disable,
        be_async_destruct,
-       be_async_adj_timeouts,
+       _bufferevent_generic_adj_timeouts,
         be_async_flush,
         be_async_ctrl,
 };
@@ -162,10 +161,13 @@ be_async_outbuf_callback(struct evbuffer *buf,
        if (cbinfo->n_added || cbinfo->n_deleted)
                bev_async_consider_writing(bev_async);
 
-       if (cbinfo->n_deleted &&
-           bev->writecb != NULL &&
-           evbuffer_get_length(bev->output) <= bev->wm_write.low)
-               _bufferevent_run_writecb(bev);
+       if (cbinfo->n_deleted) {
+               BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
+
+               if (bev->writecb != NULL &&
+                   evbuffer_get_length(bev->output) <= bev->wm_write.low)
+                       _bufferevent_run_writecb(bev);
+       }
 
        BEV_UNLOCK(bev);
 }
@@ -190,10 +192,13 @@ be_async_inbuf_callback(struct evbuffer *buf,
        if (cbinfo->n_added || cbinfo->n_deleted)
                bev_async_consider_reading(bev_async);
 
-       if (cbinfo->n_added &&
-           evbuffer_get_length(bev->input) >= bev->wm_read.low &&
-            bev->readcb != NULL)
-               _bufferevent_run_readcb(bev);
+       if (cbinfo->n_added) {
+               BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+
+               if (evbuffer_get_length(bev->input) >= bev->wm_read.low &&
+                   bev->readcb != NULL)
+                       _bufferevent_run_readcb(bev);
+       }
 
        BEV_UNLOCK(bev);
 }
@@ -203,6 +208,8 @@ be_async_enable(struct bufferevent *buf, short what)
 {
        struct bufferevent_async *bev_async = upcast(buf);
 
+       _bufferevent_generic_adj_timeouts(buf);
+
        /* If we newly enable reading or writing, and we aren't reading or
           writing already, consider launching a new read or write. */
 
@@ -219,17 +226,18 @@ be_async_disable(struct bufferevent *bev, short what)
        /* XXXX If we disable reading or writing, we may want to consider
         * canceling any in-progress read or write operation, though it might
         * not work. */
+
+       _bufferevent_generic_adj_timeouts(bev);
+
        return 0;
 }
 
 static void
 be_async_destruct(struct bufferevent *bev)
 {
+       _bufferevent_del_generic_timeout_cbs(bev);
 }
-static void
-be_async_adj_timeouts(struct bufferevent *bev)
-{
-}
+
 static int
 be_async_flush(struct bufferevent *bev, short what,
     enum bufferevent_flush_mode mode)
@@ -281,6 +289,8 @@ bufferevent_async_new(struct event_base *base,
        evbuffer_defer_callbacks(bev->input, base);
        evbuffer_defer_callbacks(bev->output, base);
 
+       _bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev);
+
        return bev;
 err:
        bufferevent_free(&bev_a->bev.bev);
index 3b51e500b83b56615ab864ccc30b5fa098ab2dee..585b3c255dd700c09adcfafcf0a00d5bb9c74091 100644 (file)
@@ -63,7 +63,6 @@
 static int be_filter_enable(struct bufferevent *, short);
 static int be_filter_disable(struct bufferevent *, short);
 static void be_filter_destruct(struct bufferevent *);
-static void be_filter_adj_timeouts(struct bufferevent *);
 
 static void be_filter_readcb(struct bufferevent *, void *);
 static void be_filter_writecb(struct bufferevent *, void *);
@@ -103,7 +102,7 @@ const struct bufferevent_ops bufferevent_ops_filter = {
        be_filter_enable,
        be_filter_disable,
        be_filter_destruct,
-       be_filter_adj_timeouts,
+       _bufferevent_generic_adj_timeouts,
         be_filter_flush,
        be_filter_ctrl,
 };
@@ -208,6 +207,8 @@ bufferevent_filter_new(struct bufferevent *underlying,
        bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
           bufferevent_filtered_outbuf_cb, bufev_f);
 
+       _bufferevent_init_generic_timeout_cbs(downcast(bufev_f));
+
        return downcast(bufev_f);
 }
 
@@ -221,12 +222,15 @@ be_filter_destruct(struct bufferevent *bev)
 
        if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE)
                bufferevent_free(bevf->underlying);
+
+       _bufferevent_del_generic_timeout_cbs(bev);
 }
 
 static int
 be_filter_enable(struct bufferevent *bev, short event)
 {
        struct bufferevent_filtered *bevf = upcast(bev);
+       _bufferevent_generic_adj_timeouts(bev);
        return bufferevent_enable(bevf->underlying, event);
 }
 
@@ -234,23 +238,10 @@ static int
 be_filter_disable(struct bufferevent *bev, short event)
 {
        struct bufferevent_filtered *bevf = upcast(bev);
+       _bufferevent_generic_adj_timeouts(bev);
        return bufferevent_disable(bevf->underlying, event);
 }
 
-static void
-be_filter_adj_timeouts(struct bufferevent *bev)
-{
-       struct bufferevent_filtered *bevf = upcast(bev);
-       struct timeval *r = NULL, *w = NULL;
-
-       if (bev->timeout_read.tv_sec >= 0)
-               r = &bev->timeout_read;
-       if (bev->timeout_write.tv_sec >= 0)
-               w = &bev->timeout_write;
-
-       bufferevent_set_timeouts(bevf->underlying, r, w);
-}
-
 static enum bufferevent_filter_result
 be_filter_process_input(struct bufferevent_filtered *bevf,
                        enum bufferevent_flush_mode state,
@@ -283,6 +274,9 @@ be_filter_process_input(struct bufferevent_filtered *bevf,
                 evbuffer_get_length(bevf->underlying->input) &&
                 !be_readbuf_full(bevf, state));
 
+       if (*processed_out)
+               BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+
        return res;
 }
 
@@ -359,6 +353,9 @@ be_filter_process_output(struct bufferevent_filtered *bevf,
         evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
             EVBUFFER_CB_ENABLED);
 
+       if (*processed_out)
+               BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
+
        return res;
 }
 
@@ -395,8 +392,8 @@ be_filter_readcb(struct bufferevent *underlying, void *_me)
        res = be_filter_process_input(bevf, state, &processed_any);
 
        if (processed_any &&
-            evbuffer_get_length(bufev->input) >= bufev->wm_read.low &&
-            bufev->readcb != NULL)
+           evbuffer_get_length(bufev->input) >= bufev->wm_read.low &&
+           bufev->readcb != NULL)
                _bufferevent_run_readcb(bufev);
 }
 
index e692404614ba33406ed2e43303ab1273ea6a74d5..a061cb8ced79b285978d50480a1d7b5005b9f774 100644 (file)
@@ -83,13 +83,13 @@ bufferevent_pair_elt_new(struct event_base *base,
                mm_free(bufev);
                return NULL;
        }
-       /* XXX set read timeout event */
-       /* XXX set write timeout event */
        if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) {
                bufferevent_free(downcast(bufev));
                return NULL;
        }
 
+       _bufferevent_init_generic_timeout_cbs(&bufev->bev.bev);
+
        return bufev;
 }
 
@@ -155,6 +155,9 @@ be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
                evbuffer_add_buffer(dst->input, src->output);
        }
 
+       BEV_RESET_GENERIC_READ_TIMEOUT(dst);
+       BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
+
        src_size = evbuffer_get_length(src->output);
        dst_size = evbuffer_get_length(dst->input);
 
@@ -201,6 +204,8 @@ be_pair_enable(struct bufferevent *bufev, short events)
        struct bufferevent_pair *bev_p = upcast(bufev);
        struct bufferevent_pair *partner = bev_p->partner;
 
+       _bufferevent_generic_adj_timeouts(bufev);
+
        /* We're starting to read! Does the other side have anything to write?*/
        if ((events & EV_READ) && partner &&
            be_pair_wants_to_talk(partner, bev_p)) {
@@ -217,6 +222,7 @@ be_pair_enable(struct bufferevent *bufev, short events)
 static int
 be_pair_disable(struct bufferevent *bev, short events)
 {
+       _bufferevent_generic_adj_timeouts(bev);
        return 0;
 }
 
@@ -229,12 +235,8 @@ be_pair_destruct(struct bufferevent *bev)
                bev_p->partner->partner = NULL;
                bev_p->partner = NULL;
        }
-}
 
-static void
-be_pair_adj_timeouts(struct bufferevent *bev)
-{
-       /* TODO: implement. */
+       _bufferevent_del_generic_timeout_cbs(bev);
 }
 
 static int
@@ -271,7 +273,7 @@ const struct bufferevent_ops bufferevent_ops_pair = {
        be_pair_enable,
        be_pair_disable,
        be_pair_destruct,
-       be_pair_adj_timeouts,
+       _bufferevent_generic_adj_timeouts,
        be_pair_flush,
        NULL, /* ctrl */
 };