bufferevent_free(bev2);
}
+struct bufferevent_filter_data_stuck {
+ size_t header_size;
+ size_t total_read;
+};
+
+static void
+bufferevent_filter_data_stuck_readcb(struct bufferevent *bev, void *arg)
+{
+ struct bufferevent_filter_data_stuck *filter_data = arg;
+ struct evbuffer *input = bufferevent_get_input(bev);
+ size_t read_size = evbuffer_get_length(input);
+ evbuffer_drain(input, read_size);
+ filter_data->total_read += read_size;
+}
+
+/**
+ * This filter prepends header once before forwarding data.
+ */
+static enum bufferevent_filter_result
+bufferevent_filter_data_stuck_inputcb(
+ struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit,
+ enum bufferevent_flush_mode mode, void *ctx)
+{
+ struct bufferevent_filter_data_stuck *filter_data = ctx;
+ static int header_inserted = 0;
+ size_t payload_size;
+ size_t header_size = 0;
+
+ if (!header_inserted) {
+ char *header = calloc(filter_data->header_size, 1);
+ evbuffer_add(dst, header, filter_data->header_size);
+ free(header);
+ header_size = filter_data->header_size;
+ header_inserted = 1;
+ }
+
+ payload_size = evbuffer_get_length(src);
+ if (payload_size > dst_limit - header_size) {
+ payload_size = dst_limit - header_size;
+ }
+
+ tt_int_op(payload_size, ==, evbuffer_remove_buffer(src, dst, payload_size));
+
+end:
+ return BEV_OK;
+}
+
+static void
+test_bufferevent_filter_data_stuck(void *arg)
+{
+ const size_t read_high_wm = 4096;
+ struct bufferevent_filter_data_stuck filter_data;
+ struct basic_test_data *data = arg;
+ struct bufferevent *pair[2];
+ struct bufferevent *filter = NULL;
+
+ int options = BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS;
+
+ char payload[4096];
+ int payload_size = sizeof(payload);
+
+ memset(&filter_data, 0, sizeof(filter_data));
+ filter_data.header_size = 20;
+
+ tt_assert(bufferevent_pair_new(data->base, options, pair) == 0);
+
+ bufferevent_setwatermark(pair[0], EV_READ, 0, read_high_wm);
+ bufferevent_setwatermark(pair[1], EV_READ, 0, read_high_wm);
+
+ tt_assert(
+ filter =
+ bufferevent_filter_new(pair[1],
+ bufferevent_filter_data_stuck_inputcb,
+ NULL,
+ options,
+ NULL,
+ &filter_data));
+
+ bufferevent_setcb(filter,
+ bufferevent_filter_data_stuck_readcb,
+ NULL,
+ NULL,
+ &filter_data);
+
+ tt_assert(bufferevent_enable(filter, EV_READ|EV_WRITE) == 0);
+
+ bufferevent_setwatermark(filter, EV_READ, 0, read_high_wm);
+
+ tt_assert(bufferevent_write(pair[0], payload, sizeof(payload)) == 0);
+
+ event_base_dispatch(data->base);
+
+ tt_int_op(filter_data.total_read, ==, payload_size + filter_data.header_size);
+end:
+ if (pair[0])
+ bufferevent_free(pair[0]);
+ if (filter)
+ bufferevent_free(filter);
+}
+
struct testcase_t bufferevent_testcases[] = {
LEGACY(bufferevent, TT_ISOLATED),
{ "bufferevent_pair_flush",
test_bufferevent_pair_flush,
TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
+ { "bufferevent_filter_data_stuck",
+ test_bufferevent_filter_data_stuck,
+ TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
END_OF_TESTCASES,
};