return 1;
}
+static inline void
+bufferevent_update_buckets(struct bufferevent_private *bev)
+{
+ /* Must hold lock on bev. */
+ struct timeval now;
+ unsigned tick;
+ event_base_gettimeofday_cached(bev->bev.ev_base, &now);
+ tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
+ if (tick != bev->rate_limiting->limit.last_updated)
+ ev_token_bucket_update(&bev->rate_limiting->limit,
+ bev->rate_limiting->cfg, tick);
+}
+
ev_uint32_t
ev_token_bucket_get_tick(const struct timeval *tv,
const struct ev_token_bucket_cfg *cfg)
{
/* needs lock on bev. */
int max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;
- struct timeval now;
#define LIM(x) \
(is_write ? (x).write_limit : (x).read_limit)
*/
if (bev->rate_limiting->cfg) {
- unsigned tick;
-
- event_base_gettimeofday_cached(bev->bev.ev_base, &now);
- tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
- ev_token_bucket_update(&bev->rate_limiting->limit,
- bev->rate_limiting->cfg, tick);
+ bufferevent_update_buckets(bev);
max_so_far = LIM(bev->rate_limiting->limit);
}
if (bev->rate_limiting->group) {
} \
} while (0)
+static void
+_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
+{
+ int again = 0;
+ struct bufferevent_private *bev, *first;
+
+ g->read_suspended = 0;
+ FOREACH_RANDOM_ORDER({
+ if (EVLOCK_TRY_LOCK(bev->lock)) {
+ bufferevent_unsuspend_read(&bev->bev,
+ BEV_SUSPEND_BW_GROUP);
+ EVLOCK_UNLOCK(bev->lock, 0);
+ } else {
+ again = 1;
+ }
+ });
+ g->pending_unsuspend_read = again;
+}
+
+static void
+_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
+{
+ int again = 0;
+ struct bufferevent_private *bev, *first;
+ g->write_suspended = 0;
+
+ FOREACH_RANDOM_ORDER({
+ if (EVLOCK_TRY_LOCK(bev->lock)) {
+ bufferevent_unsuspend_write(&bev->bev,
+ BEV_SUSPEND_BW_GROUP);
+ EVLOCK_UNLOCK(bev->lock, 0);
+ } else {
+ again = 1;
+ }
+ });
+ g->pending_unsuspend_write = again;
+}
+
/** Callback invoked every tick to add more elements to the group bucket
and unsuspend group members as needed.
*/
struct bufferevent_rate_limit_group *g = arg;
unsigned tick;
struct timeval now;
- int again = 0;
- struct bufferevent_private *bev, *first;
event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
if (g->pending_unsuspend_read ||
(g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
- g->read_suspended = 0;
- again = 0;
- FOREACH_RANDOM_ORDER({
- if (EVLOCK_TRY_LOCK(bev->lock)) {
- bufferevent_unsuspend_read(&bev->bev,
- BEV_SUSPEND_BW_GROUP);
- EVLOCK_UNLOCK(bev->lock, 0);
- } else {
- again = 1;
- }
- });
- g->pending_unsuspend_read = again;
+ _bev_group_unsuspend_reading(g);
}
if (g->pending_unsuspend_write ||
(g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
- g->write_suspended = 0;
- again = 0;
- FOREACH_RANDOM_ORDER({
- if (EVLOCK_TRY_LOCK(bev->lock)) {
- bufferevent_unsuspend_write(&bev->bev,
- BEV_SUSPEND_BW_GROUP);
- EVLOCK_UNLOCK(bev->lock, 0);
- } else {
- again = 1;
- }
- });
- g->pending_unsuspend_write = again;
+ _bev_group_unsuspend_writing(g);
}
/* XXXX Rather than waiting to the next tick to unsuspend stuff
BEV_UNLOCK(bev);
return 0;
}
+
+/* ===
+ * API functions to expose rate limits.
+ *
+ * Don't use these from inside Libevent; they're meant to be for use by
+ * the program.
+ * === */
+
+/* Mostly you don't want to use this function from inside libevent;
+ * _bufferevent_get_read_max() is more likely what you want*/
+ev_ssize_t
+bufferevent_get_read_limit(struct bufferevent *bev)
+{
+ ev_ssize_t r;
+ struct bufferevent_private *bevp;
+ BEV_LOCK(bev);
+ bevp = BEV_UPCAST(bev);
+ if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
+ bufferevent_update_buckets(bevp);
+ r = bevp->rate_limiting->limit.read_limit;
+ } else {
+ r = EV_SSIZE_MAX;
+ }
+ BEV_UNLOCK(bev);
+ return r;
+}
+
+/* Mostly you don't want to use this function from inside libevent;
+ * _bufferevent_get_write_max() is more likely what you want*/
+ev_ssize_t
+bufferevent_get_write_limit(struct bufferevent *bev)
+{
+ ev_ssize_t r;
+ struct bufferevent_private *bevp;
+ BEV_LOCK(bev);
+ bevp = BEV_UPCAST(bev);
+ if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
+ bufferevent_update_buckets(bevp);
+ r = bevp->rate_limiting->limit.write_limit;
+ } else {
+ r = EV_SSIZE_MAX;
+ }
+ BEV_UNLOCK(bev);
+ return r;
+}
+
+/* Mostly you don't want to use this function from inside libevent;
+ * _bufferevent_get_read_max() is more likely what you want*/
+ev_ssize_t
+bufferevent_rate_limit_group_get_read_limit(
+ struct bufferevent_rate_limit_group *grp)
+{
+ ev_ssize_t r;
+ LOCK_GROUP(grp);
+ r = grp->rate_limit.read_limit;
+ UNLOCK_GROUP(grp);
+ return r;
+}
+
+/* Mostly you don't want to use this function from inside libevent;
+ * _bufferevent_get_write_max() is more likely what you want. */
+ev_ssize_t
+bufferevent_rate_limit_group_get_write_limit(
+ struct bufferevent_rate_limit_group *grp)
+{
+ ev_ssize_t r;
+ LOCK_GROUP(grp);
+ r = grp->rate_limit.write_limit;
+ UNLOCK_GROUP(grp);
+ return r;
+}
+
+int
+bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
+{
+ int r = 0;
+ ev_int32_t old_limit, new_limit;
+ struct bufferevent_private *bevp;
+ BEV_LOCK(bev);
+ bevp = BEV_UPCAST(bev);
+ EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
+ old_limit = bevp->rate_limiting->limit.read_limit;
+
+ new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
+ if (old_limit > 0 && new_limit <= 0) {
+ bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
+ if (event_add(&bevp->rate_limiting->refill_bucket_event,
+ &bevp->rate_limiting->cfg->tick_timeout) < 0)
+ r = -1;
+ } else if (old_limit <= 0 && new_limit > 0) {
+ event_del(&bevp->rate_limiting->refill_bucket_event);
+ bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
+ }
+
+ BEV_UNLOCK(bev);
+ return r;
+}
+
+int
+bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
+{
+ /* XXXX this is mostly copy-and-paste from
+ * bufferevent_decrement_read_limit */
+ int r = 0;
+ ev_int32_t old_limit, new_limit;
+ struct bufferevent_private *bevp;
+ BEV_LOCK(bev);
+ bevp = BEV_UPCAST(bev);
+ EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
+ old_limit = bevp->rate_limiting->limit.write_limit;
+
+ new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
+ if (old_limit > 0 && new_limit <= 0) {
+ bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
+ if (event_add(&bevp->rate_limiting->refill_bucket_event,
+ &bevp->rate_limiting->cfg->tick_timeout) < 0)
+ r = -1;
+ } else if (old_limit <= 0 && new_limit > 0) {
+ event_del(&bevp->rate_limiting->refill_bucket_event);
+ bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
+ }
+
+ BEV_UNLOCK(bev);
+ return r;
+}
+
+int
+bufferevent_rate_limit_group_decrement_read(
+ struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
+{
+ int r = 0;
+ ev_int32_t old_limit, new_limit;
+ LOCK_GROUP(grp);
+ old_limit = grp->rate_limit.read_limit;
+ new_limit = (grp->rate_limit.read_limit -= decr);
+
+ if (old_limit > 0 && new_limit <= 0) {
+ _bev_group_suspend_reading(grp);
+ } else if (old_limit <= 0 && new_limit > 0) {
+ _bev_group_unsuspend_reading(grp);
+ }
+
+ UNLOCK_GROUP(grp);
+ return r;
+}
+
+int
+bufferevent_rate_limit_group_decrement_write(
+ struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
+{
+ int r = 0;
+ ev_int32_t old_limit, new_limit;
+ LOCK_GROUP(grp);
+ old_limit = grp->rate_limit.write_limit;
+ new_limit = (grp->rate_limit.write_limit -= decr);
+
+ if (old_limit > 0 && new_limit <= 0) {
+ _bev_group_suspend_writing(grp);
+ } else if (old_limit <= 0 && new_limit > 0) {
+ _bev_group_unsuspend_writing(grp);
+ }
+
+ UNLOCK_GROUP(grp);
+ return r;
+}
+
bufferevent_pair_new(struct event_base *base, int options,
struct bufferevent *pair[2]);
-
/**
Abstract type used to configure rate-limiting on a bufferevent or a group
of bufferevents.
*/
struct ev_token_bucket_cfg;
+
/**
A group of bufferevents which are configured to respect the same rate
limit.
*/
int bufferevent_set_rate_limit(struct bufferevent *bev,
struct ev_token_bucket_cfg *cfg);
+
/**
Create a new rate-limit group for bufferevents. A rate-limit group
constrains the maximum number of bytes sent and received, in toto,
/** Remove 'bev' from its current rate-limit group (if any). */
int bufferevent_remove_from_rate_limit_group(struct bufferevent *bev);
+/*@{*/
+/**
+ Return the current read or write bucket size for a bufferevent.
+ If it is not configured with a per-bufferevent ratelimit, return
+ EV_SSIZE_MAX. This function does not inspect the group limit, if any.
+ Note that it can return a negative value if the bufferevent has been
+ made to read or write more than its limit.
+ */
+ev_ssize_t bufferevent_get_read_limit(struct bufferevent *bev);
+ev_ssize_t bufferevent_get_write_limit(struct bufferevent *bev);
+/*@}*/
+
+/*@{*/
+/**
+ Return the read or write bucket size for a bufferevent rate limit
+ group. Note that it can return a negative value if bufferevents in
+ the group have been made to read or write more than their limits.
+ */
+ev_ssize_t bufferevent_rate_limit_group_get_read_limit(
+ struct bufferevent_rate_limit_group *);
+ev_ssize_t bufferevent_rate_limit_group_get_write_limit(
+ struct bufferevent_rate_limit_group *);
+/*@}*/
+
+/*@{*/
+/**
+ Subtract a number of bytes from a bufferevent's read or write bucket.
+ The decrement value can be negative, if you want to manually refill
+ the bucket. If the change puts the bucket above or below zero, the
+ bufferevent will resume or suspend reading writing as appropriate.
+ These functions make no change in the buckets for the bufferevent's
+ group, if any.
+
+ Returns 0 on success, -1 on internal error.
+ */
+int bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr);
+int bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr);
+/*@}*/
+
+/*@{*/
+/**
+ Subtract a number of bytes from a bufferevent rate-limiting group's
+ read or write bucket. The decrement value can be negative, if you
+ want to manually refill the bucket. If the change puts the bucket
+ above or below zero, the bufferevents in the group will resume or
+ suspend reading writing as appropriate.
+
+ Returns 0 on success, -1 on internal error.
+ */
+int bufferevent_rate_limit_group_decrement_read(
+ struct bufferevent_rate_limit_group *, ev_ssize_t);
+int bufferevent_rate_limit_group_decrement_write(
+ struct bufferevent_rate_limit_group *, ev_ssize_t);
+/*@}*/
+
+
#ifdef __cplusplus
}
#endif