]> granicus.if.org Git - libevent/commitdiff
Rate-limiting for bufferevents; group and individual limits are supported.
author Nick Mathewson <nickm@torproject.org>
Fri, 27 Nov 2009 18:16:54 +0000 (13:16 -0500)
committer Nick Mathewson <nickm@torproject.org>
Mon, 28 Dec 2009 21:11:18 +0000 (16:11 -0500)
The fairness algorithms are not the best, not every bufferevent type
is supported, and some of the locking tricks here are simply absurd.
Still, this code should be a good first step.

Makefile.am
bufferevent-internal.h
bufferevent.c
bufferevent_async.c
bufferevent_openssl.c
bufferevent_ratelim.c [new file with mode: 0644]
bufferevent_sock.c
include/event2/bufferevent.h
include/event2/util.h
ratelim-internal.h [new file with mode: 0644]

index 08fb0fdd5688bff826b420df1922920ee58bd22d..52b15023d541bdd9f441e7c16fd99b7742de8e42 100644 (file)
@@ -105,7 +105,7 @@ event-config.h: config.h
 
 CORE_SRC = event.c evthread.c buffer.c \
        bufferevent.c bufferevent_sock.c bufferevent_filter.c \
-       bufferevent_pair.c listener.c \
+       bufferevent_pair.c listener.c bufferevent_ratelim.c \
        evmap.c log.c evutil.c strlcpy.c $(SYS_SRC)
 EXTRA_SRC = event_tagging.c http.c evdns.c evrpc.c
 
@@ -136,7 +136,8 @@ noinst_HEADERS = util-internal.h mm-internal.h ipv6-internal.h \
        evrpc-internal.h strlcpy-internal.h evbuffer-internal.h \
        bufferevent-internal.h http-internal.h event-internal.h \
        evthread-internal.h ht-internal.h defer-internal.h \
-       minheap-internal.h log-internal.h evsignal-internal.h evmap-internal.h
+       minheap-internal.h log-internal.h evsignal-internal.h evmap-internal.h \
+       ratelim-internal.h
 
 include_HEADERS = event.h evhttp.h evdns.h evrpc.h evutil.h
 
index c3c33ecacea7fa134dfda61dc62de468d584fd7a..220cdeb72ef69fa768826dddb4a4a1c777b31821 100644 (file)
@@ -35,6 +35,7 @@ extern "C" {
 #include "defer-internal.h"
 #include "evthread-internal.h"
 #include "event2/thread.h"
+#include "ratelim-internal.h"
 
 /* These flags are reasons that we might be declining to actually enable
    reading or writing on a bufferevent.
@@ -43,22 +44,77 @@ extern "C" {
 /* On a all bufferevents, for reading: used when we have read up to the
    watermark value.
 
-   On a filtering bufferxevent, for writing: used when the underlying
+   On a filtering bufferevent, for writing: used when the underlying
    bufferevent's write buffer has been filled up to its watermark
    value.
 */
 #define BEV_SUSPEND_WM 0x01
-/* On a base bufferevent: when we have used up our bandwidth buckets. */
+/* On a base bufferevent: when we have emptied one of its bandwidth buckets. */
 #define BEV_SUSPEND_BW 0x02
+/* On a base bufferevent: when we have emptied the group's bandwidth bucket. */
+#define BEV_SUSPEND_BW_GROUP 0x04
 /* On a socket bufferevent: we aren't going to try reading until the
  * connect operation is done. */
-#define BEV_SUSPEND_CONNECTING 0x04
+#define BEV_SUSPEND_CONNECTING 0x08
+
+struct bufferevent_rate_limit_group {
+       /** List of all members in the group */
+       TAILQ_HEAD(rlim_group_member_list, bufferevent_private) members;
+       /** Current limits for the group (rate_limit), and the configuration
+        * that those limits are refilled from (rate_limit_cfg). */
+       struct ev_token_bucket rate_limit;
+       struct ev_token_bucket_cfg rate_limit_cfg;
+
+       /** True iff we don't want to read from any member of the group until
+        * the token bucket refills.  */
+       unsigned read_suspended : 1;
+       /** True iff we don't want to write to any member of the group until
+        * the token bucket refills.  */
+       unsigned write_suspended : 1;
+       /** True iff we were unable to unsuspend one of the bufferevents in
+        * the group for reading the last time we tried (its lock was busy),
+        * and we should try again. */
+       unsigned pending_unsuspend_read : 1;
+       /** True iff we were unable to unsuspend one of the bufferevents in
+        * the group for writing the last time we tried (its lock was busy),
+        * and we should try again. */
+       unsigned pending_unsuspend_write : 1;
+
+       /** The number of bufferevents in the group. */
+       int n_members;
+
+       /** The smallest number of bytes that any member of the group should
+        * be limited to read or write at a time. */
+       ev_uint32_t min_share;
+       /** Timeout event that goes off once a tick, when the bucket is ready
+        * to refill. */
+       struct event master_refill_event;
+       /** Lock to protect the members of this group.  This lock should nest
+        * within every bufferevent lock: if you are holding this lock, do
+        * not assume you can lock another bufferevent. */
+       void *lock;
+};
 
-struct token_bucket {
-       ev_uint32_t limit;
-       ev_uint32_t rate;
-       ev_uint32_t burst;
-       unsigned last_updated;
+/** Fields for rate-limiting a single bufferevent. */
+struct bufferevent_rate_limit {
+       /* Linked-list elements for storing this bufferevent_private in a
+        * group.
+        *
+        * Note that this field is supposed to be protected by the group
+        * lock, not by the bufferevent's own lock. */
+       TAILQ_ENTRY(bufferevent_private) next_in_group;
+       /** The rate-limiting group for this bufferevent, or NULL if it is
+        * only rate-limited on its own. */
+       struct bufferevent_rate_limit_group *group;
+
+       /* This bufferevent's current limits. */
+       struct ev_token_bucket limit;
+       /* Pointer to the rate-limit configuration for this bufferevent, or
+        * NULL if it has no limit of its own.
+        * Can be shared.  XXX reference-count this? */
+       struct ev_token_bucket_cfg *cfg;
+
+       /* Timeout event used when one of this bufferevent's buckets is
+        * empty; it is scheduled for one tick after the bucket runs out. */
+       struct event refill_bucket_event;
 };
 
 /** Parts of the bufferevent structure that are shared among all bufferevent
@@ -111,6 +167,9 @@ struct bufferevent_private {
        /** Lock for this bufferevent.  Shared by the inbuf and the outbuf.
         * If NULL, locking is disabled. */
        void *lock;
+
+       /** Rate-limiting information for this bufferevent */
+       struct bufferevent_rate_limit *rate_limiting;
 };
 
 /** Possible operations for a control callback. */
@@ -170,6 +229,7 @@ struct bufferevent_ops {
 
        /** Called to access miscellaneous fields. */
        int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
+
 };
 
 extern const struct bufferevent_ops bufferevent_ops_socket;
@@ -287,6 +347,15 @@ void _bufferevent_generic_adj_timeouts(struct bufferevent *bev);
                        EVLOCK_UNLOCK(locking->lock, 0);                \
        } while(0)
 
+/* ==== For rate-limiting.
+ *
+ * _bufferevent_decrement_{read,write}_buckets(): charge 'bytes' against the
+ *     bufferevent's own bucket and against its group's bucket, if it has
+ *     them, suspending I/O when a bucket runs out.
+ * _bufferevent_get_{read,write}_max(): report how many bytes the bufferevent
+ *     should currently read or write in a single operation.
+ */
+
+int _bufferevent_decrement_write_buckets(struct bufferevent_private *bev,
+    int bytes);
+int _bufferevent_decrement_read_buckets(struct bufferevent_private *bev,
+    int bytes);
+int _bufferevent_get_read_max(struct bufferevent_private *bev);
+int _bufferevent_get_write_max(struct bufferevent_private *bev);
+
 #ifdef __cplusplus
 }
 #endif
index 96b8ec7d7ad28b433d61ab31ad0073b70da0ff3c..94a007141f8f14143b0b3d5156e110895adcd234 100644 (file)
@@ -525,6 +525,15 @@ _bufferevent_decref_and_unlock(struct bufferevent *bufev)
        evbuffer_free(bufev->input);
        evbuffer_free(bufev->output);
 
+       if (bufev_private->rate_limiting) {
+               if (bufev_private->rate_limiting->group)
+                       bufferevent_remove_from_rate_limit_group(bufev);
+               if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event))
+                       event_del(&bufev_private->rate_limiting->refill_bucket_event);
+               mm_free(bufev_private->rate_limiting);
+               bufev_private->rate_limiting = NULL;
+       }
+
        BEV_UNLOCK(bufev);
        if (bufev_private->own_lock)
                EVTHREAD_FREE_LOCK(bufev_private->lock,
index a8e92b70ee60932339a686f5129d74b465766b00..2075301b763b8e25b6b4cc6b866d91ff807584f0 100644 (file)
@@ -127,6 +127,8 @@ upcast_write(struct event_overlapped *eo)
 static void
 bev_async_consider_writing(struct bufferevent_async *b)
 {
+       size_t at_most;
+       int limit;
        /* Don't write if there's a write in progress, or we do not
         * want to write. */
        if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
@@ -135,8 +137,18 @@ bev_async_consider_writing(struct bufferevent_async *b)
        if (!evbuffer_get_length(b->bev.bev.output))
                return;
 
+       at_most = evbuffer_get_length(b->bev.bev.output);
+
+       /* XXXX This over-commits. */
+       limit = _bufferevent_get_write_max(&b->bev);
+       if (at_most >= limit)
+               at_most = limit;
+
+       if (b->bev.write_suspended)
+               return;
+
        /*  XXXX doesn't respect low-water mark very well. */
-       if (evbuffer_launch_write(b->bev.bev.output, -1,
+       if (evbuffer_launch_write(b->bev.bev.output, at_most,
            &b->write_overlapped)) {
                EVUTIL_ASSERT(0);/* XXX act sensibly. */
        } else {
@@ -150,6 +162,7 @@ bev_async_consider_reading(struct bufferevent_async *b)
        size_t cur_size;
        size_t read_high;
        size_t at_most;
+       int limit;
        /* Don't read if there is a read in progress, or we do not
         * want to read. */
        if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
@@ -166,6 +179,14 @@ bev_async_consider_reading(struct bufferevent_async *b)
                at_most = 16384; /* FIXME totally magic. */
        }
 
+       /* XXXX This over-commits. */
+       limit = _bufferevent_get_read_max(&b->bev);
+       if (at_most >= limit)
+               at_most = limit;
+
+       if (b->bev.read_suspended)
+               return;
+
        if (evbuffer_launch_read(b->bev.bev.input, at_most,
            &b->read_overlapped)) {
                EVUTIL_ASSERT(0);
@@ -304,6 +325,7 @@ read_complete(struct event_overlapped *eo, uintptr_t key,
 
        if (ok && nbytes) {
                BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+               /* Charge this read against the rate limits. */
+               _bufferevent_decrement_read_buckets(&bev_a->bev, nbytes);
                if (bev->readcb != NULL &&
                    evbuffer_get_length(bev->input) >= bev->wm_read.low)
                        _bufferevent_run_readcb(bev);
@@ -337,7 +359,8 @@ write_complete(struct event_overlapped *eo, uintptr_t key,
 
        if (ok && nbytes) {
                BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
-               if (bev->writecb != NULL && 
+               /* Charge this write against the rate limits. */
+               _bufferevent_decrement_write_buckets(&bev_a->bev, nbytes);
+               if (bev->writecb != NULL &&
                    evbuffer_get_length(bev->output) <= bev->wm_write.low)
                        _bufferevent_run_writecb(bev);
                bev_async_consider_writing(bev_a);
index f121c5bedfd3d41b763be0971644113cc040f0ce..e2cb28b123cbc3670f45b283f0688f8fff49adf5 100644 (file)
@@ -524,20 +524,29 @@ do_read(struct bufferevent_openssl *bev_ssl, int n_to_read)
        /* Requires lock */
        struct bufferevent *bev = &bev_ssl->bev.bev;
        struct evbuffer *input = bev->input;
-       int r, n, i, n_used = 0, blocked = 0;
+       int r, n, i, n_used = 0, blocked = 0, atmost;
        struct evbuffer_iovec space[2];
 
+       atmost = _bufferevent_get_read_max(&bev_ssl->bev);
+       if (n_to_read > atmost)
+               n_to_read = atmost;
+
        n = evbuffer_reserve_space(input, n_to_read, space, 2);
        if (n < 0)
                return -1;
 
        for (i=0; i<n; ++i) {
+               if (bev_ssl->bev.read_suspended)
+                       break;
                r = SSL_read(bev_ssl->ssl, space[i].iov_base, space[i].iov_len);
                if (r>0) {
                        if (bev_ssl->read_blocked_on_write)
                                clear_rbow(bev_ssl);
                        ++n_used;
                        space[i].iov_len = r;
+                       /* Not exactly right; we probably want to do
+                        * our rate-limiting on the underlying bytes. */
+                       _bufferevent_decrement_read_buckets(&bev_ssl->bev, r);
                } else {
                        int err = SSL_get_error(bev_ssl->ssl, r);
                        print_err(err);
@@ -585,6 +594,8 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost)
 
        if (bev_ssl->last_write > 0)
                atmost = bev_ssl->last_write;
+       else
+               atmost = _bufferevent_get_write_max(&bev_ssl->bev);
 
        n = evbuffer_peek(output, atmost, NULL, space, 8);
        if (n < 0)
@@ -593,6 +604,9 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost)
        if (n > 8)
                n = 8;
        for (i=0; i < n; ++i) {
+               if (bev_ssl->bev.write_suspended)
+                       break;
+
                r = SSL_write(bev_ssl->ssl, space[i].iov_base,
                    space[i].iov_len);
                if (r > 0) {
@@ -600,6 +614,9 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost)
                                clear_wbor(bev_ssl);
                        n_written += r;
                        bev_ssl->last_write = -1;
+                       /* Not exactly right; we probably want to do
+                        * our rate-limiting on the underlying bytes. */
+                       _bufferevent_decrement_write_buckets(&bev_ssl->bev, r);
                } else {
                        int err = SSL_get_error(bev_ssl->ssl, r);
                        print_err(err);
diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c
new file mode 100644 (file)
index 0000000..564367c
--- /dev/null
@@ -0,0 +1,654 @@
+/*
+ * Copyright (c) 2007-2009 Niels Provos and Nick Mathewson
+ * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <sys/types.h>
+#include <limits.h>
+#include <string.h>
+#include <stdlib.h>
+
+#include "event2/event.h"
+#include "event2/event_struct.h"
+#include "event2/util.h"
+#include "event2/bufferevent.h"
+#include "event2/bufferevent_struct.h"
+#include "event2/buffer.h"
+
+#include "ratelim-internal.h"
+
+#include "bufferevent-internal.h"
+#include "mm-internal.h"
+#include "util-internal.h"
+
+/** Set up 'bucket' from 'cfg'.
+ *
+ * With reinitialize == 0 the bucket starts with one tick's worth of
+ * bandwidth and is stamped with current_tick.  With reinitialize != 0 the
+ * existing levels are only clipped down to the new maxima.  Always
+ * returns 0.
+ */
+int
+ev_token_bucket_init(struct ev_token_bucket *bucket,
+    const struct ev_token_bucket_cfg *cfg,
+    ev_uint32_t current_tick,
+    int reinitialize)
+{
+       if (!reinitialize) {
+               /* Brand-new bucket. */
+               bucket->last_updated = current_tick;
+               bucket->read_limit = cfg->read_rate;
+               bucket->write_limit = cfg->write_rate;
+               return 0;
+       }
+
+       /* Reinitializing: we have already spent an unknown amount of
+        * bandwidth during this tick, so never raise a level here.
+        * "last_updated" is left alone on purpose; the next update adds
+        * whatever bandwidth is due. */
+       if (cfg->read_maximum < bucket->read_limit)
+               bucket->read_limit = cfg->read_maximum;
+       if (cfg->write_maximum < bucket->write_limit)
+               bucket->write_limit = cfg->write_maximum;
+       return 0;
+}
+
+int
+ev_token_bucket_update(struct ev_token_bucket *bucket,
+    const struct ev_token_bucket_cfg *cfg,
+    ev_uint32_t current_tick)
+{
+       /* Refill both buckets for the ticks that have elapsed since
+        * bucket->last_updated.
+        *
+        * It's okay if the tick number overflows, since we'll just
+        * wrap around when we do the unsigned subtraction. */
+       unsigned n_ticks = current_tick - bucket->last_updated;
+
+       /* Make sure some ticks actually happened, and that time didn't
+        * roll back.  If not, return 0 and leave the bucket untouched;
+        * otherwise we refill, record current_tick, and return 1. */
+       if (n_ticks == 0 || n_ticks > INT_MAX)
+               return 0;
+
+       /* Naively, we would say
+              bucket->limit += n_ticks * cfg->rate;
+
+              if (bucket->limit > cfg->maximum)
+                  bucket->limit = cfg->maximum;
+
+          But we're worried about overflow, so we do it like this:
+          compare the remaining headroom per tick against the rate, and
+          only do the multiply when the result cannot pass the maximum.
+       */
+
+       if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
+               bucket->read_limit = cfg->read_maximum;
+       else
+               bucket->read_limit += n_ticks * cfg->read_rate;
+
+
+       if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
+               bucket->write_limit = cfg->write_maximum;
+       else
+               bucket->write_limit += n_ticks * cfg->write_rate;
+
+
+       bucket->last_updated = current_tick;
+
+       return 1;
+}
+
+ev_uint32_t
+ev_token_bucket_get_tick(const struct timeval *tv,
+    const struct ev_token_bucket_cfg *cfg)
+{
+       /* Convert the time in tv to a tick number for cfg.
+        *
+        * This computation uses one multiply and two divides.  We could do
+        * fewer if we knew that the tick length was an integer number of
+        * seconds, or if we knew it divided evenly into a second.  We should
+        * investigate that more.
+        *
+        * cfg->msec_per_tick is used as a divisor below, so it must be
+        * nonzero.
+        */
+
+       /* We cast to an ev_uint64_t first, since we don't want to overflow
+        * before we do the final divide. */
+       ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
+       return (unsigned)(msec / cfg->msec_per_tick);
+}
+
+/** Allocate a new token-bucket configuration.
+ *
+ * read_rate and write_rate are the amounts added to a bucket per tick;
+ * read_burst and write_burst are the largest levels a bucket may reach.
+ * tick_len is the length of one tick; NULL means one second.
+ *
+ * Returns NULL if a rate is zero, if a rate exceeds its burst, if the tick
+ * length is negative or shorter than one millisecond, or if allocation
+ * fails.  Release the result with ev_token_bucket_cfg_free().
+ */
+struct ev_token_bucket_cfg *
+ev_token_bucket_cfg_new(ev_uint32_t read_rate, ev_uint32_t read_burst,
+    ev_uint32_t write_rate, ev_uint32_t write_burst,
+    const struct timeval *tick_len)
+{
+       struct ev_token_bucket_cfg *r;
+       struct timeval g;
+       if (! tick_len) {
+               g.tv_sec = 1;
+               g.tv_usec = 0;
+               tick_len = &g;
+       }
+       if (read_rate > read_burst || write_rate > write_burst ||
+           read_rate < 1 || write_rate < 1)
+               return NULL;
+       /* Ticks are tracked in whole milliseconds, and msec_per_tick is
+        * used as a divisor in ev_token_bucket_get_tick().  Refuse any tick
+        * length that would make it zero (or negative) rather than divide
+        * by zero later. */
+       if (tick_len->tv_sec < 0 || tick_len->tv_usec < 0 ||
+           (tick_len->tv_sec == 0 && tick_len->tv_usec < 1000))
+               return NULL;
+       r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
+       if (!r)
+               return NULL;
+       r->read_rate = read_rate;
+       r->write_rate = write_rate;
+       r->read_maximum = read_burst;
+       r->write_maximum = write_burst;
+       memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
+       r->msec_per_tick = (tick_len->tv_sec * 1000) + tick_len->tv_usec/1000;
+       return r;
+}
+
+/** Release a configuration made by ev_token_bucket_cfg_new().
+ *
+ * The configuration is not reference-counted, so the caller must make sure
+ * that nothing still points at cfg.
+ */
+void
+ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
+{
+       mm_free(cfg);
+}
+
+/* No matter how big our bucket gets, don't try to read more than this
+ * much in a single read operation.  _bufferevent_get_rlim_max() starts
+ * from this value when computing a read limit.
+ * NOTE(review): that function then assigns the per-bufferevent bucket
+ * level over this value rather than clamping to it -- confirm that the
+ * cap is really enforced for bufferevents with their own limit. */
+#define MAX_TO_READ_EVER 16384
+/* No matter how big our bucket gets, don't try to write more than this
+ * much in a single write operation.  _bufferevent_get_rlim_max() starts
+ * from this value when computing a write limit; the same review note as
+ * for MAX_TO_READ_EVER applies. */
+#define MAX_TO_WRITE_EVER 16384
+
+#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
+#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
+
+static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
+static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
+
+/** Helper: figure out the maximum amount we should write if is_write is
+    nonzero, or the maximum amount we should read otherwise.  Return that
+    maximum, or 0 if our bucket is wholly exhausted.
+
+    As a side effect this refills bev's own bucket for the current tick,
+    and suspends bev with BEV_SUSPEND_BW_GROUP if its group is already
+    suspended in the requested direction.
+ */
+static inline int
+_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
+{
+       /* needs lock on bev. */
+       int max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;
+       struct timeval now;
+
+#define LIM(x)                                         \
+       (is_write ? (x).write_limit : (x).read_limit)
+
+#define GROUP_SUSPENDED(g)                     \
+       (is_write ? (g)->write_suspended : (g)->read_suspended)
+
+       /* Sets max_so_far to MIN(x, max_so_far).  No semicolon after the
+        * while (0): the call site supplies it. */
+#define CLAMPTO(x)                             \
+       do {                                    \
+               if (max_so_far > (x))           \
+                       max_so_far = (x);       \
+       } while (0)
+
+       if (!bev->rate_limiting)
+               return max_so_far;
+
+       /* If rate-limiting is enabled at all, update the appropriate
+          bucket, and take the smaller of our rate limit and the group
+          rate limit.
+        */
+
+       if (bev->rate_limiting->cfg) {
+               unsigned tick;
+
+               event_base_gettimeofday_cached(bev->bev.ev_base, &now);
+               tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
+               ev_token_bucket_update(&bev->rate_limiting->limit,
+                   bev->rate_limiting->cfg, tick);
+               max_so_far = LIM(bev->rate_limiting->limit);
+               /* The decrement functions let a bucket be overdrawn, but we
+                * promise to return 0 in that case, never a negative count.
+                * Clamp here, before CLAMPTO(share) below: share is
+                * unsigned, so a negative max_so_far would compare as a huge
+                * value there and be replaced by the group's share. */
+               if (max_so_far < 0)
+                       max_so_far = 0;
+       }
+       if (bev->rate_limiting->group) {
+               struct bufferevent_rate_limit_group *g =
+                   bev->rate_limiting->group;
+               ev_uint32_t share;
+               LOCK_GROUP(g);
+               if (GROUP_SUSPENDED(g)) {
+                       /* We can get here if we failed to lock this
+                        * particular bufferevent while suspending the whole
+                        * group. */
+                       if (is_write)
+                               bufferevent_suspend_write(&bev->bev,
+                                   BEV_SUSPEND_BW_GROUP);
+                       else
+                               bufferevent_suspend_read(&bev->bev,
+                                   BEV_SUSPEND_BW_GROUP);
+                       share = 0;
+               } else {
+                       /* XXXX probably we should divide among the active
+                        * members, not the total members. */
+                       share = LIM(g->rate_limit) / g->n_members;
+                       if (share < g->min_share)
+                               share = g->min_share;
+               }
+               UNLOCK_GROUP(g);
+               CLAMPTO(share);
+       }
+
+       return max_so_far;
+}
+
+/** Return the value _bufferevent_get_rlim_max() computes for bev in the
+ * read direction.  Needs the lock on bev. */
+int
+_bufferevent_get_read_max(struct bufferevent_private *bev)
+{
+       const int is_write = 0;
+
+       return _bufferevent_get_rlim_max(bev, is_write);
+}
+
+/** Return the value _bufferevent_get_rlim_max() computes for bev in the
+ * write direction.  Needs the lock on bev. */
+int
+_bufferevent_get_write_max(struct bufferevent_private *bev)
+{
+       const int is_write = 1;
+
+       return _bufferevent_get_rlim_max(bev, is_write);
+}
+
+/** Charge 'bytes' against bev's own read bucket (if it has a cfg) and
+ * against its group's read bucket (if it is in a group).  Reading is
+ * suspended wherever a bucket drops to zero or below.  Always returns 0.
+ */
+int
+_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, int bytes)
+{
+       /* need to hold lock on bev */
+       struct bufferevent_rate_limit *rlim = bev->rate_limiting;
+       struct bufferevent_rate_limit_group *grp;
+
+       if (!rlim)
+               return 0;
+
+       if (rlim->cfg) {
+               rlim->limit.read_limit -= bytes;
+               if (rlim->limit.read_limit <= 0) {
+                       /* Out of bandwidth: stop reading, and come back
+                        * after one tick to refill. */
+                       bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
+                       event_add(&rlim->refill_bucket_event,
+                           &rlim->cfg->tick_timeout);
+               }
+       }
+
+       grp = rlim->group;
+       if (grp) {
+               LOCK_GROUP(grp);
+               grp->rate_limit.read_limit -= bytes;
+               if (grp->rate_limit.read_limit <= 0)
+                       _bev_group_suspend_reading(grp);
+               UNLOCK_GROUP(grp);
+       }
+
+       return 0;
+}
+
+/** Charge 'bytes' against bev's own write bucket (if it has a cfg) and
+ * against its group's write bucket (if it is in a group).  Writing is
+ * suspended wherever a bucket drops to zero or below.  Always returns 0.
+ */
+int
+_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, int bytes)
+{
+       /* need to hold lock */
+       struct bufferevent_rate_limit *rlim = bev->rate_limiting;
+       struct bufferevent_rate_limit_group *grp;
+
+       if (!rlim)
+               return 0;
+
+       if (rlim->cfg) {
+               rlim->limit.write_limit -= bytes;
+               if (rlim->limit.write_limit <= 0) {
+                       /* Out of bandwidth: stop writing, and come back
+                        * after one tick to refill. */
+                       bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
+                       event_add(&rlim->refill_bucket_event,
+                           &rlim->cfg->tick_timeout);
+               }
+       }
+
+       grp = rlim->group;
+       if (grp) {
+               LOCK_GROUP(grp);
+               grp->rate_limit.write_limit -= bytes;
+               if (grp->rate_limit.write_limit <= 0)
+                       _bev_group_suspend_writing(grp);
+               UNLOCK_GROUP(grp);
+       }
+
+       return 0;
+}
+
+/** Stop reading on every bufferevent in <b>g</b>: mark the group as
+    read-suspended, drop any pending read unsuspend, and suspend each member
+    whose lock we can take right now.  Always returns 0. */
+static int
+_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
+{
+       /* Needs group lock */
+       struct bufferevent_private *bev;
+       g->read_suspended = 1;
+       g->pending_unsuspend_read = 0;
+
+       /* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
+          to prevent a deadlock.  (Ordinarily, the group lock nests inside
+          the bufferevent locks.)  If we are unable to lock any individual
+          bufferevent, it will find out later when it looks at its limit
+          and sees that its group is suspended.
+       */
+       TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
+               if (EVLOCK_TRY_LOCK(bev->lock)) {
+                       bufferevent_suspend_read(&bev->bev,
+                           BEV_SUSPEND_BW_GROUP);
+                       EVLOCK_UNLOCK(bev->lock, 0);
+               }
+       }
+       return 0;
+}
+
+/** Stop writing on every bufferevent in <b>g</b>: mark the group as
+    write-suspended, drop any pending write unsuspend, and suspend each
+    member whose lock we can take right now.  Always returns 0. */
+static int
+_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
+{
+       /* Needs group lock.  Members are only try-locked, never blocked on;
+        * a member that is skipped here is still covered by
+        * g->write_suspended, which was set before the loop. */
+       struct bufferevent_private *bev;
+       g->write_suspended = 1;
+       g->pending_unsuspend_write = 0;
+       TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
+               if (EVLOCK_TRY_LOCK(bev->lock)) {
+                       bufferevent_suspend_write(&bev->bev,
+                           BEV_SUSPEND_BW_GROUP);
+                       EVLOCK_UNLOCK(bev->lock, 0);
+               }
+       }
+       return 0;
+}
+
+/** Timer callback invoked on a single bufferevent with one or more exhausted
+    buckets when they are ready to refill.  'arg' is the bufferevent_private;
+    'fd' and 'what' are not used.  The bufferevent's lock is held for the
+    whole callback. */
+static void
+_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
+{
+       unsigned tick;
+       struct timeval now;
+       struct bufferevent_private *bev = arg;
+       int again = 0;
+       BEV_LOCK(&bev->bev);
+       if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
+               BEV_UNLOCK(&bev->bev);
+               return;
+       }
+
+       /* First, update the bucket, using the event base's cached time. */
+       event_base_gettimeofday_cached(bev->bev.ev_base, &now);
+       tick = ev_token_bucket_get_tick(&now,
+           bev->rate_limiting->cfg);
+       ev_token_bucket_update(&bev->rate_limiting->limit,
+           bev->rate_limiting->cfg,
+           tick);
+
+       /* Now unsuspend any read/write operations as appropriate: only the
+          directions that were suspended for BEV_SUSPEND_BW, and only if the
+          refilled bucket is now positive. */
+       if ((bev->read_suspended & BEV_SUSPEND_BW)) {
+               if (bev->rate_limiting->limit.read_limit > 0)
+                       bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
+               else
+                       again = 1;
+       }
+       if ((bev->write_suspended & BEV_SUSPEND_BW)) {
+               if (bev->rate_limiting->limit.write_limit > 0)
+                       bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
+               else
+                       again = 1;
+       }
+       if (again) {
+               /* One or more of the buckets may need another refill if they
+                  started negative, so schedule ourselves again one tick
+                  from now.
+
+                  XXXX if we need to be quiet for more ticks, we should
+                  maybe figure out what timeout we really want.
+               */
+               event_add(&bev->rate_limiting->refill_bucket_event,
+                   &bev->rate_limiting->cfg->tick_timeout);
+       }
+       BEV_UNLOCK(&bev->bev);
+}
+
+/** Helper: pick one member of a bufferevent group at random.  Returns NULL
+    if the group has no members. */
+static struct bufferevent_private *
+_bev_group_random_element(struct bufferevent_rate_limit_group *group)
+{
+       struct bufferevent_private *member;
+       int steps;
+
+       /* requires group lock */
+
+       if (group->n_members == 0)
+               return NULL;
+
+       EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));
+
+       /* Choose how many links to follow, then walk that far from the
+        * head of the member list. */
+       steps = random() % group->n_members;
+       member = TAILQ_FIRST(&group->members);
+       for (; steps > 0; --steps)
+               member = TAILQ_NEXT(member, rate_limiting->next_in_group);
+
+       return member;
+}
+
+/** Iterate over the elements of a rate-limiting group 'g' with a random
+    starting point, assigning each to the variable 'bev', and executing the
+    block 'block'.
+
+    The variables 'g', 'bev' and 'first' are not parameters: they must
+    already be declared in the enclosing scope.  The first loop runs from
+    the random element to the end of the list, and the second from the head
+    of the list up to (but not including) the random element, so 'block' is
+    expanded twice in the generated code.
+
+    We do this in a half-baked effort to get fairness among group members.
+    XXX Round-robin or some kind of priority queue would be even more fair.
+ */
+#define FOREACH_RANDOM_ORDER(block)                    \
+       do {                                            \
+               first = _bev_group_random_element(g);   \
+               for (bev = first; bev != TAILQ_END(&g->members); \
+                   bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
+                       block ;                                  \
+               }                                                \
+               for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
+                   bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
+                       block ;                                         \
+               }                                                       \
+       } while (0)
+
+/** Callback invoked every tick to add more bandwidth to the group bucket
+    and unsuspend group members as needed.  'arg' is the group; 'fd' and
+    'what' are not used.
+
+    A direction is resumed when an earlier unsuspend is still pending, or
+    when the group is suspended in that direction and the refilled bucket
+    holds at least min_share.  Members are only try-locked; if any member
+    could not be locked, the matching pending_unsuspend_* flag is set so
+    that the next run tries again.
+ */
+static void
+_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
+{
+       struct bufferevent_rate_limit_group *g = arg;
+       unsigned tick;
+       struct timeval now;
+       int again = 0;
+       struct bufferevent_private *bev, *first;
+
+       event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
+
+       LOCK_GROUP(g);
+       tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
+       ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);
+
+       if (g->pending_unsuspend_read ||
+           (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
+               g->read_suspended = 0;
+               again = 0;
+               FOREACH_RANDOM_ORDER({
+                       if (EVLOCK_TRY_LOCK(bev->lock)) {
+                               bufferevent_unsuspend_read(&bev->bev,
+                                   BEV_SUSPEND_BW_GROUP);
+                               EVLOCK_UNLOCK(bev->lock, 0);
+                       } else {
+                               again = 1;
+                       }
+               });
+               g->pending_unsuspend_read = again;
+       }
+       if (g->pending_unsuspend_write ||
+           (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
+               g->write_suspended = 0;
+               again = 0;
+               FOREACH_RANDOM_ORDER({
+                       if (EVLOCK_TRY_LOCK(bev->lock)) {
+                               bufferevent_unsuspend_write(&bev->bev,
+                                   BEV_SUSPEND_BW_GROUP);
+                               EVLOCK_UNLOCK(bev->lock, 0);
+                       } else {
+                               again = 1;
+                       }
+               });
+               g->pending_unsuspend_write = again;
+       }
+
+       /* XXXX Rather than waiting until the next tick to unsuspend stuff
+        * with pending_unsuspend_write/read, we should do it on the
+        * next iteration of the mainloop.
+        */
+
+       UNLOCK_GROUP(g);
+}
+
+/** Install 'cfg' as the individual rate limit of 'bev', or drop the
+    individual limit when 'cfg' is NULL.
+
+    Returns 0 on success, or -1 if the rate-limit state for 'bev' could not
+    be allocated.
+ */
+int
+bufferevent_set_rate_limit(struct bufferevent *bev,
+    struct ev_token_bucket_cfg *cfg)
+{
+       struct bufferevent_private *priv =
+           EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
+       struct bufferevent_rate_limit *rl;
+       struct timeval tv_now;
+       ev_uint32_t cur_tick;
+       int result = -1;
+       /* XXX reference-count cfg */
+
+       BEV_LOCK(bev);
+       rl = priv->rate_limiting;
+
+       if (!cfg) {
+               /* Removing the limit: forget the configuration and lift any
+                * suspension that the individual limit had imposed. */
+               if (rl) {
+                       rl->cfg = NULL;
+                       bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
+                       bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
+               }
+               result = 0;
+               goto done;
+       }
+
+       event_base_gettimeofday_cached(bev->ev_base, &tv_now);
+       cur_tick = ev_token_bucket_get_tick(&tv_now, cfg);
+
+       if (!rl) {
+               /* No rate-limit state yet: create it and set up the bucket
+                * for the first time. */
+               rl = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
+               if (!rl)
+                       goto done;
+               rl->cfg = cfg;
+               ev_token_bucket_init(&rl->limit, cfg, cur_tick, 0);
+               evtimer_assign(&rl->refill_bucket_event, bev->ev_base,
+                   _bev_refill_callback, priv);
+               priv->rate_limiting = rl;
+       } else if (rl->cfg != cfg) {
+               /* Switching to a different configuration: reinitialize the
+                * bucket, then (un)suspend each direction according to
+                * whether it has anything left to spend. */
+               rl->cfg = cfg;
+               ev_token_bucket_init(&rl->limit, cfg, cur_tick, 1);
+
+               if (rl->limit.read_limit > 0)
+                       bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
+               else
+                       bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
+
+               if (rl->limit.write_limit > 0)
+                       bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
+               else
+                       bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
+       }
+       /* else: 'cfg' is already installed; nothing to change. */
+
+       result = 0;
+done:
+       BEV_UNLOCK(bev);
+       return result;
+}
+
+/** Allocate a new rate-limit group whose bucket is governed by a copy of
+    'cfg' and whose periodic refill event runs on 'base'.
+
+    Returns the new group, or NULL if the group could not be allocated or
+    its refill event could not be scheduled.
+ */
+struct bufferevent_rate_limit_group *
+bufferevent_rate_limit_group_new(struct event_base *base,
+    const struct ev_token_bucket_cfg *cfg)
+{
+       struct bufferevent_rate_limit_group *g;
+       struct timeval now;
+       ev_uint32_t tick;
+
+       event_base_gettimeofday_cached(base, &now);
+       tick = ev_token_bucket_get_tick(&now, cfg);
+
+       g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
+       if (!g)
+               return NULL;
+       memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
+       TAILQ_INIT(&g->members);
+
+       ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);
+
+       g->min_share = 64;
+
+       /* Allocate the lock before arming the refill event: once the event
+        * is added, the refill callback may run (possibly from another
+        * thread's event loop) and it takes the group lock. */
+       EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
+
+       event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
+           _bev_group_refill_callback, g);
+       if (event_add(&g->master_refill_event, &cfg->tick_timeout) < 0) {
+               /* Without its refill event the group would never regain
+                * bandwidth, so report failure instead of returning it. */
+               EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
+               mm_free(g);
+               return NULL;
+       }
+
+       return g;
+}
+
+/** Make 'bev' a member of the rate-limit group 'g', first removing it from
+    any group it currently belongs to.  If 'g' is NULL, 'bev' is simply
+    removed from its current group (if any).
+
+    If the group is currently read- or write-suspended, the same suspension
+    is applied to 'bev' right away.
+
+    Returns 0 on success, or -1 if the rate-limit state for 'bev' could not
+    be allocated.
+ */
+int
+bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
+    struct bufferevent_rate_limit_group *g)
+{
+       int wsuspend, rsuspend;
+       struct bufferevent_private *bevp =
+           EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
+
+       /* The public header documents a NULL group as "remove 'bev' from its
+        * current group".  Handle that here: the code below locks and
+        * dereferences 'g', and must never see NULL. */
+       if (g == NULL)
+               return bufferevent_remove_from_rate_limit_group(bev);
+
+       BEV_LOCK(bev);
+
+       if (!bevp->rate_limiting) {
+               /* Group membership lives in the per-bufferevent rate-limit
+                * state, so create that state on demand. */
+               struct bufferevent_rate_limit *rlim;
+               rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
+               if (!rlim) {
+                       BEV_UNLOCK(bev);
+                       return -1;
+               }
+               evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
+                   _bev_refill_callback, bevp);
+               bevp->rate_limiting = rlim;
+       }
+
+       if (bevp->rate_limiting->group == g) {
+               /* Already a member of this group: nothing to do. */
+               BEV_UNLOCK(bev);
+               return 0;
+       }
+       if (bevp->rate_limiting->group)
+               bufferevent_remove_from_rate_limit_group(bev);
+
+       LOCK_GROUP(g);
+       bevp->rate_limiting->group = g;
+       ++g->n_members;
+       TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);
+
+       /* Snapshot the group's suspension state while holding its lock; the
+        * suspension itself is applied after the group lock is released. */
+       rsuspend = g->read_suspended;
+       wsuspend = g->write_suspended;
+
+       UNLOCK_GROUP(g);
+
+       if (rsuspend)
+               bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
+       if (wsuspend)
+               bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);
+
+       BEV_UNLOCK(bev);
+       return 0;
+}
+
+/** Detach 'bev' from the rate-limit group it belongs to (if any) and lift
+    any group-imposed read/write suspension.  Always returns 0.
+ */
+int
+bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
+{
+       struct bufferevent_private *priv =
+           EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
+       struct bufferevent_rate_limit *rl;
+
+       BEV_LOCK(bev);
+
+       rl = priv->rate_limiting;
+       if (rl != NULL && rl->group != NULL) {
+               struct bufferevent_rate_limit_group *grp = rl->group;
+
+               /* Unlink from the member list under the group's lock. */
+               LOCK_GROUP(grp);
+               rl->group = NULL;
+               --grp->n_members;
+               TAILQ_REMOVE(&grp->members, priv, rate_limiting->next_in_group);
+               UNLOCK_GROUP(grp);
+       }
+
+       /* Done unconditionally, whether or not 'bev' was in a group. */
+       bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
+       bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
+
+       BEV_UNLOCK(bev);
+       return 0;
+}
index 61a369f6db6a635a92de3848ee59a6738b1bb24b..f53689eefc99dc4b94549e71c2a47106c6121bfb 100644 (file)
@@ -118,10 +118,12 @@ static void
 bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
 {
        struct bufferevent *bufev = arg;
+       struct bufferevent_private *bufev_p =
+           EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        struct evbuffer *input;
        int res = 0;
        short what = BEV_EVENT_READING;
-       int howmuch = -1;
+       int howmuch = -1, readmax=-1;
 
        _bufferevent_incref_and_lock(bufev);
 
@@ -144,6 +146,12 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
                        goto done;
                }
        }
+       readmax = _bufferevent_get_read_max(bufev_p);
+       if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
+                                              * uglifies this code. */
+               howmuch = readmax;
+       if (bufev_p->read_suspended)
+               goto done;
 
        evbuffer_unfreeze(input, 0);
        res = evbuffer_read(input, fd, howmuch);
@@ -163,6 +171,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
        if (res <= 0)
                goto error;
 
+       _bufferevent_decrement_read_buckets(bufev_p, res);
 
        /* Invoke the user callback - must always be called last */
        if (evbuffer_get_length(input) >= bufev->wm_read.low &&
@@ -191,6 +200,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
        int res = 0;
        short what = BEV_EVENT_WRITING;
        int connected = 0;
+       int atmost = -1;
 
        _bufferevent_incref_and_lock(bufev);
 
@@ -232,9 +242,14 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
                }
        }
 
+       atmost = _bufferevent_get_write_max(bufev_p);
+
+       if (bufev_p->write_suspended)
+               goto done;
+
        if (evbuffer_get_length(bufev->output)) {
                evbuffer_unfreeze(bufev->output, 1);
-               res = evbuffer_write(bufev->output, fd);
+               res = evbuffer_write_atmost(bufev->output, fd, atmost);
                evbuffer_freeze(bufev->output, 1);
                if (res == -1) {
                        int err = evutil_socket_geterror(fd);
@@ -250,6 +265,8 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
                }
                if (res <= 0)
                        goto error;
+
+               _bufferevent_decrement_write_buckets(bufev_p, res);
        }
 
        if (evbuffer_get_length(bufev->output) == 0)
index 523a7fce06e239cece6cf9048496689051217c35..48d56bd6a4940b6c82fa38745159fad9624bd1ef 100644 (file)
@@ -499,6 +499,97 @@ int
 bufferevent_pair_new(struct event_base *base, int options,
     struct bufferevent *pair[2]);
 
+
+/**
+   Abstract type used to configure rate-limiting on a bufferevent or a group
+   of bufferevents.
+ */
+struct ev_token_bucket_cfg;
+/**
+   A group of bufferevents which are configured to respect the same rate
+   limit.
+*/
+struct bufferevent_rate_limit_group;
+
+/**
+   Initialize and return a new object to configure the rate-limiting behavior
+   of bufferevents.
+
+   @param read_rate The maximum number of bytes to read per tick on
+     average.
+   @param read_burst The maximum number of bytes to read in any single tick.
+   @param write_rate The maximum number of bytes to write per tick on
+     average.
+   @param write_burst The maximum number of bytes to write in any single tick.
+   @param tick_len The length of a single tick.  Defaults to one second.
+     Any fractions of a millisecond are ignored.
+
+   Note that all rate-limits here are currently best-effort: future versions
+   of Libevent may implement them more tightly.
+ */
+struct ev_token_bucket_cfg *ev_token_bucket_cfg_new(
+       ev_uint32_t read_rate, ev_uint32_t read_burst,
+       ev_uint32_t write_rate, ev_uint32_t write_burst,
+       const struct timeval *tick_len);
+
+/** Free all storage held in 'cfg'.
+
+    Note: 'cfg' is not currently reference-counted; it is not safe to free it
+    until no bufferevent is using it.
+ */
+void ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg);
+
+/**
+   Set the rate-limit of the bufferevent 'bev' to the one specified in
+   'cfg'.  If 'cfg' is NULL, disable any per-bufferevent rate-limiting on
+   'bev'.
+
+   Note that only some bufferevent types currently respect rate-limiting.
+   They are: socket-based bufferevents (normal and IOCP-based), and SSL-based
+   bufferevents.
+
+   Return 0 on success, -1 on failure.
+ */
+int bufferevent_set_rate_limit(struct bufferevent *bev,
+    struct ev_token_bucket_cfg *cfg);
+/**
+   Create a new rate-limit group for bufferevents.  A rate-limit group
+   constrains the maximum number of bytes sent and received, in toto,
+   by all of its bufferevents.
+
+   @param base An event_base to run any necessary timeouts for the group.
+      Note that all bufferevents in the group do not necessarily need to share
+      this event_base.
+   @param cfg The rate-limit for this group.
+
+   Note that all rate-limits here are currently best-effort: future versions
+   of Libevent may implement them more tightly.
+
+   Note also that only some bufferevent types currently respect rate-limiting.
+   They are: socket-based bufferevents (normal and IOCP-based), and SSL-based
+   bufferevents.
+ */
+struct bufferevent_rate_limit_group *bufferevent_rate_limit_group_new(
+       struct event_base *base,
+       const struct ev_token_bucket_cfg *cfg);
+/*XXX we need a bufferevent_rate_limit_group_set_cfg */
+
+/**
+   Add 'bev' to the list of bufferevents whose aggregate reading and writing
+   is restricted by 'g'.  If 'g' is NULL, remove 'bev' from its current group.
+
+   A bufferevent may belong to no more than one rate-limit group at a time.
+   If 'bev' is already a member of a group, it will be removed from its old
+   group before being added to 'g'.
+
+   Return 0 on success and -1 on failure.
+ */
+int bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
+    struct bufferevent_rate_limit_group *g);
+
+/** Remove 'bev' from its current rate-limit group (if any). */
+int bufferevent_remove_from_rate_limit_group(struct bufferevent *bev);
+
 #ifdef __cplusplus
 }
 #endif
index 98c26b6136497f3c59d03f55cf08489ff3e780d4..e50e434b48982c03166da46c61c37923acca35c4 100644 (file)
@@ -93,12 +93,16 @@ extern "C" {
 
 #ifdef _EVENT_HAVE_UINT32_T
 #define ev_uint32_t uint32_t
+#define ev_int32_t int32_t
 #elif defined(WIN32)
 #define ev_uint32_t unsigned int
+#define ev_int32_t signed int
 #elif _EVENT_SIZEOF_LONG == 4
 #define ev_uint32_t unsigned long
+#define ev_int32_t signed long
 #elif _EVENT_SIZEOF_INT == 4
 #define ev_uint32_t unsigned int
+#define ev_int32_t signed int
 #else
 #error "No way to define ev_uint32_t"
 #endif
diff --git a/ratelim-internal.h b/ratelim-internal.h
new file mode 100644 (file)
index 0000000..105798a
--- /dev/null
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2009 Niels Provos and Nick Mathewson
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef _RATELIM_INTERNAL_H_
+#define _RATELIM_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <event2/util.h>
+
+/** A token bucket is an internal structure that tracks how many bytes we are
+ * currently willing to read or write on a given bufferevent or group of
+ * bufferevents */
+struct ev_token_bucket {
+       /** How many bytes are we willing to read or write right now? These
+        * values are signed so that we can do "deficit spending" */
+       ev_int32_t read_limit, write_limit;
+       /** When was this bucket last updated?  Measured in abstract 'ticks'
+        * relative to the token bucket configuration. */
+       ev_uint32_t last_updated;
+};
+
+/** Configuration info for a token bucket or set of token buckets. */
+struct ev_token_bucket_cfg {
+       /** How many bytes are we willing to read on average per tick? */
+       ev_uint32_t read_rate;
+       /** How many bytes are we willing to read at most in any one tick? */
+       ev_uint32_t read_maximum;
+       /** How many bytes are we willing to write on average per tick? */
+       ev_uint32_t write_rate;
+       /** How many bytes are we willing to write at most in any one tick? */
+       ev_uint32_t write_maximum;
+
+       /** How long is a tick?  Note that fractions of a millisecond are
+        * ignored. */
+       struct timeval tick_timeout;
+
+       /** How long is a tick, in milliseconds?  Derived from tick_timeout. */
+       unsigned msec_per_tick;
+};
+
+/** The current tick is 'current_tick': add bytes to 'bucket' as specified in
+ * 'cfg'.
+ *
+ * NOTE(review): the meaning of the return value is not documented here --
+ * confirm against the implementation. */
+int ev_token_bucket_update(struct ev_token_bucket *bucket,
+    const struct ev_token_bucket_cfg *cfg,
+    ev_uint32_t current_tick);
+
+/** In which tick does 'tv' fall according to 'cfg'?  Note that ticks can
+ * overflow easily; your code needs to handle this.
+ *
+ * NOTE(review): presumably computed from cfg->msec_per_tick -- confirm
+ * against the implementation. */
+ev_uint32_t ev_token_bucket_get_tick(const struct timeval *tv,
+    const struct ev_token_bucket_cfg *cfg);
+
+/** Adjust 'bucket' to respect 'cfg', and note that it was last updated in
+ * 'current_tick'.  If 'reinitialize' is true, we are changing the
+ * configuration of 'bucket'; otherwise, we are setting it up for the first
+ * time.
+ *
+ * NOTE(review): the meaning of the return value is not documented here --
+ * confirm against the implementation.
+ */
+int ev_token_bucket_init(struct ev_token_bucket *bucket,
+    const struct ev_token_bucket_cfg *cfg,
+    ev_uint32_t current_tick,
+    int reinitialize);
+
+/** Decrease the read limit of 'b' by 'n' bytes.  Expands to a single
+ * statement; no lower bound is applied, so the limit may become negative. */
+#define ev_token_bucket_decrement_read(b,n)    \
+       do {                                    \
+               (b)->read_limit -= (n);         \
+       } while (0)
+/** Decrease the write limit of 'b' by 'n' bytes.  Expands to a single
+ * statement; no lower bound is applied, so the limit may become negative. */
+#define ev_token_bucket_decrement_write(b,n)   \
+       do {                                    \
+               (b)->write_limit -= (n);        \
+       } while (0)
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif