/* bufferevent_ratelim.c -- token-bucket rate limiting for bufferevents. */
1 /*
2  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3  * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. The name of the author may not be used to endorse or promote products
15  *    derived from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27  */
28 #include "evconfig-private.h"
29
30 #include <sys/types.h>
31 #include <limits.h>
32 #include <string.h>
33 #include <stdlib.h>
34
35 #include "event2/event.h"
36 #include "event2/event_struct.h"
37 #include "event2/util.h"
38 #include "event2/bufferevent.h"
39 #include "event2/bufferevent_struct.h"
40 #include "event2/buffer.h"
41
42 #include "ratelim-internal.h"
43
44 #include "bufferevent-internal.h"
45 #include "mm-internal.h"
46 #include "util-internal.h"
47 #include "event-internal.h"
48
49 int
50 ev_token_bucket_init_(struct ev_token_bucket *bucket,
51     const struct ev_token_bucket_cfg *cfg,
52     ev_uint32_t current_tick,
53     int reinitialize)
54 {
55         if (reinitialize) {
56                 /* on reinitialization, we only clip downwards, since we've
57                    already used who-knows-how-much bandwidth this tick.  We
58                    leave "last_updated" as it is; the next update will add the
59                    appropriate amount of bandwidth to the bucket.
60                 */
61                 if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
62                         bucket->read_limit = cfg->read_maximum;
63                 if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
64                         bucket->write_limit = cfg->write_maximum;
65         } else {
66                 bucket->read_limit = cfg->read_rate;
67                 bucket->write_limit = cfg->write_rate;
68                 bucket->last_updated = current_tick;
69         }
70         return 0;
71 }
72
int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* Refill 'bucket' with the tokens accrued since it was last updated,
	 * saturating at the configured burst maximums.  Returns 1 if the
	 * bucket was updated, 0 if no whole tick has elapsed (or the clock
	 * appears to have rolled back). */

	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back.  (An absurdly large n_ticks means current_tick wrapped
	 * below last_updated, i.e. time went backwards.) */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	/* If n_ticks * rate would reach or exceed the headroom left in the
	 * bucket, saturate at the maximum without ever multiplying. */
	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}
112
113 static inline void
114 bufferevent_update_buckets(struct bufferevent_private *bev)
115 {
116         /* Must hold lock on bev. */
117         struct timeval now;
118         unsigned tick;
119         event_base_gettimeofday_cached(bev->bev.ev_base, &now);
120         tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
121         if (tick != bev->rate_limiting->limit.last_updated)
122                 ev_token_bucket_update_(&bev->rate_limiting->limit,
123                     bev->rate_limiting->cfg, tick);
124 }
125
ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* Map the wall-clock time 'tv' onto a tick counter: the number of
	 * whole cfg->msec_per_tick intervals elapsed since the epoch.
	 *
	 * This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 *
	 * NOTE(review): divides by cfg->msec_per_tick -- if a config were
	 * ever built with a tick shorter than 1 msec this would be a
	 * division by zero; confirm ev_token_bucket_cfg_new() prevents it.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
141
142 struct ev_token_bucket_cfg *
143 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
144     size_t write_rate, size_t write_burst,
145     const struct timeval *tick_len)
146 {
147         struct ev_token_bucket_cfg *r;
148         struct timeval g;
149         if (! tick_len) {
150                 g.tv_sec = 1;
151                 g.tv_usec = 0;
152                 tick_len = &g;
153         }
154         if (read_rate > read_burst || write_rate > write_burst ||
155             read_rate < 1 || write_rate < 1)
156                 return NULL;
157         if (read_rate > EV_RATE_LIMIT_MAX ||
158             write_rate > EV_RATE_LIMIT_MAX ||
159             read_burst > EV_RATE_LIMIT_MAX ||
160             write_burst > EV_RATE_LIMIT_MAX)
161                 return NULL;
162         r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
163         if (!r)
164                 return NULL;
165         r->read_rate = read_rate;
166         r->write_rate = write_rate;
167         r->read_maximum = read_burst;
168         r->write_maximum = write_burst;
169         memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
170         r->msec_per_tick = (tick_len->tv_sec * 1000) +
171             (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
172         return r;
173 }
174
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	/* Release a configuration allocated by ev_token_bucket_cfg_new().
	 * NOTE(review): cfg is handed straight to mm_free(); presumably a
	 * NULL cfg is a no-op like free(3) -- confirm mm_free's contract. */
	mm_free(cfg);
}
180
181 /* Default values for max_single_read & max_single_write variables. */
182 #define MAX_SINGLE_READ_DEFAULT 16384
183 #define MAX_SINGLE_WRITE_DEFAULT 16384
184
185 #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
186 #define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
187
188 static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
189 static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
190 static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
191 static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
192
/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if is_read.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	/* Start from the per-operation cap configured on this bufferevent. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

	/* Select the read or write field of a bucket, per is_write. */
#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

	/* Is the group suspended in the direction we care about? */
#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group.  Apply the missed suspension now. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* Give this bufferevent an equal slice of the group
			 * bucket, but never less than min_share.
			 * XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	/* An exhausted bucket can go negative; report it as "nothing". */
	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
259
/** Max bytes bev may read right now, or 0 if its read bucket is exhausted.
 *  Caller must hold the lock on bev. */
ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}
265
/** Max bytes bev may write right now, or 0 if its write bucket is exhausted.
 *  Caller must hold the lock on bev. */
ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}
271
/** Charge 'bytes' of reading against bev's own bucket and its group's
 *  bucket (if any), suspending or resuming reads as the buckets empty or
 *  refill.  Returns 0 on success, -1 if the refill timer could not be
 *  scheduled.  NOTE(review): a negative 'bytes' credits the buckets
 *  instead; presumably used to refund an over-charge -- confirm callers. */
int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			/* Bucket exhausted: stop reading and arm the
			 * one-shot refill timer for the next tick. */
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			/* Bucket has tokens again: cancel the refill timer
			 * (unless writes still need it) and resume reading. */
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		/* Mirror the charge in the shared group bucket, suspending
		 * or resuming the whole group as needed. */
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
309
/** Charge 'bytes' of writing against bev's own bucket and its group's
 *  bucket (if any), suspending or resuming writes as the buckets empty or
 *  refill.  Returns 0 on success, -1 if the refill timer could not be
 *  scheduled.  Mirror image of bufferevent_decrement_read_buckets_(). */
int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			/* Bucket exhausted: stop writing and arm the
			 * one-shot refill timer for the next tick. */
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			/* Bucket has tokens again: cancel the refill timer
			 * (unless reads still need it) and resume writing. */
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		/* Mirror the charge in the shared group bucket, suspending
		 * or resuming the whole group as needed. */
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
347
/** Stop reading on every bufferevent in <b>g</b>.  Always returns 0.
 *  Needs the group lock. */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
372
/** Stop writing on every bufferevent in <b>g</b>.  Always returns 0.
 *  Needs the group lock.  Uses the same try-lock strategy as
 *  bev_group_suspend_reading_() to avoid lock-order deadlock: members we
 *  cannot lock will notice g->write_suspended later on their own. */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
390
/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill.  Refills the bucket for the
    current tick, lifts any BEV_SUSPEND_BW suspension whose bucket is
    positive again, and re-arms itself if a bucket is still non-positive
    (it may have started negative). */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	/* Rate limiting may have been torn down before the timer fired. */
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}
440
441 /** Helper: grab a random element from a bufferevent group.
442  *
443  * Requires that we hold the lock on the group.
444  */
445 static struct bufferevent_private *
446 bev_group_random_element_(struct bufferevent_rate_limit_group *group)
447 {
448         int which;
449         struct bufferevent_private *bev;
450
451         /* requires group lock */
452
453         if (!group->n_members)
454                 return NULL;
455
456         EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
457
458         which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
459
460         bev = LIST_FIRST(&group->members);
461         while (which--)
462                 bev = LIST_NEXT(bev, rate_limiting->next_in_group);
463
464         return bev;
465 }
466
/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.  The enclosing scope must declare both 'bev' and 'first'
    as struct bufferevent_private *; the group lock must be held.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					\
		}						\
	} while (0)
486
/** Resume reading on every member of <b>g</b>.  Needs the group lock.
 *  Members whose locks we cannot take without deadlocking are skipped;
 *  pending_unsuspend_read makes the next group refill tick retry them. */
static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}
505
/** Resume writing on every member of <b>g</b>.  Needs the group lock.
 *  Members whose locks we cannot take without deadlocking are skipped;
 *  pending_unsuspend_write makes the next group refill tick retry them. */
static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}
524
/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.  Runs off the group's persistent
    master_refill_event. */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	/* Credit the group bucket for the ticks elapsed since last refill. */
	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	/* Unsuspend if members were left behind last time
	 * (pending_unsuspend_*) or if the bucket now holds at least one
	 * member's minimum share. */
	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}
558
/** Attach (or detach, if cfg is NULL) a rate-limit configuration to bev.
 *  Returns 0 on success, -1 on allocation failure.  The caller retains
 *  ownership of cfg and must keep it alive while it is in use. */
int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		/* Detach: drop the config, lift any bandwidth suspension,
		 * and stop the refill timer.  (The rate_limiting struct and
		 * any group membership are left in place.) */
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	/* Lazily allocate the rate-limit bookkeeping structure. */
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	/* reinit means a config was already active: the bucket is only
	 * clipped downward, not refilled (see ev_token_bucket_init_). */
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	/* Suspend whichever directions start with an empty bucket; if any
	 * direction is suspended, arm the refill timer for the next tick. */
	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
636
/** Create a rate-limit group on 'base' sharing the token-bucket settings
 *  in 'cfg' (copied by value; the caller keeps ownership of cfg).
 *  Returns NULL on allocation failure.  The group's bucket is refilled
 *  every tick by a persistent internal event. */
struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	/* Persistent per-tick refill timer for the whole group. */
	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	/* Default minimum per-member share; callers may override it. */
	bufferevent_rate_limit_group_set_min_share(g, 64);

	/* Weak RNG used only for fairness in member iteration order;
	 * seeded from the clock and the group's address. */
	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
670
/** Replace the token-bucket configuration of group 'g' with 'cfg'
 *  (copied by value).  Returns 0 on success, -1 if either argument is
 *  NULL.  The current bucket contents are only clipped downward to the
 *  new maximums, never refilled. */
int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}
701
702 int
703 bufferevent_rate_limit_group_set_min_share(
704         struct bufferevent_rate_limit_group *g,
705         size_t share)
706 {
707         if (share > EV_SSIZE_MAX)
708                 return -1;
709
710         g->configured_min_share = share;
711
712         /* Can't set share to less than the one-tick maximum.  IOW, at steady
713          * state, at least one connection can go per tick. */
714         if (share > g->rate_limit_cfg.read_rate)
715                 share = g->rate_limit_cfg.read_rate;
716         if (share > g->rate_limit_cfg.write_rate)
717                 share = g->rate_limit_cfg.write_rate;
718
719         g->min_share = share;
720         return 0;
721 }
722
/** Destroy an empty rate-limit group: cancel its refill timer and free
 *  its lock and memory.  All members must already have been removed
 *  (asserted). */
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}
733
/** Add 'bev' to rate-limit group 'g', removing it first from any other
 *  group.  Returns 0 on success (including the no-op case where bev is
 *  already in g), -1 on allocation failure.  If the group is currently
 *  suspended in either direction, bev inherits that suspension. */
int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);

	/* Lazily allocate bev's rate-limit bookkeeping (no per-bev cfg is
	 * required to join a group). */
	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	/* Sample the group's suspension state while still holding its
	 * lock; apply it to bev after releasing the group lock. */
	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}
779
/** Public wrapper: detach 'bev' from its rate-limit group (if any) and
 *  lift any group-imposed suspension.  Returns 0. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}
785
/** Detach 'bev' from its rate-limit group, if it belongs to one.  When
 *  'unsuspend' is nonzero, also clear any BEV_SUSPEND_BW_GROUP suspension
 *  (callers tearing bev down pass 0 to skip that).  Returns 0. */
int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}
808
809 /* ===
810  * API functions to expose rate limits.
811  *
812  * Don't use these from inside Libevent; they're meant to be for use by
813  * the program.
814  * === */
815
/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
818 ev_ssize_t
819 bufferevent_get_read_limit(struct bufferevent *bev)
820 {
821         ev_ssize_t r;
822         struct bufferevent_private *bevp;
823         BEV_LOCK(bev);
824         bevp = BEV_UPCAST(bev);
825         if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
826                 bufferevent_update_buckets(bevp);
827                 r = bevp->rate_limiting->limit.read_limit;
828         } else {
829                 r = EV_SSIZE_MAX;
830         }
831         BEV_UNLOCK(bev);
832         return r;
833 }
834
/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
837 ev_ssize_t
838 bufferevent_get_write_limit(struct bufferevent *bev)
839 {
840         ev_ssize_t r;
841         struct bufferevent_private *bevp;
842         BEV_LOCK(bev);
843         bevp = BEV_UPCAST(bev);
844         if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
845                 bufferevent_update_buckets(bevp);
846                 r = bevp->rate_limiting->limit.write_limit;
847         } else {
848                 r = EV_SSIZE_MAX;
849         }
850         BEV_UNLOCK(bev);
851         return r;
852 }
853
854 int
855 bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
856 {
857         struct bufferevent_private *bevp;
858         int ret = 0;
859         BEV_LOCK(bev);
860         bevp = BEV_UPCAST(bev);
861         if (size == 0 || size > EV_SSIZE_MAX)
862                 bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
863         else
864                 bevp->max_single_read = size;
865         ret = evbuffer_set_max_read(bev->input, bevp->max_single_read);
866         BEV_UNLOCK(bev);
867         return ret;
868 }
869
870 int
871 bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
872 {
873         struct bufferevent_private *bevp;
874         BEV_LOCK(bev);
875         bevp = BEV_UPCAST(bev);
876         if (size == 0 || size > EV_SSIZE_MAX)
877                 bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
878         else
879                 bevp->max_single_write = size;
880         BEV_UNLOCK(bev);
881         return 0;
882 }
883
884 ev_ssize_t
885 bufferevent_get_max_single_read(struct bufferevent *bev)
886 {
887         ev_ssize_t r;
888
889         BEV_LOCK(bev);
890         r = BEV_UPCAST(bev)->max_single_read;
891         BEV_UNLOCK(bev);
892         return r;
893 }
894
895 ev_ssize_t
896 bufferevent_get_max_single_write(struct bufferevent *bev)
897 {
898         ev_ssize_t r;
899
900         BEV_LOCK(bev);
901         r = BEV_UPCAST(bev)->max_single_write;
902         BEV_UNLOCK(bev);
903         return r;
904 }
905
906 ev_ssize_t
907 bufferevent_get_max_to_read(struct bufferevent *bev)
908 {
909         ev_ssize_t r;
910         BEV_LOCK(bev);
911         r = bufferevent_get_read_max_(BEV_UPCAST(bev));
912         BEV_UNLOCK(bev);
913         return r;
914 }
915
916 ev_ssize_t
917 bufferevent_get_max_to_write(struct bufferevent *bev)
918 {
919         ev_ssize_t r;
920         BEV_LOCK(bev);
921         r = bufferevent_get_write_max_(BEV_UPCAST(bev));
922         BEV_UNLOCK(bev);
923         return r;
924 }
925
926 const struct ev_token_bucket_cfg *
927 bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
928         struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
929         struct ev_token_bucket_cfg *cfg;
930
931         BEV_LOCK(bev);
932
933         if (bufev_private->rate_limiting) {
934                 cfg = bufev_private->rate_limiting->cfg;
935         } else {
936                 cfg = NULL;
937         }
938
939         BEV_UNLOCK(bev);
940
941         return cfg;
942 }
943
/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
946 ev_ssize_t
947 bufferevent_rate_limit_group_get_read_limit(
948         struct bufferevent_rate_limit_group *grp)
949 {
950         ev_ssize_t r;
951         LOCK_GROUP(grp);
952         r = grp->rate_limit.read_limit;
953         UNLOCK_GROUP(grp);
954         return r;
955 }
956
957 /* Mostly you don't want to use this function from inside libevent;
958  * bufferevent_get_write_max_() is more likely what you want. */
959 ev_ssize_t
960 bufferevent_rate_limit_group_get_write_limit(
961         struct bufferevent_rate_limit_group *grp)
962 {
963         ev_ssize_t r;
964         LOCK_GROUP(grp);
965         r = grp->rate_limit.write_limit;
966         UNLOCK_GROUP(grp);
967         return r;
968 }
969
/* Manually deduct 'decr' bytes from this bufferevent's read bucket
 * (a negative decr adds bytes).  Suspends or resumes reading as the
 * bucket crosses zero.  Returns 0 on success, -1 if the refill timer
 * could not be scheduled.  The bufferevent must already have a rate
 * limit configured (asserted below). */
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Bucket just ran dry: stop reading and arm the refill
		 * event so the bucket is topped up next tick. */
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Bucket recovered.  The refill event is shared with the
		 * write direction, so only cancel it if writing is not
		 * also bandwidth-suspended and still waiting on it. */
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
996
/* Manually deduct 'decr' bytes from this bufferevent's write bucket
 * (a negative decr adds bytes).  Suspends or resumes writing as the
 * bucket crosses zero.  Returns 0 on success, -1 if the refill timer
 * could not be scheduled.  The bufferevent must already have a rate
 * limit configured (asserted below). */
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Bucket just ran dry: stop writing and arm the refill
		 * event so the bucket is topped up next tick. */
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Bucket recovered.  The refill event is shared with the
		 * read direction, so only cancel it if reading is not
		 * also bandwidth-suspended and still waiting on it. */
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
1025
1026 int
1027 bufferevent_rate_limit_group_decrement_read(
1028         struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1029 {
1030         int r = 0;
1031         ev_ssize_t old_limit, new_limit;
1032         LOCK_GROUP(grp);
1033         old_limit = grp->rate_limit.read_limit;
1034         new_limit = (grp->rate_limit.read_limit -= decr);
1035
1036         if (old_limit > 0 && new_limit <= 0) {
1037                 bev_group_suspend_reading_(grp);
1038         } else if (old_limit <= 0 && new_limit > 0) {
1039                 bev_group_unsuspend_reading_(grp);
1040         }
1041
1042         UNLOCK_GROUP(grp);
1043         return r;
1044 }
1045
1046 int
1047 bufferevent_rate_limit_group_decrement_write(
1048         struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1049 {
1050         int r = 0;
1051         ev_ssize_t old_limit, new_limit;
1052         LOCK_GROUP(grp);
1053         old_limit = grp->rate_limit.write_limit;
1054         new_limit = (grp->rate_limit.write_limit -= decr);
1055
1056         if (old_limit > 0 && new_limit <= 0) {
1057                 bev_group_suspend_writing_(grp);
1058         } else if (old_limit <= 0 && new_limit > 0) {
1059                 bev_group_unsuspend_writing_(grp);
1060         }
1061
1062         UNLOCK_GROUP(grp);
1063         return r;
1064 }
1065
1066 void
1067 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
1068     ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
1069 {
1070         EVUTIL_ASSERT(grp != NULL);
1071         if (total_read_out)
1072                 *total_read_out = grp->total_read;
1073         if (total_written_out)
1074                 *total_written_out = grp->total_written;
1075 }
1076
1077 void
1078 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1079 {
1080         grp->total_read = grp->total_written = 0;
1081 }
1082
1083 int
1084 bufferevent_ratelim_init_(struct bufferevent_private *bev)
1085 {
1086         bev->rate_limiting = NULL;
1087         bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
1088         bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
1089
1090         if (evbuffer_set_max_read(bev->bev.input, bev->max_single_read))
1091                 return -1;
1092
1093         return 0;
1094 }