2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 #include "evconfig-private.h"
30 #include <sys/types.h>
35 #include "event2/event.h"
36 #include "event2/event_struct.h"
37 #include "event2/util.h"
38 #include "event2/bufferevent.h"
39 #include "event2/bufferevent_struct.h"
40 #include "event2/buffer.h"
42 #include "ratelim-internal.h"
44 #include "bufferevent-internal.h"
45 #include "mm-internal.h"
46 #include "util-internal.h"
47 #include "event-internal.h"
50 ev_token_bucket_init_(struct ev_token_bucket *bucket,
51 const struct ev_token_bucket_cfg *cfg,
52 ev_uint32_t current_tick,
56 /* on reinitialization, we only clip downwards, since we've
57 already used who-knows-how-much bandwidth this tick. We
58 leave "last_updated" as it is; the next update will add the
59 appropriate amount of bandwidth to the bucket.
61 if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
62 bucket->read_limit = cfg->read_maximum;
63 if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
64 bucket->write_limit = cfg->write_maximum;
66 bucket->read_limit = cfg->read_rate;
67 bucket->write_limit = cfg->write_rate;
68 bucket->last_updated = current_tick;
74 ev_token_bucket_update_(struct ev_token_bucket *bucket,
75 const struct ev_token_bucket_cfg *cfg,
76 ev_uint32_t current_tick)
78 /* It's okay if the tick number overflows, since we'll just
79 * wrap around when we do the unsigned subtraction. */
80 unsigned n_ticks = current_tick - bucket->last_updated;
82 /* Make sure some ticks actually happened, and that time didn't
84 if (n_ticks == 0 || n_ticks > INT_MAX)
87 /* Naively, we would say
88 bucket->limit += n_ticks * cfg->rate;
90 if (bucket->limit > cfg->maximum)
91 bucket->limit = cfg->maximum;
93 But we're worried about overflow, so we do it like this:
96 if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
97 bucket->read_limit = cfg->read_maximum;
99 bucket->read_limit += n_ticks * cfg->read_rate;
102 if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
103 bucket->write_limit = cfg->write_maximum;
105 bucket->write_limit += n_ticks * cfg->write_rate;
108 bucket->last_updated = current_tick;
114 bufferevent_update_buckets(struct bufferevent_private *bev)
116 /* Must hold lock on bev. */
119 event_base_gettimeofday_cached(bev->bev.ev_base, &now);
120 tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
121 if (tick != bev->rate_limiting->limit.last_updated)
122 ev_token_bucket_update_(&bev->rate_limiting->limit,
123 bev->rate_limiting->cfg, tick);
127 ev_token_bucket_get_tick_(const struct timeval *tv,
128 const struct ev_token_bucket_cfg *cfg)
130 /* This computation uses two multiplies and a divide. We could do
131 * fewer if we knew that the tick length was an integer number of
132 * seconds, or if we knew it divided evenly into a second. We should
133 * investigate that more.
136 /* We cast to an ev_uint64_t first, since we don't want to overflow
137 * before we do the final divide. */
138 ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
139 return (unsigned)(msec / cfg->msec_per_tick);
142 struct ev_token_bucket_cfg *
143 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
144 size_t write_rate, size_t write_burst,
145 const struct timeval *tick_len)
147 struct ev_token_bucket_cfg *r;
154 if (read_rate > read_burst || write_rate > write_burst ||
155 read_rate < 1 || write_rate < 1)
157 if (read_rate > EV_RATE_LIMIT_MAX ||
158 write_rate > EV_RATE_LIMIT_MAX ||
159 read_burst > EV_RATE_LIMIT_MAX ||
160 write_burst > EV_RATE_LIMIT_MAX)
162 r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
165 r->read_rate = read_rate;
166 r->write_rate = write_rate;
167 r->read_maximum = read_burst;
168 r->write_maximum = write_burst;
169 memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
170 r->msec_per_tick = (tick_len->tv_sec * 1000) +
171 (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
176 ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
181 /* Default values for max_single_read & max_single_write variables. */
182 #define MAX_SINGLE_READ_DEFAULT 16384
183 #define MAX_SINGLE_WRITE_DEFAULT 16384
185 #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
186 #define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
188 static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
189 static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
190 static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
191 static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
193 /** Helper: figure out the maximum amount we should write if is_write, or
194 the maximum amount we should read if is_read. Return that maximum, or
195 0 if our bucket is wholly exhausted.
197 static inline ev_ssize_t
198 bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
200 /* needs lock on bev. */
201 ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
204 (is_write ? (x).write_limit : (x).read_limit)
206 #define GROUP_SUSPENDED(g) \
207 (is_write ? (g)->write_suspended : (g)->read_suspended)
209 /* Sets max_so_far to MIN(x, max_so_far) */
212 if (max_so_far > (x)) \
216 if (!bev->rate_limiting)
219 /* If rate-limiting is enabled at all, update the appropriate
220 bucket, and take the smaller of our rate limit and the group
224 if (bev->rate_limiting->cfg) {
225 bufferevent_update_buckets(bev);
226 max_so_far = LIM(bev->rate_limiting->limit);
228 if (bev->rate_limiting->group) {
229 struct bufferevent_rate_limit_group *g =
230 bev->rate_limiting->group;
233 if (GROUP_SUSPENDED(g)) {
234 /* We can get here if we failed to lock this
235 * particular bufferevent while suspending the whole
238 bufferevent_suspend_write_(&bev->bev,
239 BEV_SUSPEND_BW_GROUP);
241 bufferevent_suspend_read_(&bev->bev,
242 BEV_SUSPEND_BW_GROUP);
245 /* XXXX probably we should divide among the active
246 * members, not the total members. */
247 share = LIM(g->rate_limit) / g->n_members;
248 if (share < g->min_share)
249 share = g->min_share;
261 bufferevent_get_read_max_(struct bufferevent_private *bev)
263 return bufferevent_get_rlim_max_(bev, 0);
267 bufferevent_get_write_max_(struct bufferevent_private *bev)
269 return bufferevent_get_rlim_max_(bev, 1);
273 bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
275 /* XXXXX Make sure all users of this function check its return value */
277 /* need to hold lock on bev */
278 if (!bev->rate_limiting)
281 if (bev->rate_limiting->cfg) {
282 bev->rate_limiting->limit.read_limit -= bytes;
283 if (bev->rate_limiting->limit.read_limit <= 0) {
284 bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
285 if (event_add(&bev->rate_limiting->refill_bucket_event,
286 &bev->rate_limiting->cfg->tick_timeout) < 0)
288 } else if (bev->read_suspended & BEV_SUSPEND_BW) {
289 if (!(bev->write_suspended & BEV_SUSPEND_BW))
290 event_del(&bev->rate_limiting->refill_bucket_event);
291 bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
295 if (bev->rate_limiting->group) {
296 LOCK_GROUP(bev->rate_limiting->group);
297 bev->rate_limiting->group->rate_limit.read_limit -= bytes;
298 bev->rate_limiting->group->total_read += bytes;
299 if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
300 bev_group_suspend_reading_(bev->rate_limiting->group);
301 } else if (bev->rate_limiting->group->read_suspended) {
302 bev_group_unsuspend_reading_(bev->rate_limiting->group);
304 UNLOCK_GROUP(bev->rate_limiting->group);
311 bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
313 /* XXXXX Make sure all users of this function check its return value */
315 /* need to hold lock */
316 if (!bev->rate_limiting)
319 if (bev->rate_limiting->cfg) {
320 bev->rate_limiting->limit.write_limit -= bytes;
321 if (bev->rate_limiting->limit.write_limit <= 0) {
322 bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
323 if (event_add(&bev->rate_limiting->refill_bucket_event,
324 &bev->rate_limiting->cfg->tick_timeout) < 0)
326 } else if (bev->write_suspended & BEV_SUSPEND_BW) {
327 if (!(bev->read_suspended & BEV_SUSPEND_BW))
328 event_del(&bev->rate_limiting->refill_bucket_event);
329 bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
333 if (bev->rate_limiting->group) {
334 LOCK_GROUP(bev->rate_limiting->group);
335 bev->rate_limiting->group->rate_limit.write_limit -= bytes;
336 bev->rate_limiting->group->total_written += bytes;
337 if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
338 bev_group_suspend_writing_(bev->rate_limiting->group);
339 } else if (bev->rate_limiting->group->write_suspended) {
340 bev_group_unsuspend_writing_(bev->rate_limiting->group);
342 UNLOCK_GROUP(bev->rate_limiting->group);
348 /** Stop reading on every bufferevent in <b>g</b> */
350 bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
352 /* Needs group lock */
353 struct bufferevent_private *bev;
354 g->read_suspended = 1;
355 g->pending_unsuspend_read = 0;
357 /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
358 to prevent a deadlock. (Ordinarily, the group lock nests inside
359 the bufferevent locks. If we are unable to lock any individual
360 bufferevent, it will find out later when it looks at its limit
361 and sees that its group is suspended.)
363 LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
364 if (EVLOCK_TRY_LOCK_(bev->lock)) {
365 bufferevent_suspend_read_(&bev->bev,
366 BEV_SUSPEND_BW_GROUP);
367 EVLOCK_UNLOCK(bev->lock, 0);
373 /** Stop writing on every bufferevent in <b>g</b> */
375 bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
377 /* Needs group lock */
378 struct bufferevent_private *bev;
379 g->write_suspended = 1;
380 g->pending_unsuspend_write = 0;
381 LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
382 if (EVLOCK_TRY_LOCK_(bev->lock)) {
383 bufferevent_suspend_write_(&bev->bev,
384 BEV_SUSPEND_BW_GROUP);
385 EVLOCK_UNLOCK(bev->lock, 0);
391 /** Timer callback invoked on a single bufferevent with one or more exhausted
392 buckets when they are ready to refill. */
394 bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
398 struct bufferevent_private *bev = arg;
401 if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
402 BEV_UNLOCK(&bev->bev);
406 /* First, update the bucket */
407 event_base_gettimeofday_cached(bev->bev.ev_base, &now);
408 tick = ev_token_bucket_get_tick_(&now,
409 bev->rate_limiting->cfg);
410 ev_token_bucket_update_(&bev->rate_limiting->limit,
411 bev->rate_limiting->cfg,
414 /* Now unsuspend any read/write operations as appropriate. */
415 if ((bev->read_suspended & BEV_SUSPEND_BW)) {
416 if (bev->rate_limiting->limit.read_limit > 0)
417 bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
421 if ((bev->write_suspended & BEV_SUSPEND_BW)) {
422 if (bev->rate_limiting->limit.write_limit > 0)
423 bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
428 /* One or more of the buckets may need another refill if they
431 XXXX if we need to be quiet for more ticks, we should
432 maybe figure out what timeout we really want.
434 /* XXXX Handle event_add failure somehow */
435 event_add(&bev->rate_limiting->refill_bucket_event,
436 &bev->rate_limiting->cfg->tick_timeout);
438 BEV_UNLOCK(&bev->bev);
441 /** Helper: grab a random element from a bufferevent group.
443 * Requires that we hold the lock on the group.
445 static struct bufferevent_private *
446 bev_group_random_element_(struct bufferevent_rate_limit_group *group)
449 struct bufferevent_private *bev;
451 /* requires group lock */
453 if (!group->n_members)
456 EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
458 which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
460 bev = LIST_FIRST(&group->members);
462 bev = LIST_NEXT(bev, rate_limiting->next_in_group);
/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;				 \
		}					 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;				 \
		}					 \
	} while (0)
488 bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
491 struct bufferevent_private *bev, *first;
493 g->read_suspended = 0;
494 FOREACH_RANDOM_ORDER({
495 if (EVLOCK_TRY_LOCK_(bev->lock)) {
496 bufferevent_unsuspend_read_(&bev->bev,
497 BEV_SUSPEND_BW_GROUP);
498 EVLOCK_UNLOCK(bev->lock, 0);
503 g->pending_unsuspend_read = again;
507 bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
510 struct bufferevent_private *bev, *first;
511 g->write_suspended = 0;
513 FOREACH_RANDOM_ORDER({
514 if (EVLOCK_TRY_LOCK_(bev->lock)) {
515 bufferevent_unsuspend_write_(&bev->bev,
516 BEV_SUSPEND_BW_GROUP);
517 EVLOCK_UNLOCK(bev->lock, 0);
522 g->pending_unsuspend_write = again;
525 /** Callback invoked every tick to add more elements to the group bucket
526 and unsuspend group members as needed.
529 bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
531 struct bufferevent_rate_limit_group *g = arg;
535 event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
539 tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
540 ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
542 if (g->pending_unsuspend_read ||
543 (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
544 bev_group_unsuspend_reading_(g);
546 if (g->pending_unsuspend_write ||
547 (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
548 bev_group_unsuspend_writing_(g);
551 /* XXXX Rather than waiting to the next tick to unsuspend stuff
552 * with pending_unsuspend_write/read, we should do it on the
553 * next iteration of the mainloop.
560 bufferevent_set_rate_limit(struct bufferevent *bev,
561 struct ev_token_bucket_cfg *cfg)
563 struct bufferevent_private *bevp = BEV_UPCAST(bev);
565 struct bufferevent_rate_limit *rlim;
568 int reinit = 0, suspended = 0;
569 /* XXX reference-count cfg */
574 if (bevp->rate_limiting) {
575 rlim = bevp->rate_limiting;
577 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
578 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
579 if (event_initialized(&rlim->refill_bucket_event))
580 event_del(&rlim->refill_bucket_event);
586 event_base_gettimeofday_cached(bev->ev_base, &now);
587 tick = ev_token_bucket_get_tick_(&now, cfg);
589 if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
594 if (bevp->rate_limiting == NULL) {
595 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
598 bevp->rate_limiting = rlim;
600 rlim = bevp->rate_limiting;
602 reinit = rlim->cfg != NULL;
605 ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
608 EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
609 event_del(&rlim->refill_bucket_event);
611 event_assign(&rlim->refill_bucket_event, bev->ev_base,
612 -1, EV_FINALIZE, bev_refill_callback_, bevp);
614 if (rlim->limit.read_limit > 0) {
615 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
617 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
620 if (rlim->limit.write_limit > 0) {
621 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
623 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
628 event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
637 struct bufferevent_rate_limit_group *
638 bufferevent_rate_limit_group_new(struct event_base *base,
639 const struct ev_token_bucket_cfg *cfg)
641 struct bufferevent_rate_limit_group *g;
645 event_base_gettimeofday_cached(base, &now);
646 tick = ev_token_bucket_get_tick_(&now, cfg);
648 g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
651 memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
652 LIST_INIT(&g->members);
654 ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
656 event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
657 bev_group_refill_callback_, g);
658 /*XXXX handle event_add failure */
659 event_add(&g->master_refill_event, &cfg->tick_timeout);
661 EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
663 bufferevent_rate_limit_group_set_min_share(g, 64);
665 evutil_weakrand_seed_(&g->weakrand_seed,
666 (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
672 bufferevent_rate_limit_group_set_cfg(
673 struct bufferevent_rate_limit_group *g,
674 const struct ev_token_bucket_cfg *cfg)
681 same_tick = evutil_timercmp(
682 &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
683 memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
685 if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
686 g->rate_limit.read_limit = cfg->read_maximum;
687 if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
688 g->rate_limit.write_limit = cfg->write_maximum;
691 /* This can cause a hiccup in the schedule */
692 event_add(&g->master_refill_event, &cfg->tick_timeout);
695 /* The new limits might force us to adjust min_share differently. */
696 bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
703 bufferevent_rate_limit_group_set_min_share(
704 struct bufferevent_rate_limit_group *g,
707 if (share > EV_SSIZE_MAX)
710 g->configured_min_share = share;
712 /* Can't set share to less than the one-tick maximum. IOW, at steady
713 * state, at least one connection can go per tick. */
714 if (share > g->rate_limit_cfg.read_rate)
715 share = g->rate_limit_cfg.read_rate;
716 if (share > g->rate_limit_cfg.write_rate)
717 share = g->rate_limit_cfg.write_rate;
719 g->min_share = share;
724 bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
727 EVUTIL_ASSERT(0 == g->n_members);
728 event_del(&g->master_refill_event);
730 EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
735 bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
736 struct bufferevent_rate_limit_group *g)
738 int wsuspend, rsuspend;
739 struct bufferevent_private *bevp = BEV_UPCAST(bev);
742 if (!bevp->rate_limiting) {
743 struct bufferevent_rate_limit *rlim;
744 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
749 event_assign(&rlim->refill_bucket_event, bev->ev_base,
750 -1, EV_FINALIZE, bev_refill_callback_, bevp);
751 bevp->rate_limiting = rlim;
754 if (bevp->rate_limiting->group == g) {
758 if (bevp->rate_limiting->group)
759 bufferevent_remove_from_rate_limit_group(bev);
762 bevp->rate_limiting->group = g;
764 LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
766 rsuspend = g->read_suspended;
767 wsuspend = g->write_suspended;
772 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
774 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
/** Remove 'bev' from its group (if any), unsuspending group-based limits. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}
787 bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
790 struct bufferevent_private *bevp = BEV_UPCAST(bev);
792 if (bevp->rate_limiting && bevp->rate_limiting->group) {
793 struct bufferevent_rate_limit_group *g =
794 bevp->rate_limiting->group;
796 bevp->rate_limiting->group = NULL;
798 LIST_REMOVE(bevp, rate_limiting->next_in_group);
802 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
803 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
810 * API functions to expose rate limits.
812 * Don't use these from inside Libevent; they're meant to be for use by
816 /* Mostly you don't want to use this function from inside libevent;
817 * bufferevent_get_read_max_() is more likely what you want*/
819 bufferevent_get_read_limit(struct bufferevent *bev)
822 struct bufferevent_private *bevp;
824 bevp = BEV_UPCAST(bev);
825 if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
826 bufferevent_update_buckets(bevp);
827 r = bevp->rate_limiting->limit.read_limit;
835 /* Mostly you don't want to use this function from inside libevent;
836 * bufferevent_get_write_max_() is more likely what you want*/
838 bufferevent_get_write_limit(struct bufferevent *bev)
841 struct bufferevent_private *bevp;
843 bevp = BEV_UPCAST(bev);
844 if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
845 bufferevent_update_buckets(bevp);
846 r = bevp->rate_limiting->limit.write_limit;
855 bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
857 struct bufferevent_private *bevp;
860 bevp = BEV_UPCAST(bev);
861 if (size == 0 || size > EV_SSIZE_MAX)
862 bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
864 bevp->max_single_read = size;
865 ret = evbuffer_set_max_read(bev->input, bevp->max_single_read);
871 bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
873 struct bufferevent_private *bevp;
875 bevp = BEV_UPCAST(bev);
876 if (size == 0 || size > EV_SSIZE_MAX)
877 bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
879 bevp->max_single_write = size;
885 bufferevent_get_max_single_read(struct bufferevent *bev)
890 r = BEV_UPCAST(bev)->max_single_read;
896 bufferevent_get_max_single_write(struct bufferevent *bev)
901 r = BEV_UPCAST(bev)->max_single_write;
907 bufferevent_get_max_to_read(struct bufferevent *bev)
911 r = bufferevent_get_read_max_(BEV_UPCAST(bev));
917 bufferevent_get_max_to_write(struct bufferevent *bev)
921 r = bufferevent_get_write_max_(BEV_UPCAST(bev));
926 const struct ev_token_bucket_cfg *
927 bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
928 struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
929 struct ev_token_bucket_cfg *cfg;
933 if (bufev_private->rate_limiting) {
934 cfg = bufev_private->rate_limiting->cfg;
944 /* Mostly you don't want to use this function from inside libevent;
945 * bufferevent_get_read_max_() is more likely what you want*/
947 bufferevent_rate_limit_group_get_read_limit(
948 struct bufferevent_rate_limit_group *grp)
952 r = grp->rate_limit.read_limit;
957 /* Mostly you don't want to use this function from inside libevent;
958 * bufferevent_get_write_max_() is more likely what you want. */
960 bufferevent_rate_limit_group_get_write_limit(
961 struct bufferevent_rate_limit_group *grp)
965 r = grp->rate_limit.write_limit;
971 bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
974 ev_ssize_t old_limit, new_limit;
975 struct bufferevent_private *bevp;
977 bevp = BEV_UPCAST(bev);
978 EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
979 old_limit = bevp->rate_limiting->limit.read_limit;
981 new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
982 if (old_limit > 0 && new_limit <= 0) {
983 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
984 if (event_add(&bevp->rate_limiting->refill_bucket_event,
985 &bevp->rate_limiting->cfg->tick_timeout) < 0)
987 } else if (old_limit <= 0 && new_limit > 0) {
988 if (!(bevp->write_suspended & BEV_SUSPEND_BW))
989 event_del(&bevp->rate_limiting->refill_bucket_event);
990 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
998 bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
1000 /* XXXX this is mostly copy-and-paste from
1001 * bufferevent_decrement_read_limit */
1003 ev_ssize_t old_limit, new_limit;
1004 struct bufferevent_private *bevp;
1006 bevp = BEV_UPCAST(bev);
1007 EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
1008 old_limit = bevp->rate_limiting->limit.write_limit;
1010 new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
1011 if (old_limit > 0 && new_limit <= 0) {
1012 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
1013 if (event_add(&bevp->rate_limiting->refill_bucket_event,
1014 &bevp->rate_limiting->cfg->tick_timeout) < 0)
1016 } else if (old_limit <= 0 && new_limit > 0) {
1017 if (!(bevp->read_suspended & BEV_SUSPEND_BW))
1018 event_del(&bevp->rate_limiting->refill_bucket_event);
1019 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
1027 bufferevent_rate_limit_group_decrement_read(
1028 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1031 ev_ssize_t old_limit, new_limit;
1033 old_limit = grp->rate_limit.read_limit;
1034 new_limit = (grp->rate_limit.read_limit -= decr);
1036 if (old_limit > 0 && new_limit <= 0) {
1037 bev_group_suspend_reading_(grp);
1038 } else if (old_limit <= 0 && new_limit > 0) {
1039 bev_group_unsuspend_reading_(grp);
1047 bufferevent_rate_limit_group_decrement_write(
1048 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1051 ev_ssize_t old_limit, new_limit;
1053 old_limit = grp->rate_limit.write_limit;
1054 new_limit = (grp->rate_limit.write_limit -= decr);
1056 if (old_limit > 0 && new_limit <= 0) {
1057 bev_group_suspend_writing_(grp);
1058 } else if (old_limit <= 0 && new_limit > 0) {
1059 bev_group_unsuspend_writing_(grp);
1067 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
1068 ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
1070 EVUTIL_ASSERT(grp != NULL);
1072 *total_read_out = grp->total_read;
1073 if (total_written_out)
1074 *total_written_out = grp->total_written;
1078 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1080 grp->total_read = grp->total_written = 0;
1084 bufferevent_ratelim_init_(struct bufferevent_private *bev)
1086 bev->rate_limiting = NULL;
1087 bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
1088 bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
1090 if (evbuffer_set_max_read(bev->bev.input, bev->max_single_read))