struct bufferevent_rate_limit *rlim;
struct timeval now;
ev_uint32_t tick;
+ int reinit = 0, suspended = 0;
/* XXX reference-count cfg */
BEV_LOCK(bev);
if (cfg == NULL) {
if (bevp->rate_limiting) {
- bevp->rate_limiting->cfg = NULL;
+ rlim = bevp->rate_limiting;
+ rlim->cfg = NULL;
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
+ if (event_initialized(&rlim->refill_bucket_event))
+ event_del(&rlim->refill_bucket_event);
}
r = 0;
goto done;
tick = ev_token_bucket_get_tick(&now, cfg);
if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
- ;
- } else if (bevp->rate_limiting) {
- bevp->rate_limiting->cfg = cfg;
- ev_token_bucket_init(&bevp->rate_limiting->limit, cfg, tick, 1);
- if (bevp->rate_limiting->limit.read_limit > 0)
- bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
- else
- bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
- if (bevp->rate_limiting->limit.write_limit > 0)
- bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
- else
- bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
- } else {
+ /* no-op */
+ r = 0;
+ goto done;
+ }
+ if (bevp->rate_limiting == NULL) {
rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
if (!rlim)
goto done;
- rlim->cfg = cfg;
- ev_token_bucket_init(&rlim->limit, cfg, tick, 0);
- evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
- _bev_refill_callback, bevp);
bevp->rate_limiting = rlim;
+ } else {
+ rlim = bevp->rate_limiting;
+ }
+ reinit = rlim->cfg != NULL;
+
+ rlim->cfg = cfg;
+ ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);
+
+ if (reinit) {
+ EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
+ event_del(&rlim->refill_bucket_event);
}
+ evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
+ _bev_refill_callback, bevp);
+
+ if (rlim->limit.read_limit > 0) {
+ bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
+ } else {
+ bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
+ suspended=1;
+ }
+ if (rlim->limit.write_limit > 0) {
+ bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
+ } else {
+ bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
+ suspended = 1;
+ }
+
+ if (suspended)
+ event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
+
r = 0;
+
done:
BEV_UNLOCK(bev);
return r;
&bevp->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
} else if (old_limit <= 0 && new_limit > 0) {
- event_del(&bevp->rate_limiting->refill_bucket_event);
+ if (!(bevp->write_suspended & BEV_SUSPEND_BW))
+ event_del(&bevp->rate_limiting->refill_bucket_event);
bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
}
&bevp->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
} else if (old_limit <= 0 && new_limit > 0) {
- event_del(&bevp->rate_limiting->refill_bucket_event);
+ if (!(bevp->read_suspended & BEV_SUSPEND_BW))
+ event_del(&bevp->rate_limiting->refill_bucket_event);
bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
}