]> granicus.if.org Git - libevent/commitdiff
Add an option to trigger bufferevent I/O callbacks
authorOndřej Kuzník <ondra@mistotebe.net>
Tue, 3 Dec 2013 22:49:57 +0000 (22:49 +0000)
committerOndřej Kuzník <ondra@mistotebe.net>
Tue, 3 Dec 2013 23:39:13 +0000 (23:39 +0000)
bufferevent-internal.h
bufferevent.c
bufferevent_async.c
bufferevent_filter.c
bufferevent_openssl.c
bufferevent_pair.c
bufferevent_sock.c
include/event2/bufferevent.h
test/regress_bufferevent.c

index 0c4df871fa7059589c8f78791808d188ab04cece..0fa690b9225753034f3f12a96cd427d4a9585d2f 100644 (file)
@@ -341,14 +341,20 @@ int bufferevent_decref_and_unlock_(struct bufferevent *bufev);
 
 /** Internal: If callbacks are deferred and we have a read callback, schedule
  * a readcb.  Otherwise just run the readcb. */
-void bufferevent_run_readcb_(struct bufferevent *bufev);
+void bufferevent_run_readcb_(struct bufferevent *bufev, int options);
 /** Internal: If callbacks are deferred and we have a write callback, schedule
  * a writecb.  Otherwise just run the writecb. */
-void bufferevent_run_writecb_(struct bufferevent *bufev);
+void bufferevent_run_writecb_(struct bufferevent *bufev, int options);
 /** Internal: If callbacks are deferred and we have an eventcb, schedule
  * it to run with events "what".  Otherwise just run the eventcb. */
 void bufferevent_run_eventcb_(struct bufferevent *bufev, short what);
 
+/** Internal: Run or schedule (if deferred or options contain
+ * BEV_TRIG_DEFER_CALLBACKS) I/O callbacks specified in iotype.
+ * Must already hold the bufev lock. */
+void bufferevent_trigger_nolock_(struct bufferevent *bufev, short iotype, int options);
+
+
 /** Internal: Add the event 'ev' with timeout tv, unless tv is set to 0, in
  * which case add ev with no timeout. */
 int bufferevent_add_event_(struct event *ev, const struct timeval *tv);
index 96ce109f18376ada458cc16613197f2fbedc0667..fd95941bae301adac33f76214bfa7dd475ab4f6f 100644 (file)
@@ -219,14 +219,15 @@ bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg
 
 
 void
-bufferevent_run_readcb_(struct bufferevent *bufev)
+bufferevent_run_readcb_(struct bufferevent *bufev, int options)
 {
        /* Requires that we hold the lock and a reference */
        struct bufferevent_private *p =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        if (bufev->readcb == NULL)
                return;
-       if (p->options & BEV_OPT_DEFER_CALLBACKS) {
+       if ((p->options & BEV_OPT_DEFER_CALLBACKS) ||
+           (options & BEV_TRIG_DEFER_CALLBACKS)) {
                p->readcb_pending = 1;
                SCHEDULE_DEFERRED(p);
        } else {
@@ -235,14 +236,15 @@ bufferevent_run_readcb_(struct bufferevent *bufev)
 }
 
 void
-bufferevent_run_writecb_(struct bufferevent *bufev)
+bufferevent_run_writecb_(struct bufferevent *bufev, int options)
 {
        /* Requires that we hold the lock and a reference */
        struct bufferevent_private *p =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        if (bufev->writecb == NULL)
                return;
-       if (p->options & BEV_OPT_DEFER_CALLBACKS) {
+       if ((p->options & BEV_OPT_DEFER_CALLBACKS) ||
+           (options & BEV_TRIG_DEFER_CALLBACKS)) {
                p->writecb_pending = 1;
                SCHEDULE_DEFERRED(p);
        } else {
@@ -250,6 +252,25 @@ bufferevent_run_writecb_(struct bufferevent *bufev)
        }
 }
 
+void
+bufferevent_trigger_nolock_(struct bufferevent *bufev, short iotype, int options)
+{
+       if ((iotype & EV_READ) && ((options & BEV_TRIG_IGNORE_WATERMARKS) ||
+           evbuffer_get_length(bufev->input) >= bufev->wm_read.low))
+               bufferevent_run_readcb_(bufev, options);
+       if ((iotype & EV_WRITE) && ((options & BEV_TRIG_IGNORE_WATERMARKS) ||
+           evbuffer_get_length(bufev->output) <= bufev->wm_write.low))
+               bufferevent_run_writecb_(bufev, options);
+}
+
+void
+bufferevent_trigger(struct bufferevent *bufev, short iotype, int options)
+{
+       bufferevent_incref_and_lock_(bufev);
+       bufferevent_trigger_nolock_(bufev, iotype, options);
+       bufferevent_decref_and_unlock_(bufev);
+}
+
 void
 bufferevent_run_eventcb_(struct bufferevent *bufev, short what)
 {
@@ -322,20 +343,18 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private,
                event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
                return -1;
        }
-       if (options & BEV_OPT_DEFER_CALLBACKS) {
-               if (options & BEV_OPT_UNLOCK_CALLBACKS)
-                       event_deferred_cb_init_(
-                           &bufev_private->deferred,
-                           event_base_get_npriorities(base) / 2,
-                           bufferevent_run_deferred_callbacks_unlocked,
-                           bufev_private);
-               else
-                       event_deferred_cb_init_(
-                           &bufev_private->deferred,
-                           event_base_get_npriorities(base) / 2,
-                           bufferevent_run_deferred_callbacks_locked,
-                           bufev_private);
-       }
+       if (options & BEV_OPT_UNLOCK_CALLBACKS)
+               event_deferred_cb_init_(
+                   &bufev_private->deferred,
+                   event_base_get_npriorities(base) / 2,
+                   bufferevent_run_deferred_callbacks_unlocked,
+                   bufev_private);
+       else
+               event_deferred_cb_init_(
+                   &bufev_private->deferred,
+                   event_base_get_npriorities(base) / 2,
+                   bufferevent_run_deferred_callbacks_locked,
+                   bufev_private);
 
        bufev_private->options = options;
 
index 0152fd164714487578ee793aafdfc5bdbfcc5a83..4e686479776edaa56af9302a2cc4aec167db904e 100644 (file)
@@ -458,8 +458,7 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
        if (bev_a->ok) {
                if (ok && nbytes) {
                        BEV_RESET_GENERIC_READ_TIMEOUT(bev);
-                       if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
-                               bufferevent_run_readcb_(bev);
+                       bufferevent_trigger_nolock_(bev, EV_READ, 0);
                        bev_async_consider_reading(bev_a);
                } else if (!ok) {
                        what |= BEV_EVENT_ERROR;
@@ -502,9 +501,7 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
        if (bev_a->ok) {
                if (ok && nbytes) {
                        BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
-                       if (evbuffer_get_length(bev->output) <=
-                           bev->wm_write.low)
-                               bufferevent_run_writecb_(bev);
+                       bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
                        bev_async_consider_writing(bev_a);
                } else if (!ok) {
                        what |= BEV_EVENT_ERROR;
index cc02230cf52d629037073d374eb7ba00a2d4e013..cb1c00973bf2bb7c64573414d743d00c30d5fcb5 100644 (file)
@@ -376,10 +376,9 @@ be_filter_process_output(struct bufferevent_filtered *bevf,
                        /* Or if we have filled the underlying output buffer. */
                        !be_underlying_writebuf_full(bevf,state));
 
-               if (processed &&
-                   evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
+               if (processed) {
                        /* call the write callback.*/
-                       bufferevent_run_writecb_(bufev);
+                       bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);
 
                        if (res == BEV_OK &&
                            (bufev->enabled & EV_WRITE) &&
@@ -442,9 +441,8 @@ be_filter_readcb(struct bufferevent *underlying, void *me_)
        /* XXX This should be in process_input, not here.  There are
         * other places that can call process-input, and they should
         * force readcb calls as needed. */
-       if (processed_any &&
-           evbuffer_get_length(bufev->input) >= bufev->wm_read.low)
-               bufferevent_run_readcb_(bufev);
+       if (processed_any)
+               bufferevent_trigger_nolock_(bufev, EV_READ, 0);
 
        bufferevent_decref_and_unlock_(bufev);
 }
index 1ce124f938b8ccc8cd2a872252b88704de71b4cb..ed9e4a3dc87be693b8afebe2c34297f9f16c6183 100644 (file)
@@ -709,8 +709,7 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost)
                if (bev_ssl->underlying)
                        BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
 
-               if (evbuffer_get_length(output) <= bev->wm_write.low)
-                       bufferevent_run_writecb_(bev);
+               bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
        }
        return result;
 }
@@ -824,11 +823,8 @@ consider_reading(struct bufferevent_openssl *bev_ssl)
 
        if (all_result_flags & OP_MADE_PROGRESS) {
                struct bufferevent *bev = &bev_ssl->bev.bev;
-               struct evbuffer *input = bev->input;
 
-               if (evbuffer_get_length(input) >= bev->wm_read.low) {
-                       bufferevent_run_readcb_(bev);
-               }
+               bufferevent_trigger_nolock_(bev, EV_READ, 0);
        }
 
        if (!bev_ssl->underlying) {
@@ -852,11 +848,8 @@ consider_writing(struct bufferevent_openssl *bev_ssl)
                r = do_read(bev_ssl, 1024); /* XXXX 1024 is a hack */
                if (r & OP_MADE_PROGRESS) {
                        struct bufferevent *bev = &bev_ssl->bev.bev;
-                       struct evbuffer *input = bev->input;
 
-                       if (evbuffer_get_length(input) >= bev->wm_read.low) {
-                               bufferevent_run_readcb_(bev);
-                       }
+                       bufferevent_trigger_nolock_(bev, EV_READ, 0);
                }
                if (r & (OP_ERR|OP_BLOCKED))
                        break;
index 4d467260b5bfddea9a29f112ada6f52aa8db3b99..eb3da3e316c5b2f5b58a69760ff99239664d23a9 100644 (file)
@@ -151,7 +151,7 @@ static void
 be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
     int ignore_wm)
 {
-       size_t src_size, dst_size;
+       size_t dst_size;
        size_t n;
 
        evbuffer_unfreeze(src->output, 1);
@@ -182,15 +182,8 @@ be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
                        BEV_DEL_GENERIC_WRITE_TIMEOUT(dst);
        }
 
-       src_size = evbuffer_get_length(src->output);
-       dst_size = evbuffer_get_length(dst->input);
-
-       if (dst_size >= dst->wm_read.low) {
-               bufferevent_run_readcb_(dst);
-       }
-       if (src_size <= src->wm_write.low) {
-               bufferevent_run_writecb_(src);
-       }
+       bufferevent_trigger_nolock_(dst, EV_READ, 0);
+       bufferevent_trigger_nolock_(src, EV_WRITE, 0);
 done:
        evbuffer_freeze(src->output, 1);
        evbuffer_freeze(dst->input, 0);
index 5ce4953b8dfa4cae25fb13d0cab9dd58c1529840..82983ed7389829aab1cbcc930bbedc179137970d 100644 (file)
@@ -184,8 +184,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
        bufferevent_decrement_read_buckets_(bufev_p, res);
 
        /* Invoke the user callback - must always be called last */
-       if (evbuffer_get_length(input) >= bufev->wm_read.low)
-               bufferevent_run_readcb_(bufev);
+       bufferevent_trigger_nolock_(bufev, EV_READ, 0);
 
        goto done;
 
@@ -294,9 +293,8 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
         * Invoke the user callback if our buffer is drained or below the
         * low watermark.
         */
-       if ((res || !connected) &&
-           evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
-               bufferevent_run_writecb_(bufev);
+       if (res || !connected) {
+               bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);
        }
 
        goto done;
index efe0617b989af79d2323db01d4e6059162ac7812..af6f7cdee41b92867d5e4d2a6e89bc645045672f 100644 (file)
@@ -558,6 +558,32 @@ int bufferevent_flush(struct bufferevent *bufev,
     short iotype,
     enum bufferevent_flush_mode mode);
 
+/**
+   Flags for bufferevent_trigger(_event) that modify when and how to trigger
+   the callback.
+*/
+enum bufferevent_trigger_options {
+       /** trigger the callback regardless of the watermarks */
+       BEV_TRIG_IGNORE_WATERMARKS = (1<<0),
+
+       /** defer even if the callbacks are not */
+       BEV_TRIG_DEFER_CALLBACKS = (1<<1),
+};
+
+/**
+   Triggers bufferevent data callbacks.
+
+   The function will honor watermarks unless options contain
+   BEV_TRIG_IGNORE_WATERMARKS. If the options contain BEV_OPT_DEFER_CALLBACKS,
+   the callbacks are deferred.
+
+   @param bufev the bufferevent object
+   @param iotype either EV_READ or EV_WRITE or both.
+   @param options
+ */
+void bufferevent_trigger(struct bufferevent *bufev, short iotype,
+    int options);
+
 /**
    @name Filtering support
 
index 89c405bf99f7f4106df0c4bee6e32e169cf5d078..874d6018ebb1a72785ca8e8c73e600a70f5d18f5 100644 (file)
@@ -447,6 +447,7 @@ sender_errorcb(struct bufferevent *bev, short what, void *ctx)
 }
 
 static int bufferevent_connect_test_flags = 0;
+static int bufferevent_trigger_test_flags = 0;
 static int n_strings_read = 0;
 static int n_reads_invoked = 0;
 
@@ -812,6 +813,122 @@ end:
                bufferevent_free(bev2);
 }
 
+static void
+trigger_failure_cb(evutil_socket_t fd, short what, void *ctx)
+{
+       TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout)."));
+}
+
+static void
+trigger_readcb_triggered(struct bufferevent *bev, void *ctx)
+{
+       struct event_base *base = ctx;
+
+       TT_BLATHER(("Read successfully triggered."));
+       n_reads_invoked++;
+       event_base_loopexit(base, NULL);
+}
+
+static void
+trigger_readcb(struct bufferevent *bev, void *ctx)
+{
+       struct timeval timeout = { 30, 0 };
+       struct event_base *base = ctx;
+       size_t low, high, len;
+       int expected_reads;
+
+       TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
+       expected_reads = ++n_reads_invoked;
+
+       bufferevent_setcb(bev, trigger_readcb_triggered, NULL, reader_eventcb, ctx);
+
+       bufferevent_getwatermark(bev, EV_READ, &low, &high);
+       len = evbuffer_get_length(bufferevent_get_input(bev));
+
+       bufferevent_setwatermark(bev, EV_READ, len + 1, 0);
+       bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags);
+       /* no callback expected */
+       tt_int_op(n_reads_invoked, ==, expected_reads);
+
+       if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) ||
+           (bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) {
+               /* will be deferred */
+       } else {
+               expected_reads++;
+       }
+
+       event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout);
+
+       bufferevent_trigger(bev, EV_READ,
+           bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS);
+       tt_int_op(n_reads_invoked, ==, expected_reads);
+
+       bufferevent_setwatermark(bev, EV_READ, low, high);
+end:
+       ;
+}
+
+static void
+test_bufferevent_trigger(void *arg)
+{
+       struct basic_test_data *data = arg;
+       struct evconnlistener *lev=NULL;
+       struct bufferevent *bev=NULL;
+       struct sockaddr_in localhost;
+       struct sockaddr_storage ss;
+       struct sockaddr *sa;
+       ev_socklen_t slen;
+
+       int be_flags=BEV_OPT_CLOSE_ON_FREE;
+       int trig_flags=0;
+
+       if (strstr((char*)data->setup_data, "defer")) {
+               be_flags |= BEV_OPT_DEFER_CALLBACKS;
+       }
+       bufferevent_connect_test_flags = be_flags;
+
+       if (strstr((char*)data->setup_data, "postpone")) {
+               trig_flags |= BEV_TRIG_DEFER_CALLBACKS;
+       }
+       bufferevent_trigger_test_flags = trig_flags;
+
+       memset(&localhost, 0, sizeof(localhost));
+
+       localhost.sin_port = 0; /* pick-a-port */
+       localhost.sin_addr.s_addr = htonl(0x7f000001L);
+       localhost.sin_family = AF_INET;
+       sa = (struct sockaddr *)&localhost;
+       lev = evconnlistener_new_bind(data->base, listen_cb, data->base,
+           LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
+           16, sa, sizeof(localhost));
+       tt_assert(lev);
+
+       sa = (struct sockaddr *)&ss;
+       slen = sizeof(ss);
+       if (regress_get_listener_addr(lev, sa, &slen) < 0) {
+               tt_abort_perror("getsockname");
+       }
+
+       tt_assert(!evconnlistener_enable(lev));
+       bev = bufferevent_socket_new(data->base, -1, be_flags);
+       tt_assert(bev);
+       bufferevent_setcb(bev, trigger_readcb, NULL, reader_eventcb, data->base);
+
+       bufferevent_enable(bev, EV_READ);
+
+       tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost)));
+
+       event_base_dispatch(data->base);
+
+       tt_int_op(n_reads_invoked, ==, 2);
+end:
+       if (lev)
+               evconnlistener_free(lev);
+
+       if (bev)
+               bufferevent_free(bev);
+}
+
 struct testcase_t bufferevent_testcases[] = {
 
        LEGACY(bufferevent, TT_ISOLATED),
@@ -842,6 +959,16 @@ struct testcase_t bufferevent_testcases[] = {
          TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" },
        { "bufferevent_timeout_filter_pair", test_bufferevent_timeouts,
          TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" },
+       { "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE,
+         &basic_setup, (void*)"" },
+       { "bufferevent_trigger_defer", test_bufferevent_trigger,
+         TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
+       { "bufferevent_trigger_postpone", test_bufferevent_trigger,
+         TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
+         (void*)"postpone" },
+       { "bufferevent_trigger_defer_postpone", test_bufferevent_trigger,
+         TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
+         (void*)"defer postpone" },
 #ifdef EVENT__HAVE_LIBZ
        LEGACY(bufferevent_zlib, TT_ISOLATED),
 #else