]> granicus.if.org Git - libevent/commitdiff
Add a generic way for any bufferevent to make its callback deferred
authorNick Mathewson <nickm@torproject.org>
Fri, 17 Apr 2009 23:12:34 +0000 (23:12 +0000)
committerNick Mathewson <nickm@torproject.org>
Fri, 17 Apr 2009 23:12:34 +0000 (23:12 +0000)
svn:r1197

ChangeLog
bufferevent-internal.h
bufferevent.c
bufferevent_filter.c
bufferevent_pair.c
bufferevent_sock.c
include/event2/bufferevent.h

index d94080fbf4fa2c95b0beb25066f0a313bac00970..de6e8f9a8180b3c08f67cc03f04acc93ffc79a7a 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,7 @@
-Changes in current version:
+Changes in 2.0.2-alpha:
+ o Add a new flag to bufferevents to make all callbacks automatically deferred.
+
+Changes in 2.0.1-alpha:
  o free minheap on event_base_free(); from Christopher Layne
  o debug cleanups in signal.c; from Christopher Layne
  o provide event_base_new() that does not set the current_base global
index 1fb78716094af96e9b3de1eaa7301bfe0e88d363..57748ebc0b98e186b3c54ba245716af1e7cfd6d6 100644 (file)
@@ -44,8 +44,15 @@ struct bufferevent_private {
 
        /** If set, read is suspended until evbuffer some. */
        unsigned read_suspended : 1;
+       /** If set, we should free the lock when we free the bufferevent. */
        unsigned own_lock : 1;
 
+       unsigned readcb_pending : 1;
+       unsigned writecb_pending : 1;
+       short errorcb_pending;
+       int errno_pending;
+       struct deferred_cb deferred;
+
        enum bufferevent_options options;
 
        int refcnt;
@@ -113,6 +120,10 @@ int bufferevent_enable_locking(struct bufferevent *bufev, void *lock);
 void bufferevent_incref(struct bufferevent *bufev);
 void _bufferevent_decref_and_unlock(struct bufferevent *bufev);
 
+void _bufferevent_run_readcb(struct bufferevent *bufev);
+void _bufferevent_run_writecb(struct bufferevent *bufev);
+void _bufferevent_run_errorcb(struct bufferevent *bufev, short what);
+
 #define BEV_UPCAST(b) EVUTIL_UPCAST((b), struct bufferevent_private, bev)
 
 #define BEV_LOCK(b) do {                                               \
index 7b6c94f2458164404e618f17a08a363fa8f08708..7f3c240a11b3840850728d90ee03ecf6180653d5 100644 (file)
@@ -46,6 +46,7 @@
 #ifdef WIN32
 #include <winsock2.h>
 #endif
+#include <errno.h>
 
 #include "event2/util.h"
 #include "event2/bufferevent.h"
@@ -112,6 +113,87 @@ bufferevent_inbuf_wm_cb(struct evbuffer *buf,
        }
 }
 
+static void
+bufferevent_run_deferred_callbacks(struct deferred_cb *_, void *arg)
+{
+       struct bufferevent_private *bufev_private = arg;
+       struct bufferevent *bufev = &bufev_private->bev;
+
+       BEV_LOCK(bufev);
+       if (bufev_private->readcb_pending && bufev->readcb) {
+               bufev_private->readcb_pending = 0;
+               bufev->readcb(bufev, bufev->cbarg);
+       }
+       if (bufev_private->writecb_pending && bufev->writecb) {
+               bufev_private->writecb_pending = 0;
+               bufev->writecb(bufev, bufev->cbarg);
+       }
+       if (bufev_private->errorcb_pending && bufev->errorcb) {
+               short what = bufev_private->errorcb_pending;
+               int err = bufev_private->errno_pending;
+               bufev_private->errorcb_pending = 0;
+               bufev_private->errno_pending = 0;
+               EVUTIL_SET_SOCKET_ERROR(err);
+               bufev->errorcb(bufev, what, bufev->cbarg);
+       }
+       _bufferevent_decref_and_unlock(bufev);
+}
+
+void
+_bufferevent_run_readcb(struct bufferevent *bufev)
+{
+       /* Requires lock. */
+       struct bufferevent_private *p =
+           EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+       if (p->options & BEV_OPT_DEFER_CALLBACKS) {
+               p->readcb_pending = 1;
+               if (!p->deferred.queued) {
+                       bufferevent_incref(bufev);
+                       event_deferred_cb_schedule(
+                               bufev->ev_base, &p->deferred);
+               }
+       } else {
+               bufev->readcb(bufev, bufev->cbarg);
+       }
+}
+
+void
+_bufferevent_run_writecb(struct bufferevent *bufev)
+{
+       /* Requires lock. */
+       struct bufferevent_private *p =
+           EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+       if (p->options & BEV_OPT_DEFER_CALLBACKS) {
+               p->writecb_pending = 1;
+               if (!p->deferred.queued) {
+                       bufferevent_incref(bufev);
+                       event_deferred_cb_schedule(
+                               bufev->ev_base, &p->deferred);
+               }
+       } else {
+               bufev->writecb(bufev, bufev->cbarg);
+       }
+}
+
+void
+_bufferevent_run_errorcb(struct bufferevent *bufev, short what)
+{
+       /* Requires lock. */
+       struct bufferevent_private *p =
+           EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+       if (p->options & BEV_OPT_DEFER_CALLBACKS) {
+               p->errorcb_pending |= what;
+               p->errno_pending = EVUTIL_SOCKET_ERROR();
+               if (!p->deferred.queued) {
+                       bufferevent_incref(bufev);
+                       event_deferred_cb_schedule(
+                               bufev->ev_base, &p->deferred);
+               }
+       } else {
+               bufev->errorcb(bufev, what, bufev->cbarg);
+       }
+}
+
 int
 bufferevent_init_common(struct bufferevent_private *bufev_private,
     struct event_base *base,
@@ -152,6 +234,11 @@ bufferevent_init_common(struct bufferevent_private *bufev_private,
                }
        }
 #endif
+       if (options & BEV_OPT_DEFER_CALLBACKS) {
+               event_deferred_cb_init(&bufev_private->deferred,
+                   bufferevent_run_deferred_callbacks,
+                   bufev_private);
+       }
 
        bufev_private->options = options;
 
index 9208300ea01ac1fd058d5d4c47a6426dac6a3efe..5cf83f42e818e0e5f35fe311f8e40ce003d3cbcf 100644 (file)
@@ -343,7 +343,7 @@ be_filter_process_output(struct bufferevent_filtered *bevf,
                 if (processed && bufev->writecb &&
                     evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
                         /* call the write callback.*/
-                        (*bufev->writecb)(bufev, bufev->cbarg);
+                        _bufferevent_run_writecb(bufev);
 
                         if (res == BEV_OK &&
                             (bufev->enabled & EV_WRITE) &&
@@ -396,7 +396,7 @@ be_filter_readcb(struct bufferevent *underlying, void *_me)
        if (processed_any &&
             evbuffer_get_length(bufev->input) >= bufev->wm_read.low &&
             bufev->readcb != NULL)
-               (*bufev->readcb)(bufev, bufev->cbarg);
+               _bufferevent_run_readcb(bufev);
 }
 
 /* Called when the underlying socket has drained enough that we can write to
@@ -419,7 +419,7 @@ be_filter_errorcb(struct bufferevent *underlying, short what, void *_me)
 
        /* All we can really to is tell our own errorcb. */
        if (bev->errorcb)
-               bev->errorcb(bev, what, bev->cbarg);
+               _bufferevent_run_errorcb(bev, what);
 }
 
 static int
index 6080421c0cee7ba233a0c4df0b851637faff660b..068a80a84d1da39ec328a4244ab2cd99b6dc9d32 100644 (file)
@@ -44,8 +44,6 @@
 struct bufferevent_pair {
        struct bufferevent_private bev;
        struct bufferevent_pair *partner;
-       struct deferred_cb deferred_write_cb;
-       struct deferred_cb deferred_read_cb;
 };
 
 
@@ -69,25 +67,6 @@ upcast(struct bufferevent *bev)
 static void be_pair_outbuf_cb(struct evbuffer *,
     const struct evbuffer_cb_info *, void *);
 
-static void
-run_callback(struct deferred_cb *cb, void *arg)
-{
-       struct bufferevent_pair *bufev = arg;
-       struct bufferevent *bev = downcast(bufev);
-
-       BEV_LOCK(bev);
-       if (cb == &bufev->deferred_read_cb) {
-               if (bev->readcb) {
-                       bev->readcb(bev, bev->cbarg);
-               }
-       } else {
-               if (bev->writecb) {
-                       bev->writecb(bev, bev->cbarg);
-               }
-       }
-       BEV_UNLOCK(bev);
-}
-
 static struct bufferevent_pair *
 bufferevent_pair_elt_new(struct event_base *base,
     enum bufferevent_options options)
@@ -106,8 +85,6 @@ bufferevent_pair_elt_new(struct event_base *base,
                bufferevent_free(downcast(bufev));
                return NULL;
        }
-       event_deferred_cb_init(&bufev->deferred_read_cb, run_callback, bufev);
-       event_deferred_cb_init(&bufev->deferred_write_cb, run_callback, bufev);
 
        return bufev;
 }
@@ -117,7 +94,10 @@ bufferevent_pair_new(struct event_base *base, enum bufferevent_options options,
     struct bufferevent *pair[2])
 {
         struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
-       enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE;
+       enum bufferevent_options tmp_options;
+
+       options |= BEV_OPT_DEFER_CALLBACKS;
+       tmp_options = options & ~BEV_OPT_THREADSAFE;
 
        bufev1 = bufferevent_pair_elt_new(base, options);
        if (!bufev1)
@@ -175,12 +155,10 @@ be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
        dst_size = evbuffer_get_length(dst->input);
 
        if (dst_size >= dst->wm_read.low && dst->readcb) {
-               event_deferred_cb_schedule(dst->ev_base,
-                   &(upcast(dst)->deferred_read_cb));
+               _bufferevent_run_readcb(dst);
        }
        if (src_size <= src->wm_write.low && src->writecb) {
-               event_deferred_cb_schedule(src->ev_base,
-                   &(upcast(src)->deferred_write_cb));
+               _bufferevent_run_writecb(src);
        }
 done:
        evbuffer_freeze(src->output, 1);
@@ -247,8 +225,6 @@ be_pair_destruct(struct bufferevent *bev)
                bev_p->partner->partner = NULL;
                bev_p->partner = NULL;
        }
-       event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_write_cb);
-       event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_read_cb);
 }
 
 static void
index 614e0e56a752e82adeda867471b3b33aaadead35..634da83a5e1642d514e24f925e1c0f712307fe4a 100644 (file)
@@ -156,7 +156,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
        /* Invoke the user callback - must always be called last */
        if (evbuffer_get_length(input) >= bufev->wm_read.low &&
             bufev->readcb != NULL)
-               (*bufev->readcb)(bufev, bufev->cbarg);
+               _bufferevent_run_readcb(bufev);
 
        return;
 
@@ -165,8 +165,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
 
  error:
        event_del(&bufev->ev_read);
-       (*bufev->errorcb)(bufev, what, bufev->cbarg);
-
+       _bufferevent_run_errorcb(bufev, what);
 }
 
 static void
@@ -207,7 +206,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
         */
        if (bufev->writecb != NULL &&
            evbuffer_get_length(bufev->output) <= bufev->wm_write.low)
-               (*bufev->writecb)(bufev, bufev->cbarg);
+               _bufferevent_run_writecb(bufev);
 
        return;
 
@@ -218,7 +217,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
 
  error:
        event_del(&bufev->ev_write);
-       (*bufev->errorcb)(bufev, what, bufev->cbarg);
+       _bufferevent_run_errorcb(bufev, what);
 }
 
 struct bufferevent *
index 6f218465469761ba07a57636028fc47c6c5ff33e..94cebe2761e5a4a1f764efe22e347c50614fbc55 100644 (file)
@@ -124,6 +124,9 @@ enum bufferevent_options {
        /** If set, and threading is enabled, operations on this bufferevent
         * are protected by a lock */
        BEV_OPT_THREADSAFE = (1<<1),
+
+       /** If set, callbacks are run deferred in the event loop. */
+       BEV_OPT_DEFER_CALLBACKS = (1<<2),
 };
 
 /**