Add an option to trigger bufferevent event callbacks
authorOndřej Kuzník <ondra@mistotebe.net>
Tue, 3 Dec 2013 23:01:54 +0000 (23:01 +0000)
committerOndřej Kuzník <ondra@mistotebe.net>
Tue, 3 Dec 2013 23:39:13 +0000 (23:39 +0000)
bufferevent-internal.h
bufferevent.c
bufferevent_async.c
bufferevent_filter.c
bufferevent_openssl.c
bufferevent_pair.c
bufferevent_sock.c
include/event2/bufferevent.h
test/regress_bufferevent.c

index 0fa690b9225753034f3f12a96cd427d4a9585d2f..12ae142baa6f067f58b7d6af2e0c46260b3a3eda 100644 (file)
@@ -346,8 +346,9 @@ void bufferevent_run_readcb_(struct bufferevent *bufev, int options);
  * a writecb.  Otherwise just run the writecb. */
 void bufferevent_run_writecb_(struct bufferevent *bufev, int options);
 /** Internal: If callbacks are deferred and we have an eventcb, schedule
- * it to run with events "what".  Otherwise just run the eventcb. */
-void bufferevent_run_eventcb_(struct bufferevent *bufev, short what);
+ * it to run with events "what".  Otherwise just run the eventcb.
+ * See bufferevent_trigger_event for meaning of "options". */
+void bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options);
 
 /** Internal: Run or schedule (if deferred or options contain
  * BEV_TRIG_DEFER_CALLBACKS) I/O callbacks specified in iotype.
index fd95941bae301adac33f76214bfa7dd475ab4f6f..5f424d7e72723e33b0c5a43cbd11656348303d36 100644 (file)
@@ -272,14 +272,15 @@ bufferevent_trigger(struct bufferevent *bufev, short iotype, int options)
 }
 
 void
-bufferevent_run_eventcb_(struct bufferevent *bufev, short what)
+bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options)
 {
        /* Requires that we hold the lock and a reference */
        struct bufferevent_private *p =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        if (bufev->errorcb == NULL)
                return;
-       if (p->options & BEV_OPT_DEFER_CALLBACKS) {
+       if ((p->options & BEV_OPT_DEFER_CALLBACKS) ||
+           (options & BEV_TRIG_DEFER_CALLBACKS)) {
                p->eventcb_pending |= what;
                p->errno_pending = EVUTIL_SOCKET_ERROR();
                SCHEDULE_DEFERRED(p);
@@ -288,6 +289,14 @@ bufferevent_run_eventcb_(struct bufferevent *bufev, short what)
        }
 }
 
+void
+bufferevent_trigger_event(struct bufferevent *bufev, short what, int options)
+{
+       bufferevent_incref_and_lock_(bufev);
+       bufferevent_run_eventcb_(bufev, what, options);
+       bufferevent_decref_and_unlock_(bufev);
+}
+
 int
 bufferevent_init_common_(struct bufferevent_private *bufev_private,
     struct event_base *base,
@@ -914,7 +923,7 @@ bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
        struct bufferevent *bev = ctx;
        bufferevent_incref_and_lock_(bev);
        bufferevent_disable(bev, EV_READ);
-       bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING);
+       bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0);
        bufferevent_decref_and_unlock_(bev);
 }
 static void
@@ -923,7 +932,7 @@ bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
        struct bufferevent *bev = ctx;
        bufferevent_incref_and_lock_(bev);
        bufferevent_disable(bev, EV_WRITE);
-       bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING);
+       bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0);
        bufferevent_decref_and_unlock_(bev);
 }
 
index 4e686479776edaa56af9302a2cc4aec167db904e..137ad24797feda682f293f3f516bf0963e686c6a 100644 (file)
@@ -217,7 +217,7 @@ bev_async_consider_writing(struct bufferevent_async *beva)
            &beva->write_overlapped)) {
                bufferevent_decref_(bev);
                beva->ok = 0;
-               bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR);
+               bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
        } else {
                beva->write_in_progress = at_most;
                bufferevent_decrement_write_buckets_(&beva->bev, at_most);
@@ -270,7 +270,7 @@ bev_async_consider_reading(struct bufferevent_async *beva)
        bufferevent_incref_(bev);
        if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
                beva->ok = 0;
-               bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR);
+               bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
                bufferevent_decref_(bev);
        } else {
                beva->read_in_progress = at_most;
@@ -428,7 +428,7 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
                bev_async_set_wsa_error(bev, eo);
 
        bufferevent_run_eventcb_(bev,
-                       ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
+                       ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
 
        event_base_del_virtual_(bev->ev_base);
 
@@ -463,11 +463,11 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
                } else if (!ok) {
                        what |= BEV_EVENT_ERROR;
                        bev_a->ok = 0;
-                       bufferevent_run_eventcb_(bev, what);
+                       bufferevent_run_eventcb_(bev, what, 0);
                } else if (!nbytes) {
                        what |= BEV_EVENT_EOF;
                        bev_a->ok = 0;
-                       bufferevent_run_eventcb_(bev, what);
+                       bufferevent_run_eventcb_(bev, what, 0);
                }
        }
 
@@ -506,11 +506,11 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
                } else if (!ok) {
                        what |= BEV_EVENT_ERROR;
                        bev_a->ok = 0;
-                       bufferevent_run_eventcb_(bev, what);
+                       bufferevent_run_eventcb_(bev, what, 0);
                } else if (!nbytes) {
                        what |= BEV_EVENT_EOF;
                        bev_a->ok = 0;
-                       bufferevent_run_eventcb_(bev, what);
+                       bufferevent_run_eventcb_(bev, what, 0);
                }
        }
 
index cb1c00973bf2bb7c64573414d743d00c30d5fcb5..af71ebeeae912095e9acdfbcbab55f533dea8365 100644 (file)
@@ -470,7 +470,7 @@ be_filter_eventcb(struct bufferevent *underlying, short what, void *me_)
 
        bufferevent_incref_and_lock_(bev);
        /* All we can really to is tell our own eventcb. */
-       bufferevent_run_eventcb_(bev, what);
+       bufferevent_run_eventcb_(bev, what, 0);
        bufferevent_decref_and_unlock_(bev);
 }
 
index ed9e4a3dc87be693b8afebe2c34297f9f16c6183..3ce491ef372d2dea75395c31ec342ab36a9f5fdb 100644 (file)
@@ -533,7 +533,7 @@ conn_closed(struct bufferevent_openssl *bev_ssl, int when, int errcode, int ret)
 
        /* when is BEV_EVENT_{READING|WRITING} */
        event = when | event;
-       bufferevent_run_eventcb_(&bev_ssl->bev.bev, event);
+       bufferevent_run_eventcb_(&bev_ssl->bev.bev, event, 0);
 }
 
 static void
@@ -921,7 +921,7 @@ be_openssl_eventcb(struct bufferevent *bev_base, short what, void *ctx)
                   eat it. */
        }
        if (event)
-               bufferevent_run_eventcb_(&bev_ssl->bev.bev, event);
+               bufferevent_run_eventcb_(&bev_ssl->bev.bev, event, 0);
 }
 
 static void
@@ -931,7 +931,7 @@ be_openssl_readeventcb(evutil_socket_t fd, short what, void *ptr)
        bufferevent_incref_and_lock_(&bev_ssl->bev.bev);
        if (what == EV_TIMEOUT) {
                bufferevent_run_eventcb_(&bev_ssl->bev.bev,
-                   BEV_EVENT_TIMEOUT|BEV_EVENT_READING);
+                   BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0);
        } else {
                consider_reading(bev_ssl);
        }
@@ -945,7 +945,7 @@ be_openssl_writeeventcb(evutil_socket_t fd, short what, void *ptr)
        bufferevent_incref_and_lock_(&bev_ssl->bev.bev);
        if (what == EV_TIMEOUT) {
                bufferevent_run_eventcb_(&bev_ssl->bev.bev,
-                   BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING);
+                   BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0);
        } else {
                consider_writing(bev_ssl);
        }
@@ -1012,7 +1012,7 @@ do_handshake(struct bufferevent_openssl *bev_ssl)
                /* Call do_read and do_write as needed */
                bufferevent_enable(&bev_ssl->bev.bev, bev_ssl->bev.bev.enabled);
                bufferevent_run_eventcb_(&bev_ssl->bev.bev,
-                   BEV_EVENT_CONNECTED);
+                   BEV_EVENT_CONNECTED, 0);
                return 1;
        } else {
                int err = SSL_get_error(bev_ssl->ssl, r);
@@ -1051,7 +1051,7 @@ be_openssl_handshakeeventcb(evutil_socket_t fd, short what, void *ptr)
 
        bufferevent_incref_and_lock_(&bev_ssl->bev.bev);
        if (what & EV_TIMEOUT) {
-               bufferevent_run_eventcb_(&bev_ssl->bev.bev, BEV_EVENT_TIMEOUT);
+               bufferevent_run_eventcb_(&bev_ssl->bev.bev, BEV_EVENT_TIMEOUT, 0);
        } else
                do_handshake(bev_ssl);/* XXX handle failure */
        bufferevent_decref_and_unlock_(&bev_ssl->bev.bev);
index eb3da3e316c5b2f5b58a69760ff99239664d23a9..5e2e2c41ed99f458dddbf4dbe53aa2f5da207a3a 100644 (file)
@@ -292,7 +292,7 @@ be_pair_flush(struct bufferevent *bev, short iotype,
                be_pair_transfer(bev, partner, 1);
 
        if (mode == BEV_FINISHED) {
-               bufferevent_run_eventcb_(partner, iotype|BEV_EVENT_EOF);
+               bufferevent_run_eventcb_(partner, iotype|BEV_EVENT_EOF, 0);
        }
        decref_and_unlock(bev);
        return 0;
index 82983ed7389829aab1cbcc930bbedc179137970d..49ebc0bef8ff9ff4411f7ec92444339350c37f2b 100644 (file)
@@ -193,7 +193,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
 
  error:
        bufferevent_disable(bufev, EV_READ);
-       bufferevent_run_eventcb_(bufev, what);
+       bufferevent_run_eventcb_(bufev, what, 0);
 
  done:
        bufferevent_decref_and_unlock_(bufev);
@@ -235,7 +235,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
                if (c < 0) {
                        event_del(&bufev->ev_write);
                        event_del(&bufev->ev_read);
-                       bufferevent_run_eventcb_(bufev, BEV_EVENT_ERROR);
+                       bufferevent_run_eventcb_(bufev, BEV_EVENT_ERROR, 0);
                        goto done;
                } else {
                        connected = 1;
@@ -244,12 +244,12 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
                                event_del(&bufev->ev_write);
                                bufferevent_async_set_connected_(bufev);
                                bufferevent_run_eventcb_(bufev,
-                                               BEV_EVENT_CONNECTED);
+                                               BEV_EVENT_CONNECTED, 0);
                                goto done;
                        }
 #endif
                        bufferevent_run_eventcb_(bufev,
-                                       BEV_EVENT_CONNECTED);
+                                       BEV_EVENT_CONNECTED, 0);
                        if (!(bufev->enabled & EV_WRITE) ||
                            bufev_p->write_suspended) {
                                event_del(&bufev->ev_write);
@@ -307,7 +307,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
 
  error:
        bufferevent_disable(bufev, EV_WRITE);
-       bufferevent_run_eventcb_(bufev, what);
+       bufferevent_run_eventcb_(bufev, what, 0);
 
  done:
        bufferevent_decref_and_unlock_(bufev);
@@ -424,7 +424,7 @@ bufferevent_socket_connect(struct bufferevent *bev,
        goto done;
 
 freesock:
-       bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR);
+       bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
        if (ownfd)
                evutil_closesocket(fd);
        /* do something about the error? */
@@ -448,7 +448,7 @@ bufferevent_connect_getaddrinfo_cb(int result, struct evutil_addrinfo *ai,
 
        if (result != 0) {
                bev_p->dns_error = result;
-               bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR);
+               bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
                bufferevent_decref_and_unlock_(bev);
                if (ai)
                        evutil_freeaddrinfo(ai);
index af6f7cdee41b92867d5e4d2a6e89bc645045672f..aef408b825c58cee4b358a13616b1e6bc9a5bc7d 100644 (file)
@@ -584,6 +584,18 @@ enum bufferevent_trigger_options {
 void bufferevent_trigger(struct bufferevent *bufev, short iotype,
     int options);
 
+/**
+   Triggers the bufferevent event callback.
+
+   If the options contain BEV_OPT_DEFER_CALLBACKS, the callbacks are deferred.
+
+   @param bufev the bufferevent object
+   @param what the flags to pass onto the event callback
+   @param options
+ */
+void bufferevent_trigger_event(struct bufferevent *bufev, short what,
+    int options);
+
 /**
    @name Filtering support
 
index 874d6018ebb1a72785ca8e8c73e600a70f5d18f5..a6a27752e42b6e7ccfc4fec1f29868dfd12d0100 100644 (file)
@@ -820,13 +820,23 @@ trigger_failure_cb(evutil_socket_t fd, short what, void *ctx)
 }
 
 static void
-trigger_readcb_triggered(struct bufferevent *bev, void *ctx)
+trigger_eventcb(struct bufferevent *bev, short what, void *ctx)
 {
        struct event_base *base = ctx;
+       if (what == ~0) {
+               TT_BLATHER(("Event successfully triggered."));
+               event_base_loopexit(base, NULL);
+               return;
+       }
+       reader_eventcb(bev, what, ctx);
+}
 
+static void
+trigger_readcb_triggered(struct bufferevent *bev, void *ctx)
+{
        TT_BLATHER(("Read successfully triggered."));
        n_reads_invoked++;
-       event_base_loopexit(base, NULL);
+       bufferevent_trigger_event(bev, ~0, bufferevent_trigger_test_flags);
 }
 
 static void
@@ -840,7 +850,7 @@ trigger_readcb(struct bufferevent *bev, void *ctx)
        TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
        expected_reads = ++n_reads_invoked;
 
-       bufferevent_setcb(bev, trigger_readcb_triggered, NULL, reader_eventcb, ctx);
+       bufferevent_setcb(bev, trigger_readcb_triggered, NULL, trigger_eventcb, ctx);
 
        bufferevent_getwatermark(bev, EV_READ, &low, &high);
        len = evbuffer_get_length(bufferevent_get_input(bev));
@@ -912,7 +922,7 @@ test_bufferevent_trigger(void *arg)
        tt_assert(!evconnlistener_enable(lev));
        bev = bufferevent_socket_new(data->base, -1, be_flags);
        tt_assert(bev);
-       bufferevent_setcb(bev, trigger_readcb, NULL, reader_eventcb, data->base);
+       bufferevent_setcb(bev, trigger_readcb, NULL, trigger_eventcb, data->base);
 
        bufferevent_enable(bev, EV_READ);