]> granicus.if.org Git - libevent/commitdiff
be_filter: avoid data stuck under active watermarks
authorEduardo Panisset <eduardo@anchorfree.com>
Fri, 17 Jun 2016 17:46:32 +0000 (10:46 -0700)
committerAzat Khuzhin <a3at.mail@gmail.com>
Sun, 19 Jun 2016 10:04:19 +0000 (13:04 +0300)
Suppose we have bufferevent filter attached to bufferevent socket.
Read high watermark for bufferevent filter is configured to 4096 bytes.
Socket receives 4343 bytes. Due to watermark, 4096 bytes are transferred
from socket input buffer to filter input buffer and 247 bytes are left
in bufferevent socket.
Suppose that no more data is received through socket.

At this point 247 bytes will sit forever in input buffer of bufferevent
socket.
The patch attached solves this issue registering read callback to
filter's input buffer if it reaches its read high water mark and data
was left in corresponding underlying's input buffer.

This read callback calls filter process input function as soon as filter
input buffer falls below its read high watermark and there still is data
left in underlying input buffer. Callback is deregistered as soon as
filter input buffer falls below its read high watermark.

bufferevent_filter.c

index 5d5f992bdcc509ed1100236e10545520dcd9a5eb..d47f9452bbd2a5213ae628eb56ec3140f459fd38 100644 (file)
@@ -71,6 +71,9 @@ static int be_filter_flush(struct bufferevent *bufev,
     short iotype, enum bufferevent_flush_mode mode);
 static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
 
+static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
+    const struct evbuffer_cb_info *cbinfo, void *arg);
+
 static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
     const struct evbuffer_cb_info *info, void *arg);
 
@@ -79,6 +82,8 @@ struct bufferevent_filtered {
 
        /** The bufferevent that we read/write filtered data from/to. */
        struct bufferevent *underlying;
+       /** A callback on our inbuf to notice somebory removes data */
+       struct evbuffer_cb_entry *inbuf_cb;
        /** A callback on our outbuf to notice when somebody adds data */
        struct evbuffer_cb_entry *outbuf_cb;
        /** True iff we have received an EOF callback from the underlying
@@ -203,6 +208,11 @@ bufferevent_filter_new(struct bufferevent *underlying,
        bufferevent_setcb(bufev_f->underlying,
            be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
 
+       bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input,
+               bufferevent_filtered_inbuf_cb, bufev_f);
+       evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb,
+               EVBUFFER_CB_ENABLED);
+
        bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
           bufferevent_filtered_outbuf_cb, bufev_f);
 
@@ -251,6 +261,12 @@ be_filter_destruct(struct bufferevent *bev)
        EVUTIL_ASSERT(bevf);
        if (bevf->free_context)
                bevf->free_context(bevf->context);
+
+       if (bevf->inbuf_cb)
+               evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb);
+
+       if (bevf->outbuf_cb)
+               evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb);
 }
 
 static int
@@ -418,9 +434,8 @@ bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
        }
 }
 
-/* Called when the underlying socket has read. */
 static void
-be_filter_readcb(struct bufferevent *underlying, void *me_)
+be_filter_read_nolock_(struct bufferevent *underlying, void *me_)
 {
        struct bufferevent_filtered *bevf = me_;
        enum bufferevent_filter_result res;
@@ -429,8 +444,6 @@ be_filter_readcb(struct bufferevent *underlying, void *me_)
        struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
        int processed_any = 0;
 
-       BEV_LOCK(bufev);
-
        // It's possible our refcount is 0 at this point if another thread free'd our filterevent
        EVUTIL_ASSERT(bufev_private->refcnt >= 0);
 
@@ -449,11 +462,65 @@ be_filter_readcb(struct bufferevent *underlying, void *me_)
                /* XXX This should be in process_input, not here.  There are
                 * other places that can call process-input, and they should
                 * force readcb calls as needed. */
-               if (processed_any)
+               if (processed_any) {
                        bufferevent_trigger_nolock_(bufev, EV_READ, 0);
+                       if (evbuffer_get_length(underlying->input) > 0 &&
+                               be_readbuf_full(bevf, state)) {
+                               /* data left in underlying buffer and filter input buffer
+                                * hit its read high watermark.
+                                * Schedule callback to avoid data gets stuck in underlying
+                                * input buffer.
+                                */
+                               evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb,
+                                       EVBUFFER_CB_ENABLED);
+                       }
+               }
        }
+}
 
-       BEV_UNLOCK(bufev);
+/* Called when the size of our inbuf changes. */
+static void
+bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
+    const struct evbuffer_cb_info *cbinfo, void *arg)
+{
+       struct bufferevent_filtered *bevf = arg;
+       enum bufferevent_flush_mode state;
+       struct bufferevent *bev = downcast(bevf);
+
+       BEV_LOCK(bev);
+
+       if (bevf->got_eof)
+               state = BEV_FINISHED;
+       else
+               state = BEV_NORMAL;
+
+
+       if (!be_readbuf_full(bevf, state)) {
+               /* opportunity to read data which was left in underlying
+                * input buffer because filter input buffer hit read
+                * high watermark.
+                */
+               evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb,
+                       EVBUFFER_CB_ENABLED);
+               if (evbuffer_get_length(bevf->underlying->input) > 0)
+                       be_filter_read_nolock_(bevf->underlying, bevf);
+       }
+
+       BEV_UNLOCK(bev);
+}
+
+/* Called when the underlying socket has read. */
+static void
+be_filter_readcb(struct bufferevent *underlying, void *me_)
+{
+       struct bufferevent_filtered *bevf = me_;
+       struct bufferevent *bev = downcast(bevf);
+
+       BEV_LOCK(bev);
+
+       be_filter_read_nolock_(underlying, me_);
+
+       BEV_UNLOCK(bev);
 }
 
 /* Called when the underlying socket has drained enough that we can write to