]> granicus.if.org Git - libevent/commitdiff
Merge branch 'be-wm-overrun-v2'
authorAzat Khuzhin <a3at.mail@gmail.com>
Wed, 17 Oct 2018 20:21:32 +0000 (23:21 +0300)
committerAzat Khuzhin <azat@libevent.org>
Sat, 2 Feb 2019 12:17:59 +0000 (15:17 +0300)
* be-wm-overrun-v2:
  Fix hangs due to watermarks overruns in bufferevents implementations
  test: cover watermarks (with some corner cases) in ssl bufferevent

Fixes: #690
(cherry picked from commit 878bb2d3b9484b27594308da1d0d6a7c9bdf6647)

bufferevent.c
include/event2/bufferevent.h
test/regress_ssl.c

index 490b59839ccacdc2ffacb79706258f31e485eed3..89ad6e2b01563e13edf7c916ea9bedc58a1316aa 100644 (file)
@@ -111,6 +111,28 @@ bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flag
        BEV_UNLOCK(bufev);
 }
 
+/**
+ * Some bufferevent implementations (openssl is one example) can overrun
+ * the read high watermark.  When that happens and the user's read
+ * callback does not drain enough data to bring the input buffer back
+ * below the high watermark, the read callback would never be invoked
+ * again, because reading stays suspended.
+ *
+ * To avoid this, schedule the read callback again from here.  This check
+ * runs only right after the user-specified read callback returns (i.e.
+ * after data may have been drained), and not also when data is added to
+ * the buffer, so that the callback is not scheduled more than once.
+ */
+static void bufferevent_inbuf_wm_check(struct bufferevent *bev)
+{
+       /* Re-trigger only when a high watermark is configured, reading is
+        * enabled, and the input buffer is still at or above the
+        * watermark.  The trigger is deferred rather than run inline. */
+       if (bev->wm_read.high &&
+           (bev->enabled & EV_READ) &&
+           evbuffer_get_length(bev->input) >= bev->wm_read.high) {
+               bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
+       }
+}
 
 /* Callback to implement watermarks on the input buffer.  Only enabled
  * if the watermark is set. */
@@ -147,6 +169,7 @@ bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg)
        if (bufev_private->readcb_pending && bufev->readcb) {
                bufev_private->readcb_pending = 0;
                bufev->readcb(bufev, bufev->cbarg);
+               bufferevent_inbuf_wm_check(bufev);
        }
        if (bufev_private->writecb_pending && bufev->writecb) {
                bufev_private->writecb_pending = 0;
@@ -187,6 +210,7 @@ bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg
                void *cbarg = bufev->cbarg;
                bufev_private->readcb_pending = 0;
                UNLOCKED(readcb(bufev, cbarg));
+               bufferevent_inbuf_wm_check(bufev);
        }
        if (bufev_private->writecb_pending && bufev->writecb) {
                bufferevent_data_cb writecb = bufev->writecb;
@@ -230,6 +254,7 @@ bufferevent_run_readcb_(struct bufferevent *bufev, int options)
                SCHEDULE_DEFERRED(p);
        } else {
                bufev->readcb(bufev, bufev->cbarg);
+               bufferevent_inbuf_wm_check(bufev);
        }
 }
 
index 825918e3a45261860d83dc514795e75be922661d..dac34dc761eef9f5821d2bc5b21069b5fd02b5a0 100644 (file)
@@ -518,6 +518,9 @@ int bufferevent_set_timeouts(struct bufferevent *bufev,
   On input, a bufferevent does not invoke the user read callback unless
   there is at least low watermark data in the buffer.  If the read buffer
   is beyond the high watermark, the bufferevent stops reading from the network.
+  But be aware that the bufferevent input/read buffer can overrun the high
+  watermark limit (a typical example is the openssl bufferevent), so you
+  should not rely on this.
 
   On output, the user write callback is invoked whenever the buffered data
   falls below the low watermark.  Filters that write to this bufev will try
index 92026976ec94c26be4724d692aa3d5927c69bdd5..0eba2323728d6aed6f156c654c1153a5248b0b36 100644 (file)
@@ -733,6 +733,169 @@ end:
        ;
 }
 
+struct wm_context /* per-side (client or server) state for the openssl watermark test */
+{
+       int server;              /* non-zero for the server side; selects the log label */
+       struct evbuffer *data;   /* payload that wm_transfer() queues on the output buffer */
+       size_t to_read;          /* maximum number of bytes drained per read callback */
+       size_t wm_high;          /* read high watermark set on this side's bufferevent */
+       size_t limit;            /* drained total at which wm_transfer() disables reading */
+       size_t get;              /* bytes drained so far; reset to 0 by wm_eventcb() on non-CONNECTED events */
+       struct bufferevent *bev; /* this side's bufferevent; freed at the end of the test */
+};
+/*
+ * Read callback used by both sides of the watermark test.
+ *
+ * Drains at most ctx->to_read bytes from the input buffer, adds the
+ * drained amount to ctx->get, and queues ctx->data (by reference) on the
+ * output buffer.  Once ctx->get reaches ctx->limit, all callbacks are
+ * cleared and reading is disabled on this bufferevent.
+ */
+static void
+wm_transfer(struct bufferevent *bev, void *arg)
+{
+       struct wm_context *ctx = arg;
+       struct evbuffer *input  = bufferevent_get_input(bev);
+       struct evbuffer *output = bufferevent_get_output(bev);
+       size_t chunk = evbuffer_get_length(input);
+
+       /* Deliberately consume only part of what is buffered. */
+       if (chunk > ctx->to_read)
+               chunk = ctx->to_read;
+
+       evbuffer_drain(input, chunk);
+       ctx->get += chunk;
+
+       TT_BLATHER(("wm_transfer-%s: in: %zu, out: %zu, got: %zu",
+               ctx->server ? "server" : "client",
+               evbuffer_get_length(input),
+               evbuffer_get_length(output),
+               ctx->get));
+
+       evbuffer_add_buffer_reference(output, ctx->data);
+
+       if (ctx->get < ctx->limit)
+               return;
+
+       /* Enough data has been consumed: stop participating. */
+       TT_BLATHER(("wm_transfer-%s: break",
+               ctx->server ? "server" : "client"));
+       bufferevent_setcb(bev, NULL, NULL, NULL, NULL);
+       bufferevent_disable(bev, EV_READ);
+}
+/*
+ * Event callback used by both sides of the watermark test.
+ *
+ * Logs the event flags.  Any event that does not carry
+ * BEV_EVENT_CONNECTED resets the drained-byte counter to zero
+ * (presumably so the final "get == limit" check in the test fails).
+ */
+static void
+wm_eventcb(struct bufferevent *bev, short what, void *arg)
+{
+       struct wm_context *ctx = arg;
+
+       TT_BLATHER(("wm_eventcb-%s: %i",
+               ctx->server ? "server" : "client", what));
+
+       if (!(what & BEV_EVENT_CONNECTED))
+               ctx->get = 0;
+}
+/*
+ * Listener callback: wraps the accepted socket in a server-side OpenSSL
+ * bufferevent, applies the read high watermark from the context passed
+ * in `arg`, installs the transfer/event callbacks, enables reading and
+ * writing, and stores the bufferevent in ctx->bev.  The listener is
+ * disabled afterwards, so only one connection is ever accepted.
+ */
+static void
+wm_acceptcb(struct evconnlistener *listener, evutil_socket_t fd,
+    struct sockaddr *addr, int socklen, void *arg)
+{
+       struct wm_context *ctx = arg;
+       struct bufferevent *server_bev;
+       SSL *ssl = SSL_new(get_ssl_ctx());
+
+       SSL_use_certificate(ssl, ssl_getcert());
+       SSL_use_PrivateKey(ssl, ssl_getkey());
+
+       server_bev = bufferevent_openssl_socket_new(
+               evconnlistener_get_base(listener), fd, ssl,
+               BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE);
+       ctx->bev = server_bev;
+
+       bufferevent_setwatermark(server_bev, EV_READ, 0, ctx->wm_high);
+       bufferevent_setcb(server_bev, wm_transfer, NULL, wm_eventcb, ctx);
+       bufferevent_enable(server_bev, EV_READ|EV_WRITE);
+
+       /* Only accept once, then disable ourself. */
+       evconnlistener_disable(listener);
+}
+/*
+ * Regression test for read high-watermark handling in OpenSSL
+ * bufferevents.
+ *
+ * A listener is bound to 127.0.0.1 on a kernel-chosen port and a client
+ * OpenSSL bufferevent (socket-based, or a filter on top of a socket
+ * bufferevent when setup_data has REGRESS_OPENSSL_FILTER set) connects to
+ * it.  Both sides use a 5 KiB read high watermark.  On every read
+ * callback each side drains at most half a payload (512 bytes) and queues
+ * another 1 KiB payload, so the peer's input buffer can sit at or above
+ * its watermark after the callback returns.  The test passes when both
+ * sides have drained exactly `limit` (8 * wm_high) bytes.
+ *
+ * arg is the struct basic_test_data supplied by the test harness.
+ */
+static void
+regress_bufferevent_openssl_wm(void *arg)
+{
+       struct basic_test_data *data = arg;
+       struct event_base *base = data->base;
+
+       struct evconnlistener *listener = NULL;
+       struct bufferevent *bev;
+       struct sockaddr_in sin;
+       struct sockaddr_storage ss;
+       enum regress_openssl_type type =
+               (enum regress_openssl_type)data->setup_data;
+       int bev_flags = BEV_OPT_CLOSE_ON_FREE;
+       ev_socklen_t slen;
+       SSL *ssl;
+       struct wm_context client, server;
+       char *payload = NULL;
+       size_t payload_len = 1<<10;
+       size_t wm_high = 5<<10;
+
+       /* Zero both contexts before the first tt_assert() so that the
+        * cleanup code at "end" only ever sees NULL or valid pointers. */
+       memset(&client, 0, sizeof(client));
+       memset(&server, 0, sizeof(server));
+       client.server = 0;
+       server.server = 1;
+
+       init_ssl();
+
+       /* sin_port stays 0, so the kernel picks a free port. */
+       memset(&sin, 0, sizeof(sin));
+       sin.sin_family = AF_INET;
+       sin.sin_addr.s_addr = htonl(0x7f000001);
+
+       memset(&ss, 0, sizeof(ss));
+       slen = sizeof(ss);
+
+       /* The payload is fully overwritten below, so plain malloc() is
+        * enough; evbuffer_add() copies it into each side's buffer. */
+       payload = malloc(payload_len);
+       tt_assert(payload);
+       memset(payload, 'A', payload_len);
+
+       client.data = evbuffer_new();
+       server.data = evbuffer_new();
+       tt_assert(client.data);
+       tt_assert(server.data);
+       tt_assert(!evbuffer_add(server.data, payload, payload_len));
+       tt_assert(!evbuffer_add(client.data, payload, payload_len));
+       client.wm_high = server.wm_high = wm_high;
+       client.limit = server.limit = wm_high<<3;
+       client.to_read = server.to_read = payload_len>>1;
+
+       TT_BLATHER(("openssl_wm: payload_len = %zu, wm_high = %zu, limit = %zu, to_read: %zu",
+               payload_len, wm_high, server.limit, server.to_read));
+
+       listener = evconnlistener_new_bind(base, wm_acceptcb, &server,
+           LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
+           -1, (struct sockaddr *)&sin, sizeof(sin));
+
+       tt_assert(listener);
+       tt_assert(evconnlistener_get_fd(listener) >= 0);
+
+       ssl = SSL_new(get_ssl_ctx());
+       tt_assert(ssl);
+
+       if (type & REGRESS_OPENSSL_FILTER) {
+               bev = bufferevent_socket_new(data->base, -1, bev_flags);
+               tt_assert(bev);
+               bev = bufferevent_openssl_filter_new(
+                       base, bev, ssl, BUFFEREVENT_SSL_CONNECTING, bev_flags);
+       } else {
+               bev = bufferevent_openssl_socket_new(
+                       data->base, -1, ssl,
+                       BUFFEREVENT_SSL_CONNECTING,
+                       bev_flags);
+       }
+       tt_assert(bev);
+       client.bev = bev;
+
+       bufferevent_setwatermark(bev, EV_READ, 0, client.wm_high);
+       bufferevent_setcb(bev, wm_transfer, NULL, wm_eventcb, &client);
+
+       /* Find out which port the listener was actually bound to. */
+       tt_assert(getsockname(evconnlistener_get_fd(listener),
+               (struct sockaddr*)&ss, &slen) == 0);
+
+       tt_assert(!bufferevent_socket_connect(bev, (struct sockaddr*)&ss, slen));
+       tt_assert(!evbuffer_add_buffer_reference(bufferevent_get_output(bev), client.data));
+       tt_assert(!bufferevent_enable(bev, EV_READ|EV_WRITE));
+
+       event_base_dispatch(base);
+
+       tt_int_op(client.get, ==, client.limit);
+       tt_int_op(server.get, ==, server.limit);
+end:
+       /* Any tt_assert() above may jump here before a resource exists
+        * (server.bev in particular is only set once a connection has been
+        * accepted), so every libevent object is checked before freeing. */
+       free(payload);
+       if (client.data)
+               evbuffer_free(client.data);
+       if (server.data)
+               evbuffer_free(server.data);
+       if (listener)
+               evconnlistener_free(listener);
+       if (client.bev)
+               bufferevent_free(client.bev);
+       if (server.bev)
+               bufferevent_free(server.bev);
+}
+
 struct testcase_t ssl_testcases[] = {
 #define T(a) ((void *)(a))
        { "bufferevent_socketpair", regress_bufferevent_openssl,
@@ -808,6 +971,11 @@ struct testcase_t ssl_testcases[] = {
        { "bufferevent_connect_sleep", regress_bufferevent_openssl_connect,
          TT_FORK|TT_NEED_BASE, &basic_setup, T(REGRESS_OPENSSL_SLEEP) },
 
+       { "bufferevent_wm", regress_bufferevent_openssl_wm,
+         TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
+       { "bufferevent_wm_filter", regress_bufferevent_openssl_wm,
+         TT_FORK|TT_NEED_BASE, &basic_setup, T(REGRESS_OPENSSL_FILTER) },
+
 #undef T
 
        END_OF_TESTCASES,