short iotype, enum bufferevent_flush_mode mode);
static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
+static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
+ const struct evbuffer_cb_info *cbinfo, void *arg);
+
static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *info, void *arg);
/** The bufferevent that we read/write filtered data from/to. */
struct bufferevent *underlying;
+ /** A callback on our inbuf to notice somebory removes data */
+ struct evbuffer_cb_entry *inbuf_cb;
/** A callback on our outbuf to notice when somebody adds data */
struct evbuffer_cb_entry *outbuf_cb;
/** True iff we have received an EOF callback from the underlying
bufferevent_setcb(bufev_f->underlying,
be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
+ bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input,
+ bufferevent_filtered_inbuf_cb, bufev_f);
+ evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb,
+ EVBUFFER_CB_ENABLED);
+
bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
bufferevent_filtered_outbuf_cb, bufev_f);
EVUTIL_ASSERT(bevf);
if (bevf->free_context)
bevf->free_context(bevf->context);
+
+ if (bevf->inbuf_cb)
+ evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb);
+
+ if (bevf->outbuf_cb)
+ evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb);
}
static int
}
}
-/* Called when the underlying socket has read. */
static void
-be_filter_readcb(struct bufferevent *underlying, void *me_)
+be_filter_read_nolock_(struct bufferevent *underlying, void *me_)
{
struct bufferevent_filtered *bevf = me_;
enum bufferevent_filter_result res;
struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
int processed_any = 0;
- BEV_LOCK(bufev);
-
// It's possible our refcount is 0 at this point if another thread free'd our filterevent
EVUTIL_ASSERT(bufev_private->refcnt >= 0);
/* XXX This should be in process_input, not here. There are
* other places that can call process-input, and they should
* force readcb calls as needed. */
- if (processed_any)
+ if (processed_any) {
bufferevent_trigger_nolock_(bufev, EV_READ, 0);
+ if (evbuffer_get_length(underlying->input) > 0 &&
+ be_readbuf_full(bevf, state)) {
+ /* data left in underlying buffer and filter input buffer
+ * hit its read high watermark.
+ * Schedule callback to avoid data gets stuck in underlying
+ * input buffer.
+ */
+ evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb,
+ EVBUFFER_CB_ENABLED);
+ }
+ }
}
+}
- BEV_UNLOCK(bufev);
+/* Called when the size of our inbuf changes. */
+static void
+bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
+ const struct evbuffer_cb_info *cbinfo, void *arg)
+{
+ struct bufferevent_filtered *bevf = arg;
+ enum bufferevent_flush_mode state;
+ struct bufferevent *bev = downcast(bevf);
+
+ BEV_LOCK(bev);
+
+ if (bevf->got_eof)
+ state = BEV_FINISHED;
+ else
+ state = BEV_NORMAL;
+
+
+ if (!be_readbuf_full(bevf, state)) {
+ /* opportunity to read data which was left in underlying
+ * input buffer because filter input buffer hit read
+ * high watermark.
+ */
+ evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb,
+ EVBUFFER_CB_ENABLED);
+ if (evbuffer_get_length(bevf->underlying->input) > 0)
+ be_filter_read_nolock_(bevf->underlying, bevf);
+ }
+
+ BEV_UNLOCK(bev);
+}
+
+/* Called when the underlying socket has read. */
+static void
+be_filter_readcb(struct bufferevent *underlying, void *me_)
+{
+ struct bufferevent_filtered *bevf = me_;
+ struct bufferevent *bev = downcast(bevf);
+
+ BEV_LOCK(bev);
+
+ be_filter_read_nolock_(underlying, me_);
+
+ BEV_UNLOCK(bev);
}
/* Called when the underlying socket has drained enough that we can write to