From: Nick Mathewson Date: Wed, 3 Feb 2010 20:12:04 +0000 (-0500) Subject: Functions to view and manipulate rate-limiting buckets. X-Git-Tag: release-2.0.4-alpha~44 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=85047a69836391bf095b98c36ec2b17876b2e325;p=libevent Functions to view and manipulate rate-limiting buckets. We need these for Tor, and other projects probably need them too. Uses include: - Checking whether bandwidth is mostly-used, and only taking some actions when there's plenty of bandwidth. - Deducting some non-bufferevent activities from a rate-limit group. --- diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c index bbbadcad..f2438202 100644 --- a/bufferevent_ratelim.c +++ b/bufferevent_ratelim.c @@ -108,6 +108,19 @@ ev_token_bucket_update(struct ev_token_bucket *bucket, return 1; } +static inline void +bufferevent_update_buckets(struct bufferevent_private *bev) +{ + /* Must hold lock on bev. */ + struct timeval now; + unsigned tick; + event_base_gettimeofday_cached(bev->bev.ev_base, &now); + tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg); + if (tick != bev->rate_limiting->limit.last_updated) + ev_token_bucket_update(&bev->rate_limiting->limit, + bev->rate_limiting->cfg, tick); +} + ev_uint32_t ev_token_bucket_get_tick(const struct timeval *tv, const struct ev_token_bucket_cfg *cfg) @@ -179,7 +192,6 @@ _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write) { /* needs lock on bev. */ int max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER; - struct timeval now; #define LIM(x) \ (is_write ? (x).write_limit : (x).read_limit) @@ -203,12 +215,7 @@ _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write) */ if (bev->rate_limiting->cfg) { - unsigned tick; - - event_base_gettimeofday_cached(bev->bev.ev_base, &now); - tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg); - ev_token_bucket_update(&bev->rate_limiting->limit, - bev->rate_limiting->cfg, tick); + bufferevent_update_buckets(bev); max_so_far = LIM(bev->rate_limiting->limit); } if (bev->rate_limiting->group) { @@ -451,6 +458,44 @@ _bev_group_random_element(struct bufferevent_rate_limit_group *group) } \ } while (0) +static void +_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g) +{ + int again = 0; + struct bufferevent_private *bev, *first; + + g->read_suspended = 0; + FOREACH_RANDOM_ORDER({ + if (EVLOCK_TRY_LOCK(bev->lock)) { + bufferevent_unsuspend_read(&bev->bev, + BEV_SUSPEND_BW_GROUP); + EVLOCK_UNLOCK(bev->lock, 0); + } else { + again = 1; + } + }); + g->pending_unsuspend_read = again; +} + +static void +_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g) +{ + int again = 0; + struct bufferevent_private *bev, *first; + g->write_suspended = 0; + + FOREACH_RANDOM_ORDER({ + if (EVLOCK_TRY_LOCK(bev->lock)) { + bufferevent_unsuspend_write(&bev->bev, + BEV_SUSPEND_BW_GROUP); + EVLOCK_UNLOCK(bev->lock, 0); + } else { + again = 1; + } + }); + g->pending_unsuspend_write = again; +} + /** Callback invoked every tick to add more elements to the group bucket and unsuspend group members as needed. */ @@ -460,8 +505,6 @@ _bev_group_refill_callback(evutil_socket_t fd, short what, void *arg) struct bufferevent_rate_limit_group *g = arg; unsigned tick; struct timeval now; - int again = 0; - struct bufferevent_private *bev, *first; event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now); @@ -471,33 +514,11 @@ _bev_group_refill_callback(evutil_socket_t fd, short what, void *arg) if (g->pending_unsuspend_read || (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) { - g->read_suspended = 0; - again = 0; - FOREACH_RANDOM_ORDER({ - if (EVLOCK_TRY_LOCK(bev->lock)) { - bufferevent_unsuspend_read(&bev->bev, - BEV_SUSPEND_BW_GROUP); - EVLOCK_UNLOCK(bev->lock, 0); - } else { - again = 1; - } - }); - g->pending_unsuspend_read = again; + _bev_group_unsuspend_reading(g); } if (g->pending_unsuspend_write || (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){ - g->write_suspended = 0; - again = 0; - FOREACH_RANDOM_ORDER({ - if (EVLOCK_TRY_LOCK(bev->lock)) { - bufferevent_unsuspend_write(&bev->bev, - BEV_SUSPEND_BW_GROUP); - EVLOCK_UNLOCK(bev->lock, 0); - } else { - again = 1; - } - }); - g->pending_unsuspend_write = again; + _bev_group_unsuspend_writing(g); } /* XXXX Rather than waiting to the next tick to unsuspend stuff @@ -660,3 +681,169 @@ bufferevent_remove_from_rate_limit_group(struct bufferevent *bev) BEV_UNLOCK(bev); return 0; } + +/* === + * API functions to expose rate limits. + * + * Don't use these from inside Libevent; they're meant to be for use by + * the program. + * === */ + +/* Mostly you don't want to use this function from inside libevent; + * _bufferevent_get_read_max() is more likely what you want*/ +ev_ssize_t +bufferevent_get_read_limit(struct bufferevent *bev) +{ + ev_ssize_t r; + struct bufferevent_private *bevp; + BEV_LOCK(bev); + bevp = BEV_UPCAST(bev); + if (bevp->rate_limiting && bevp->rate_limiting->cfg) { + bufferevent_update_buckets(bevp); + r = bevp->rate_limiting->limit.read_limit; + } else { + r = EV_SSIZE_MAX; + } + BEV_UNLOCK(bev); + return r; +} + +/* Mostly you don't want to use this function from inside libevent; + * _bufferevent_get_write_max() is more likely what you want*/ +ev_ssize_t +bufferevent_get_write_limit(struct bufferevent *bev) +{ + ev_ssize_t r; + struct bufferevent_private *bevp; + BEV_LOCK(bev); + bevp = BEV_UPCAST(bev); + if (bevp->rate_limiting && bevp->rate_limiting->cfg) { + bufferevent_update_buckets(bevp); + r = bevp->rate_limiting->limit.write_limit; + } else { + r = EV_SSIZE_MAX; + } + BEV_UNLOCK(bev); + return r; +} + +/* Mostly you don't want to use this function from inside libevent; + * _bufferevent_get_read_max() is more likely what you want*/ +ev_ssize_t +bufferevent_rate_limit_group_get_read_limit( + struct bufferevent_rate_limit_group *grp) +{ + ev_ssize_t r; + LOCK_GROUP(grp); + r = grp->rate_limit.read_limit; + UNLOCK_GROUP(grp); + return r; +} + +/* Mostly you don't want to use this function from inside libevent; + * _bufferevent_get_write_max() is more likely what you want. */ +ev_ssize_t +bufferevent_rate_limit_group_get_write_limit( + struct bufferevent_rate_limit_group *grp) +{ + ev_ssize_t r; + LOCK_GROUP(grp); + r = grp->rate_limit.write_limit; + UNLOCK_GROUP(grp); + return r; +} + +int +bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr) +{ + int r = 0; + ev_int32_t old_limit, new_limit; + struct bufferevent_private *bevp; + BEV_LOCK(bev); + bevp = BEV_UPCAST(bev); + EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg); + old_limit = bevp->rate_limiting->limit.read_limit; + + new_limit = (bevp->rate_limiting->limit.read_limit -= decr); + if (old_limit > 0 && new_limit <= 0) { + bufferevent_suspend_read(bev, BEV_SUSPEND_BW); + if (event_add(&bevp->rate_limiting->refill_bucket_event, + &bevp->rate_limiting->cfg->tick_timeout) < 0) + r = -1; + } else if (old_limit <= 0 && new_limit > 0) { + event_del(&bevp->rate_limiting->refill_bucket_event); + bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW); + } + + BEV_UNLOCK(bev); + return r; +} + +int +bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr) +{ + /* XXXX this is mostly copy-and-paste from + * bufferevent_decrement_read_limit */ + int r = 0; + ev_int32_t old_limit, new_limit; + struct bufferevent_private *bevp; + BEV_LOCK(bev); + bevp = BEV_UPCAST(bev); + EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg); + old_limit = bevp->rate_limiting->limit.write_limit; + + new_limit = (bevp->rate_limiting->limit.write_limit -= decr); + if (old_limit > 0 && new_limit <= 0) { + bufferevent_suspend_write(bev, BEV_SUSPEND_BW); + if (event_add(&bevp->rate_limiting->refill_bucket_event, + &bevp->rate_limiting->cfg->tick_timeout) < 0) + r = -1; + } else if (old_limit <= 0 && new_limit > 0) { + event_del(&bevp->rate_limiting->refill_bucket_event); + bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW); + } + + BEV_UNLOCK(bev); + return r; +} + +int +bufferevent_rate_limit_group_decrement_read( + struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) +{ + int r = 0; + ev_int32_t old_limit, new_limit; + LOCK_GROUP(grp); + old_limit = grp->rate_limit.read_limit; + new_limit = (grp->rate_limit.read_limit -= decr); + + if (old_limit > 0 && new_limit <= 0) { + _bev_group_suspend_reading(grp); + } else if (old_limit <= 0 && new_limit > 0) { + _bev_group_unsuspend_reading(grp); + } + + UNLOCK_GROUP(grp); + return r; +} + +int +bufferevent_rate_limit_group_decrement_write( + struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) +{ + int r = 0; + ev_int32_t old_limit, new_limit; + LOCK_GROUP(grp); + old_limit = grp->rate_limit.write_limit; + new_limit = (grp->rate_limit.write_limit -= decr); + + if (old_limit > 0 && new_limit <= 0) { + _bev_group_suspend_writing(grp); + } else if (old_limit <= 0 && new_limit > 0) { + _bev_group_unsuspend_writing(grp); + } + + UNLOCK_GROUP(grp); + return r; +} + diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index 795607b0..18f5d373 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -492,12 +492,12 @@ int bufferevent_pair_new(struct event_base *base, int options, struct bufferevent *pair[2]); - /** Abstract type used to configure rate-limiting on a bufferevent or a group of bufferevents. */ struct ev_token_bucket_cfg; + /** A group of bufferevents which are configured to respect the same rate limit. @@ -545,6 +545,7 @@ void ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg); */ int bufferevent_set_rate_limit(struct bufferevent *bev, struct ev_token_bucket_cfg *cfg); + /** Create a new rate-limit group for bufferevents. A rate-limit group constrains the maximum number of bytes sent and received, in toto, @@ -584,6 +585,62 @@ int bufferevent_add_to_rate_limit_group(struct bufferevent *bev, /** Remove 'bev' from its current rate-limit group (if any). */ int bufferevent_remove_from_rate_limit_group(struct bufferevent *bev); +/*@{*/ +/** + Return the current read or write bucket size for a bufferevent. + If it is not configured with a per-bufferevent ratelimit, return + EV_SSIZE_MAX. This function does not inspect the group limit, if any. + Note that it can return a negative value if the bufferevent has been + made to read or write more than its limit. + */ +ev_ssize_t bufferevent_get_read_limit(struct bufferevent *bev); +ev_ssize_t bufferevent_get_write_limit(struct bufferevent *bev); +/*@}*/ + +/*@{*/ +/** + Return the read or write bucket size for a bufferevent rate limit + group. Note that it can return a negative value if bufferevents in + the group have been made to read or write more than their limits. + */ +ev_ssize_t bufferevent_rate_limit_group_get_read_limit( + struct bufferevent_rate_limit_group *); +ev_ssize_t bufferevent_rate_limit_group_get_write_limit( + struct bufferevent_rate_limit_group *); +/*@}*/ + +/*@{*/ +/** + Subtract a number of bytes from a bufferevent's read or write bucket. + The decrement value can be negative, if you want to manually refill + the bucket. If the change puts the bucket above or below zero, the + bufferevent will resume or suspend reading writing as appropriate. + These functions make no change in the buckets for the bufferevent's + group, if any. + + Returns 0 on success, -1 on internal error. + */ +int bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr); +int bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr); +/*@}*/ + +/*@{*/ +/** + Subtract a number of bytes from a bufferevent rate-limiting group's + read or write bucket. The decrement value can be negative, if you + want to manually refill the bucket. If the change puts the bucket + above or below zero, the bufferevents in the group will resume or + suspend reading writing as appropriate. + + Returns 0 on success, -1 on internal error. + */ +int bufferevent_rate_limit_group_decrement_read( + struct bufferevent_rate_limit_group *, ev_ssize_t); +int bufferevent_rate_limit_group_decrement_write( + struct bufferevent_rate_limit_group *, ev_ssize_t); +/*@}*/ + + #ifdef __cplusplus } #endif