]> granicus.if.org Git - libevent/commitdiff
Refactor our 'suspend operation' logic on bufferevents.
authorNick Mathewson <nickm@torproject.org>
Mon, 23 Nov 2009 23:34:32 +0000 (18:34 -0500)
committerNick Mathewson <nickm@torproject.org>
Fri, 4 Dec 2009 19:15:17 +0000 (14:15 -0500)
There are lots of things we do internally in bufferevents to indicate
"the user would like this operation to happen, but we aren't going to
try until some other condition goes away."  Our logic here has gotten
entirely too complicated.

This patch tries to fix that by adding the idea of 'suspend flags' for
read and write.  To say "don't bother reading or writing until
condition X no longer holds," bufferevent_suspend_read/write(bev,
BEV_SUSPEND_X).  When X no longer holds, call
bufferevent_unsuspend_read/write(bev, BEV_SUSPEND_X).

Right now, only the read-watermark logic uses this.

bufferevent-internal.h
bufferevent.c

index 937086ea6205da2144bdb288afe7347ba34877a1..a5ce7cb70d1b9cd5a1432a310227787f06bceb3c 100644 (file)
@@ -36,6 +36,28 @@ extern "C" {
 #include "evthread-internal.h"
 #include "event2/thread.h"
 
+/* These flags are reasons that we might be declining to actually enable
+   reading or writing on a bufferevent.
+ */
+
+/* On a all bufferevents, for reading: used when we have read up to the
+   watermark value.
+
+   On a filtering bufferxevent, for writing: used when the underlying
+   bufferevent's write buffer has been filled up to its watermark
+   value.
+*/
+#define BEV_SUSPEND_WM 0x01
+/* On a base bufferevent: when we have used up our bandwidth buckets. */
+#define BEV_SUSPEND_BW 0x02
+
+struct token_bucket {
+       uint32_t limit;
+       uint32_t rate;
+       uint32_t burst;
+       unsigned last_updated;
+};
+
 /** Parts of the bufferevent structure that are shared among all bufferevent
  * types, but not exposed in bufferevent_struct.h. */
 struct bufferevent_private {
@@ -45,8 +67,6 @@ struct bufferevent_private {
        /** Evbuffer callback to enforce watermarks on input. */
        struct evbuffer_cb_entry *read_watermarks_cb;
 
-       /** If set, read is suspended until evbuffer some. */
-       unsigned read_suspended : 1;
        /** If set, we should free the lock when we free the bufferevent. */
        unsigned own_lock : 1;
 
@@ -61,9 +81,21 @@ struct bufferevent_private {
        /** Set to the events pending if we have deferred callbacks and
         * an events callback is pending. */
        short eventcb_pending;
+
+       /** If set, read is suspended until one or more conditions are over.
+        * The actual value here is a bitfield of those conditions; see the
+        * BEV_SUSPEND_* flags above. */
+       short read_suspended;
+
+       /** If set, writing is suspended until one or more conditions are over.
+        * The actual value here is a bitfield of those conditions; see the
+        * BEV_SUSPEND_* flags above. */
+       short write_suspended;
+
        /** Set to the current socket errno if we have deferred callbacks and
         * an events callback is pending. */
        int errno_pending;
+
        /** Used to implement deferred callbacks */
        struct deferred_cb deferred;
 
@@ -155,12 +187,24 @@ extern const struct bufferevent_ops bufferevent_ops_async;
 /** Initialize the shared parts of a bufferevent. */
 int bufferevent_init_common(struct bufferevent_private *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options);
 
-/** For internal use: temporarily stop all reads on bufev, because its
- * read buffer is too full. */
-void bufferevent_wm_suspend_read(struct bufferevent *bufev);
-/** For internal use: temporarily stop all reads on bufev, because its
- * read buffer is too full. */
-void bufferevent_wm_unsuspend_read(struct bufferevent *bufev);
+/** For internal use: temporarily stop all reads on bufev, until the conditions
+ * in 'what' are over. */
+void bufferevent_suspend_read(struct bufferevent *bufev, short what);
+/** For internal use: clear the conditions 'what' on bufev, and re-enable
+ * reading if there are no conditions left. */
+void bufferevent_unsuspend_read(struct bufferevent *bufev, short what);
+
+/** For internal use: temporarily stop all writes on bufev, until the conditions
+ * in 'what' are over. */
+void bufferevent_suspend_write(struct bufferevent *bufev, short what);
+/** For internal use: clear the conditions 'what' on bufev, and re-enable
+ * writing if there are no conditions left. */
+void bufferevent_unsuspend_write(struct bufferevent *bufev, short what);
+
+#define bufferevent_wm_suspend_read(b) \
+       bufferevent_suspend_read((b), BEV_SUSPEND_WM)
+#define bufferevent_wm_unsuspend_read(b) \
+       bufferevent_unsuspend_read((b), BEV_SUSPEND_WM)
 
 /** Internal: Set up locking on a bufferevent.  If lock is set, use it.
  * Otherwise, use a new lock. */
index 2794cff257400ed083f6b686a8174597090c3020..012950b25fd6d7dc21ceb8e2b099024af2ce9e73 100644 (file)
 #include "util-internal.h"
 
 void
-bufferevent_wm_suspend_read(struct bufferevent *bufev)
+bufferevent_suspend_read(struct bufferevent *bufev, short what)
 {
        struct bufferevent_private *bufev_private =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        BEV_LOCK(bufev);
-       if (!bufev_private->read_suspended) {
+       if (!bufev_private->read_suspended)
                bufev->be_ops->disable(bufev, EV_READ);
-               bufev_private->read_suspended = 1;
-       }
+       bufev_private->read_suspended |= what;
        BEV_UNLOCK(bufev);
 }
 
 void
-bufferevent_wm_unsuspend_read(struct bufferevent *bufev)
+bufferevent_unsuspend_read(struct bufferevent *bufev, short what)
 {
        struct bufferevent_private *bufev_private =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+       BEV_LOCK(bufev);
+       bufev_private->read_suspended &= ~what;
+       if (!bufev_private->read_suspended)
+               bufev->be_ops->enable(bufev, EV_READ);
+       BEV_UNLOCK(bufev);
+}
 
+void
+bufferevent_suspend_write(struct bufferevent *bufev, short what)
+{
+       struct bufferevent_private *bufev_private =
+           EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        BEV_LOCK(bufev);
-       if (bufev_private->read_suspended) {
-               bufev_private->read_suspended = 0;
-               if (bufev->enabled & EV_READ)
-                       bufev->be_ops->enable(bufev, EV_READ);
-       }
+       if (!bufev_private->write_suspended)
+               bufev->be_ops->disable(bufev, EV_WRITE);
+       bufev_private->write_suspended |= what;
        BEV_UNLOCK(bufev);
 }
 
+void
+bufferevent_unsuspend_write(struct bufferevent *bufev, short what)
+{
+       struct bufferevent_private *bufev_private =
+           EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+       BEV_LOCK(bufev);
+       bufev_private->write_suspended &= ~what;
+       if (!bufev_private->write_suspended)
+               bufev->be_ops->enable(bufev, EV_WRITE);
+       BEV_UNLOCK(bufev);
+}
+
+
 /* Callback to implement watermarks on the input buffer.  Only enabled
  * if the watermark is set. */
 static void
@@ -338,10 +359,12 @@ bufferevent_enable(struct bufferevent *bufev, short event)
        _bufferevent_incref_and_lock(bufev);
        if (bufev_private->read_suspended)
                impl_events &= ~EV_READ;
+       if (bufev_private->write_suspended)
+               impl_events &= ~EV_WRITE;
 
        bufev->enabled |= event;
 
-       if (bufev->be_ops->enable(bufev, impl_events) < 0)
+       if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
                r = -1;
 
        _bufferevent_decref_and_unlock(bufev);