typedef enum rl_state_e
{
- RATE_ERROR,
RATE_LIMIT,
RATE_FULLSPEED
} rl_state_e;
int speed;
int chunk_size;
int burst;
+ int do_sleep;
rl_state_e state;
apr_bucket_brigade *tmpbb;
apr_bucket_brigade *holdingbb;
#endif /* RLFDEBUG */
static apr_status_t
-rate_limit_filter(ap_filter_t *f, apr_bucket_brigade *input_bb)
+rate_limit_filter(ap_filter_t *f, apr_bucket_brigade *bb)
{
apr_status_t rv = APR_SUCCESS;
rl_ctx_t *ctx = f->ctx;
- apr_bucket *fb;
- int do_sleep = 0;
apr_bucket_alloc_t *ba = f->r->connection->bucket_alloc;
- apr_bucket_brigade *bb = input_bb;
-
- if (f->c->aborted) {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, f->r, APLOGNO(01454) "rl: conn aborted");
- apr_brigade_cleanup(bb);
- return APR_ECONNABORTED;
- }
/* Set up our rl_ctx_t on first use */
if (ctx == NULL) {
ctx->state = RATE_LIMIT;
ctx->speed = ratelimit;
ctx->burst = burst;
+ ctx->do_sleep = 0;
/* calculate how many bytes / interval we want to send */
/* speed is bytes / second, so, how many (speed / 1000 % interval) */
ctx->tmpbb = apr_brigade_create(f->r->pool, ba);
ctx->holdingbb = apr_brigade_create(f->r->pool, ba);
}
+ else {
+ APR_BRIGADE_PREPEND(bb, ctx->holdingbb);
+ }
- while (ctx->state != RATE_ERROR &&
- (!APR_BRIGADE_EMPTY(bb) || !APR_BRIGADE_EMPTY(ctx->holdingbb))) {
+ while (!APR_BRIGADE_EMPTY(bb)) {
apr_bucket *e;
- if (!APR_BRIGADE_EMPTY(ctx->holdingbb)) {
- APR_BRIGADE_CONCAT(bb, ctx->holdingbb);
- }
-
- while (ctx->state == RATE_FULLSPEED && !APR_BRIGADE_EMPTY(bb)) {
+ if (ctx->state == RATE_FULLSPEED) {
/* Find where we 'stop' going full speed. */
for (e = APR_BRIGADE_FIRST(bb);
e != APR_BRIGADE_SENTINEL(bb); e = APR_BUCKET_NEXT(e)) {
if (AP_RL_BUCKET_IS_END(e)) {
- apr_bucket *f;
- f = APR_RING_LAST(&bb->list);
- APR_RING_UNSPLICE(e, f, link);
- APR_RING_SPLICE_TAIL(&ctx->holdingbb->list, e, f,
- apr_bucket, link);
+ apr_brigade_split_ex(bb, e, ctx->holdingbb);
ctx->state = RATE_LIMIT;
break;
}
}
- if (f->c->aborted) {
- apr_brigade_cleanup(bb);
- ctx->state = RATE_ERROR;
- break;
- }
-
- fb = apr_bucket_flush_create(ba);
- APR_BRIGADE_INSERT_TAIL(bb, fb);
+ e = apr_bucket_flush_create(ba);
+ APR_BRIGADE_INSERT_TAIL(bb, e);
rv = ap_pass_brigade(f->next, bb);
+ apr_brigade_cleanup(bb);
if (rv != APR_SUCCESS) {
- ctx->state = RATE_ERROR;
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, f->r, APLOGNO(01455)
"rl: full speed brigade pass failed.");
+ return rv;
}
}
-
- while (ctx->state == RATE_LIMIT && !APR_BRIGADE_EMPTY(bb)) {
+ else {
for (e = APR_BRIGADE_FIRST(bb);
e != APR_BRIGADE_SENTINEL(bb); e = APR_BUCKET_NEXT(e)) {
if (AP_RL_BUCKET_IS_START(e)) {
- apr_bucket *f;
- f = APR_RING_LAST(&bb->list);
- APR_RING_UNSPLICE(e, f, link);
- APR_RING_SPLICE_TAIL(&ctx->holdingbb->list, e, f,
- apr_bucket, link);
+ apr_brigade_split_ex(bb, e, ctx->holdingbb);
ctx->state = RATE_FULLSPEED;
break;
}
}
while (!APR_BRIGADE_EMPTY(bb)) {
- apr_bucket *stop_point;
- apr_off_t len = 0;
+ apr_off_t len = ctx->chunk_size + ctx->burst;
- if (f->c->aborted) {
- apr_brigade_cleanup(bb);
- ctx->state = RATE_ERROR;
- break;
- }
-
- if (do_sleep) {
- apr_sleep(RATE_INTERVAL_MS * 1000);
- }
- else {
- do_sleep = 1;
- }
-
- apr_brigade_length(bb, 1, &len);
+ APR_BRIGADE_CONCAT(ctx->tmpbb, bb);
/*
* Pull next chunk of data; the initial amount is our
* burst amounts we have left (in case not done in the
* first bucket).
*/
- rv = apr_brigade_partition(bb,
- ctx->chunk_size + ctx->burst, &stop_point);
+ rv = apr_brigade_partition(ctx->tmpbb, len, &e);
if (rv != APR_SUCCESS && rv != APR_INCOMPLETE) {
- ctx->state = RATE_ERROR;
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, f->r, APLOGNO(01456)
"rl: partition failed.");
- break;
+ return rv;
}
-
- if (stop_point != APR_BRIGADE_SENTINEL(bb)) {
- apr_bucket *f;
- apr_bucket *e = APR_BUCKET_PREV(stop_point);
- f = APR_RING_FIRST(&bb->list);
- APR_RING_UNSPLICE(f, e, link);
- APR_RING_SPLICE_HEAD(&ctx->tmpbb->list, f, e, apr_bucket,
- link);
+ /* Send next metadata now if any */
+ while (e != APR_BRIGADE_SENTINEL(ctx->tmpbb)
+ && APR_BUCKET_IS_METADATA(e)) {
+ e = APR_BUCKET_NEXT(e);
+ }
+ if (e != APR_BRIGADE_SENTINEL(ctx->tmpbb)) {
+ apr_brigade_split_ex(ctx->tmpbb, e, bb);
}
else {
- APR_BRIGADE_CONCAT(ctx->tmpbb, bb);
+ apr_brigade_length(ctx->tmpbb, 1, &len);
}
- fb = apr_bucket_flush_create(ba);
-
- APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, fb);
-
/*
* Adjust the burst amount depending on how much
* we've done up to now.
*/
if (ctx->burst) {
- len = ctx->burst;
- apr_brigade_length(ctx->tmpbb, 1, &len);
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r,
APLOGNO(03485) "rl: burst %d; len %"APR_OFF_T_FMT, ctx->burst, len);
if (len < ctx->burst) {
}
}
+ e = APR_BRIGADE_LAST(ctx->tmpbb);
+ if (APR_BUCKET_IS_EOS(e)) {
+ ap_remove_output_filter(f);
+ }
+ else if (!APR_BUCKET_IS_FLUSH(e)) {
+ if (APR_BRIGADE_EMPTY(bb)) {
+ /* Wait for more (or next call) */
+ break;
+ }
+ e = apr_bucket_flush_create(ba);
+ APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e);
+ }
+
#if defined(RLFDEBUG)
brigade_dump(f->r, ctx->tmpbb);
brigade_dump(f->r, bb);
#endif /* RLFDEBUG */
+ if (ctx->do_sleep) {
+ apr_sleep(RATE_INTERVAL_MS * 1000);
+ }
+ else {
+ ctx->do_sleep = 1;
+ }
+
rv = ap_pass_brigade(f->next, ctx->tmpbb);
apr_brigade_cleanup(ctx->tmpbb);
if (rv != APR_SUCCESS) {
/* Most often, user disconnects from stream */
- ctx->state = RATE_ERROR;
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, f->r, APLOGNO(01457)
"rl: brigade pass failed.");
- break;
+ return rv;
}
}
}
+
+ if (!APR_BRIGADE_EMPTY(ctx->holdingbb)) {
+ /* Any rate-limited data in tmpbb is sent unlimited along
+ * with the rest.
+ */
+ APR_BRIGADE_CONCAT(bb, ctx->tmpbb);
+ APR_BRIGADE_CONCAT(bb, ctx->holdingbb);
+ }
}
- return rv;
+#if defined(RLFDEBUG)
+ brigade_dump(f->r, ctx->tmpbb);
+#endif /* RLFDEBUG */
+
+ /* Save remaining tmpbb with the correct lifetime for the next call */
+ return ap_save_brigade(f, &ctx->holdingbb, &ctx->tmpbb, f->r->pool);
}