]> granicus.if.org Git - libevent/commitdiff
Functions to view and manipulate rate-limiting buckets.
authorNick Mathewson <nickm@torproject.org>
Wed, 3 Feb 2010 20:12:04 +0000 (15:12 -0500)
committerNick Mathewson <nickm@torproject.org>
Wed, 3 Feb 2010 20:12:04 +0000 (15:12 -0500)
We need these for Tor, and other projects probably need them too.  Uses
include:
    - Checking whether bandwidth is mostly-used, and only taking some
      actions when there's plenty of bandwidth.
    - Deducting some non-bufferevent activities from a rate-limit group.

bufferevent_ratelim.c
include/event2/bufferevent.h

index bbbadcad713e31c2f87096d6226be492bb0691ff..f2438202ea023708fde81b46a176f194255f981b 100644 (file)
@@ -108,6 +108,19 @@ ev_token_bucket_update(struct ev_token_bucket *bucket,
        return 1;
 }
 
+static inline void
+bufferevent_update_buckets(struct bufferevent_private *bev)
+{
+       /* Must hold lock on bev. */
+       struct timeval now;
+       unsigned tick;
+       event_base_gettimeofday_cached(bev->bev.ev_base, &now);
+       tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
+       if (tick != bev->rate_limiting->limit.last_updated)
+               ev_token_bucket_update(&bev->rate_limiting->limit,
+                   bev->rate_limiting->cfg, tick);
+}
+
 ev_uint32_t
 ev_token_bucket_get_tick(const struct timeval *tv,
     const struct ev_token_bucket_cfg *cfg)
@@ -179,7 +192,6 @@ _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
 {
        /* needs lock on bev. */
        int max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;
-       struct timeval now;
 
 #define LIM(x)                                         \
        (is_write ? (x).write_limit : (x).read_limit)
@@ -203,12 +215,7 @@ _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
         */
 
        if (bev->rate_limiting->cfg) {
-               unsigned tick;
-
-               event_base_gettimeofday_cached(bev->bev.ev_base, &now);
-               tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
-               ev_token_bucket_update(&bev->rate_limiting->limit,
-                   bev->rate_limiting->cfg, tick);
+               bufferevent_update_buckets(bev);
                max_so_far = LIM(bev->rate_limiting->limit);
        }
        if (bev->rate_limiting->group) {
@@ -451,6 +458,44 @@ _bev_group_random_element(struct bufferevent_rate_limit_group *group)
                }                                                       \
        } while (0)
 
+static void
+_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
+{
+       int again = 0;
+       struct bufferevent_private *bev, *first;
+
+       g->read_suspended = 0;
+       FOREACH_RANDOM_ORDER({
+               if (EVLOCK_TRY_LOCK(bev->lock)) {
+                       bufferevent_unsuspend_read(&bev->bev,
+                           BEV_SUSPEND_BW_GROUP);
+                       EVLOCK_UNLOCK(bev->lock, 0);
+               } else {
+                       again = 1;
+               }
+       });
+       g->pending_unsuspend_read = again;
+}
+
+static void
+_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
+{
+       int again = 0;
+       struct bufferevent_private *bev, *first;
+       g->write_suspended = 0;
+
+       FOREACH_RANDOM_ORDER({
+               if (EVLOCK_TRY_LOCK(bev->lock)) {
+                       bufferevent_unsuspend_write(&bev->bev,
+                           BEV_SUSPEND_BW_GROUP);
+                       EVLOCK_UNLOCK(bev->lock, 0);
+               } else {
+                       again = 1;
+               }
+       });
+       g->pending_unsuspend_write = again;
+}
+
 /** Callback invoked every tick to add more elements to the group bucket
     and unsuspend group members as needed.
  */
@@ -460,8 +505,6 @@ _bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
        struct bufferevent_rate_limit_group *g = arg;
        unsigned tick;
        struct timeval now;
-       int again = 0;
-       struct bufferevent_private *bev, *first;
 
        event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
 
@@ -471,33 +514,11 @@ _bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
 
        if (g->pending_unsuspend_read ||
            (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
-               g->read_suspended = 0;
-               again = 0;
-               FOREACH_RANDOM_ORDER({
-                       if (EVLOCK_TRY_LOCK(bev->lock)) {
-                               bufferevent_unsuspend_read(&bev->bev,
-                                   BEV_SUSPEND_BW_GROUP);
-                               EVLOCK_UNLOCK(bev->lock, 0);
-                       } else {
-                               again = 1;
-                       }
-               });
-               g->pending_unsuspend_read = again;
+               _bev_group_unsuspend_reading(g);
        }
        if (g->pending_unsuspend_write ||
            (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
-               g->write_suspended = 0;
-               again = 0;
-               FOREACH_RANDOM_ORDER({
-                       if (EVLOCK_TRY_LOCK(bev->lock)) {
-                               bufferevent_unsuspend_write(&bev->bev,
-                                   BEV_SUSPEND_BW_GROUP);
-                               EVLOCK_UNLOCK(bev->lock, 0);
-                       } else {
-                               again = 1;
-                       }
-               });
-               g->pending_unsuspend_write = again;
+               _bev_group_unsuspend_writing(g);
        }
 
        /* XXXX Rather than waiting to the next tick to unsuspend stuff
@@ -660,3 +681,169 @@ bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
        BEV_UNLOCK(bev);
        return 0;
 }
+
+/* ===
+ * API functions to expose rate limits.
+ *
+ * Don't use these from inside Libevent; they're meant to be for use by
+ * the program.
+ * === */
+
+/* Mostly you don't want to use this function from inside libevent;
+ * _bufferevent_get_read_max() is more likely what you want*/
+ev_ssize_t
+bufferevent_get_read_limit(struct bufferevent *bev)
+{
+       ev_ssize_t r;
+       struct bufferevent_private *bevp;
+       BEV_LOCK(bev);
+       bevp = BEV_UPCAST(bev);
+       if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
+               bufferevent_update_buckets(bevp);
+               r = bevp->rate_limiting->limit.read_limit;
+       } else {
+               r = EV_SSIZE_MAX;
+       }
+       BEV_UNLOCK(bev);
+       return r;
+}
+
+/* Mostly you don't want to use this function from inside libevent;
+ * _bufferevent_get_write_max() is more likely what you want*/
+ev_ssize_t
+bufferevent_get_write_limit(struct bufferevent *bev)
+{
+       ev_ssize_t r;
+       struct bufferevent_private *bevp;
+       BEV_LOCK(bev);
+       bevp = BEV_UPCAST(bev);
+       if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
+               bufferevent_update_buckets(bevp);
+               r = bevp->rate_limiting->limit.write_limit;
+       } else {
+               r = EV_SSIZE_MAX;
+       }
+       BEV_UNLOCK(bev);
+       return r;
+}
+
+/* Mostly you don't want to use this function from inside libevent;
+ * _bufferevent_get_read_max() is more likely what you want*/
+ev_ssize_t
+bufferevent_rate_limit_group_get_read_limit(
+       struct bufferevent_rate_limit_group *grp)
+{
+       ev_ssize_t r;
+       LOCK_GROUP(grp);
+       r = grp->rate_limit.read_limit;
+       UNLOCK_GROUP(grp);
+       return r;
+}
+
+/* Mostly you don't want to use this function from inside libevent;
+ * _bufferevent_get_write_max() is more likely what you want. */
+ev_ssize_t
+bufferevent_rate_limit_group_get_write_limit(
+       struct bufferevent_rate_limit_group *grp)
+{
+       ev_ssize_t r;
+       LOCK_GROUP(grp);
+       r = grp->rate_limit.write_limit;
+       UNLOCK_GROUP(grp);
+       return r;
+}
+
+int
+bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
+{
+       int r = 0;
+       ev_int32_t old_limit, new_limit;
+       struct bufferevent_private *bevp;
+       BEV_LOCK(bev);
+       bevp = BEV_UPCAST(bev);
+       EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
+       old_limit = bevp->rate_limiting->limit.read_limit;
+
+       new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
+       if (old_limit > 0 && new_limit <= 0) {
+               bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
+               if (event_add(&bevp->rate_limiting->refill_bucket_event,
+                       &bevp->rate_limiting->cfg->tick_timeout) < 0)
+                       r = -1;
+       } else if (old_limit <= 0 && new_limit > 0) {
+               event_del(&bevp->rate_limiting->refill_bucket_event);
+               bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
+       }
+
+       BEV_UNLOCK(bev);
+       return r;
+}
+
+int
+bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
+{
+       /* XXXX this is mostly copy-and-paste from
+        * bufferevent_decrement_read_limit */
+       int r = 0;
+       ev_int32_t old_limit, new_limit;
+       struct bufferevent_private *bevp;
+       BEV_LOCK(bev);
+       bevp = BEV_UPCAST(bev);
+       EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
+       old_limit = bevp->rate_limiting->limit.write_limit;
+
+       new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
+       if (old_limit > 0 && new_limit <= 0) {
+               bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
+               if (event_add(&bevp->rate_limiting->refill_bucket_event,
+                       &bevp->rate_limiting->cfg->tick_timeout) < 0)
+                       r = -1;
+       } else if (old_limit <= 0 && new_limit > 0) {
+               event_del(&bevp->rate_limiting->refill_bucket_event);
+               bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
+       }
+
+       BEV_UNLOCK(bev);
+       return r;
+}
+
+int
+bufferevent_rate_limit_group_decrement_read(
+       struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
+{
+       int r = 0;
+       ev_int32_t old_limit, new_limit;
+       LOCK_GROUP(grp);
+       old_limit = grp->rate_limit.read_limit;
+       new_limit = (grp->rate_limit.read_limit -= decr);
+
+       if (old_limit > 0 && new_limit <= 0) {
+               _bev_group_suspend_reading(grp);
+       } else if (old_limit <= 0 && new_limit > 0) {
+               _bev_group_unsuspend_reading(grp);
+       }
+
+       UNLOCK_GROUP(grp);
+       return r;
+}
+
+int
+bufferevent_rate_limit_group_decrement_write(
+       struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
+{
+       int r = 0;
+       ev_int32_t old_limit, new_limit;
+       LOCK_GROUP(grp);
+       old_limit = grp->rate_limit.write_limit;
+       new_limit = (grp->rate_limit.write_limit -= decr);
+
+       if (old_limit > 0 && new_limit <= 0) {
+               _bev_group_suspend_writing(grp);
+       } else if (old_limit <= 0 && new_limit > 0) {
+               _bev_group_unsuspend_writing(grp);
+       }
+
+       UNLOCK_GROUP(grp);
+       return r;
+}
+
index 795607b0f0715ef22f51c699fcdd1191f04e9706..18f5d37346c73b8a54a3e16b79477d40029a3633 100644 (file)
@@ -492,12 +492,12 @@ int
 bufferevent_pair_new(struct event_base *base, int options,
     struct bufferevent *pair[2]);
 
-
 /**
    Abstract type used to configure rate-limiting on a bufferevent or a group
    of bufferevents.
  */
 struct ev_token_bucket_cfg;
+
 /**
    A group of bufferevents which are configured to respect the same rate
    limit.
@@ -545,6 +545,7 @@ void ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg);
  */
 int bufferevent_set_rate_limit(struct bufferevent *bev,
     struct ev_token_bucket_cfg *cfg);
+
 /**
    Create a new rate-limit group for bufferevents.  A rate-limit group
    constrains the maximum number of bytes sent and received, in toto,
@@ -584,6 +585,62 @@ int bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
 /** Remove 'bev' from its current rate-limit group (if any). */
 int bufferevent_remove_from_rate_limit_group(struct bufferevent *bev);
 
+/*@{*/
+/**
+   Return the current read or write bucket size for a bufferevent.
+   If it is not configured with a per-bufferevent ratelimit, return
+   EV_SSIZE_MAX.  This function does not inspect the group limit, if any.
+   Note that it can return a negative value if the bufferevent has been
+   made to read or write more than its limit.
+ */
+ev_ssize_t bufferevent_get_read_limit(struct bufferevent *bev);
+ev_ssize_t bufferevent_get_write_limit(struct bufferevent *bev);
+/*@}*/
+
+/*@{*/
+/**
+   Return the read or write bucket size for a bufferevent rate limit
+   group.  Note that it can return a negative value if bufferevents in
+   the group have been made to read or write more than their limits.
+ */
+ev_ssize_t bufferevent_rate_limit_group_get_read_limit(
+       struct bufferevent_rate_limit_group *);
+ev_ssize_t bufferevent_rate_limit_group_get_write_limit(
+       struct bufferevent_rate_limit_group *);
+/*@}*/
+
+/*@{*/
+/**
+   Subtract a number of bytes from a bufferevent's read or write bucket.
+   The decrement value can be negative, if you want to manually refill
+   the bucket.  If the change puts the bucket above or below zero, the
+   bufferevent will resume or suspend reading writing as appropriate.
+   These functions make no change in the buckets for the bufferevent's
+   group, if any.
+
+   Returns 0 on success, -1 on internal error.
+ */
+int bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr);
+int bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr);
+/*@}*/
+
+/*@{*/
+/**
+   Subtract a number of bytes from a bufferevent rate-limiting group's
+   read or write bucket.  The decrement value can be negative, if you
+   want to manually refill the bucket.  If the change puts the bucket
+   above or below zero, the bufferevents in the group will resume or
+   suspend reading writing as appropriate.
+
+   Returns 0 on success, -1 on internal error.
+ */
+int bufferevent_rate_limit_group_decrement_read(
+       struct bufferevent_rate_limit_group *, ev_ssize_t);
+int bufferevent_rate_limit_group_decrement_write(
+       struct bufferevent_rate_limit_group *, ev_ssize_t);
+/*@}*/
+
+
 #ifdef __cplusplus
 }
 #endif