h2_mplx *m;
h2_stream *stream;
apr_time_t now;
+ apr_size_t count;
} stream_iter_ctx;
+static apr_status_t mplx_be_happy(h2_mplx *m);
+static apr_status_t mplx_be_annoyed(h2_mplx *m);
+
apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
{
return APR_SUCCESS;
static void stream_joined(h2_mplx *m, h2_stream *stream)
{
- ap_assert(!stream->task || stream->task->worker_done);
+ ap_assert(!h2_task_has_started(stream->task) || stream->task->worker_done);
h2_ihash_remove(m->shold, stream->id);
h2_ihash_add(m->spurge, stream);
h2_ififo_remove(m->readyq, stream->id);
h2_ihash_add(m->shold, stream);
- if (!stream->task || stream->task->worker_done) {
+ if (!h2_task_has_started(stream->task) || stream->task->done_done) {
stream_joined(m, stream);
}
else if (stream->task) {
m->stream_max_mem = h2_config_sgeti(s, H2_CONF_STREAM_MAX_MEM);
m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
- m->sredo = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
m->workers = workers;
m->max_active = workers->max_workers;
m->limit_active = 6; /* the original h1 max parallel connections */
- m->last_limit_change = m->last_idle_block = apr_time_now();
- m->limit_change_interval = apr_time_from_msec(100);
+ m->last_mood_change = apr_time_now();
+ m->mood_update_interval = apr_time_from_msec(100);
m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*));
}
/* How to shut down a h2 connection:
* 1. cancel all streams still active */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): release, %d/%d/%d streams (total/hold/purge), %d active tasks",
+ m->id, (int)h2_ihash_count(m->streams),
+ (int)h2_ihash_count(m->shold), (int)h2_ihash_count(m->spurge), m->tasks_active);
while (!h2_ihash_iter(m->streams, stream_cancel_iter, m)) {
/* until empty */
}
h2_ihash_iter(m->shold, report_stream_iter, m);
}
}
- ap_assert(m->tasks_active == 0);
m->join_wait = NULL;
-
+
/* 4. With all workers done, all streams should be in spurge */
+ ap_assert(m->tasks_active == 0);
if (!h2_ihash_empty(m->shold)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03516)
"h2_mplx(%ld): unexpected %d streams in hold",
m->c->aborted = old_aborted;
H2_MPLX_LEAVE(m);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): released", m->id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): released", m->id);
}
apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
}
if (!stream->task) {
-
if (sid > m->max_stream_started) {
m->max_stream_started = sid;
}
"create task"));
return NULL;
}
-
}
+ stream->task->started_at = apr_time_now();
++m->tasks_active;
return stream->task;
}
"h2_mplx(%s): request done, %f ms elapsed", task->id,
(task->done_at - task->started_at) / 1000.0);
- if (task->started_at > m->last_idle_block) {
- /* this task finished without causing an 'idle block', e.g.
- * a block by flow control.
- */
- if (task->done_at- m->last_limit_change >= m->limit_change_interval
- && m->limit_active < m->max_active) {
- /* Well behaving stream, allow it more workers */
- m->limit_active = H2MIN(m->limit_active * 2,
- m->max_active);
- m->last_limit_change = task->done_at;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): increase worker limit to %d",
- m->id, m->limit_active);
- }
+ if (task->c && !task->c->aborted && task->started_at > m->last_mood_change) {
+ mplx_be_happy(m);
}
-
+
ap_assert(task->done_done == 0);
stream = h2_ihash_get(m->streams, task->stream_id);
if (stream) {
/* stream not done yet. */
- if (!m->aborted && h2_ihash_get(m->sredo, stream->id)) {
+ if (!m->aborted && task->redo) {
/* reset and schedule again */
- task->worker_done = 0;
h2_task_redo(task);
- h2_ihash_remove(m->sredo, stream->id);
h2_iq_add(m->q, stream->id, NULL, NULL);
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c,
H2_STRM_MSG(stream, "redo, added to q"));
{
H2_MPLX_ENTER_ALWAYS(m);
- task_done(m, task);
--m->tasks_active;
+ task_done(m, task);
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
* h2_mplx DoS protection
******************************************************************************/
-static int latest_repeatable_unsubmitted_iter(void *data, void *val)
+static int timed_out_busy_iter(void *data, void *val)
{
stream_iter_ctx *ctx = data;
h2_stream *stream = val;
-
- if (stream->task && !stream->task->worker_done
- && h2_task_can_redo(stream->task)
- && !h2_ihash_get(ctx->m->sredo, stream->id)) {
- if (!h2_stream_is_ready(stream)) {
- /* this task occupies a worker, the response has not been submitted
- * yet, not been cancelled and it is a repeatable request
- * -> it can be re-scheduled later */
- if (!ctx->stream
- || (ctx->stream->task->started_at < stream->task->started_at)) {
- /* we did not have one or this one was started later */
- ctx->stream = stream;
- }
- }
+ if (h2_task_has_started(stream->task) && !stream->task->worker_done
+ && (ctx->now - stream->task->started_at) > stream->task->timeout) {
+ /* timed out stream occupying a worker, found */
+ ctx->stream = stream;
+ return 0;
}
return 1;
}
-static h2_stream *get_latest_repeatable_unsubmitted_stream(h2_mplx *m)
+static h2_stream *get_timed_out_busy_stream(h2_mplx *m)
{
stream_iter_ctx ctx;
ctx.m = m;
ctx.stream = NULL;
- h2_ihash_iter(m->streams, latest_repeatable_unsubmitted_iter, &ctx);
+ ctx.now = apr_time_now();
+ h2_ihash_iter(m->streams, timed_out_busy_iter, &ctx);
return ctx.stream;
}
-static int timed_out_busy_iter(void *data, void *val)
+static int latest_repeatable_unsubmitted_iter(void *data, void *val)
{
stream_iter_ctx *ctx = data;
h2_stream *stream = val;
- if (stream->task && !stream->task->worker_done
- && (ctx->now - stream->task->started_at) > stream->task->timeout) {
- /* timed out stream occupying a worker, found */
- ctx->stream = stream;
- return 0;
+
+ if (!stream->task) goto leave;
+ if (!h2_task_has_started(stream->task) || stream->task->worker_done) goto leave;
+ if (h2_stream_is_ready(stream)) goto leave;
+ if (stream->task->redo) {
+ ++ctx->count;
+ goto leave;
+ }
+ if (h2_task_can_redo(stream->task)) {
+ /* this task occupies a worker, the response has not been submitted
+ * yet, not been cancelled and it is a repeatable request
+ * -> we could redo it later */
+ if (!ctx->stream
+ || (ctx->stream->task->started_at < stream->task->started_at)) {
+ /* we did not have one or this one was started later */
+ ctx->stream = stream;
+ }
}
+leave:
return 1;
}
-static h2_stream *get_timed_out_busy_stream(h2_mplx *m)
+static apr_status_t assess_task_to_throttle(h2_task **ptask, h2_mplx *m)
{
stream_iter_ctx ctx;
+
+ /* count the running tasks already marked for redo and get one that could
+ * be throttled */
+ *ptask = NULL;
ctx.m = m;
ctx.stream = NULL;
- ctx.now = apr_time_now();
- h2_ihash_iter(m->streams, timed_out_busy_iter, &ctx);
- return ctx.stream;
+ ctx.count = 0;
+ h2_ihash_iter(m->streams, latest_repeatable_unsubmitted_iter, &ctx);
+ if (m->tasks_active - ctx.count > m->limit_active) {
+ /* we are above the limit of running tasks, accounting for the ones
+ * already throttled. */
+ if (ctx.stream && ctx.stream->task) {
+ *ptask = ctx.stream->task;
+ return APR_EAGAIN;
+ }
+ /* above limit, be seeing no candidate for easy throttling */
+ if (get_timed_out_busy_stream(m)) {
+ /* Too many busy workers, unable to cancel enough streams
+ * and with a busy, timed out stream, we tell the client
+ * to go away... */
+ return APR_TIMEUP;
+ }
+ }
+ return APR_SUCCESS;
}
static apr_status_t unschedule_slow_tasks(h2_mplx *m)
{
- h2_stream *stream;
- int n;
+ h2_task *task;
+ apr_status_t rv;
/* Try to get rid of streams that occupy workers. Look for safe requests
* that are repeatable. If none found, fail the connection.
*/
- n = (m->tasks_active - m->limit_active - (int)h2_ihash_count(m->sredo));
- while (n > 0 && (stream = get_latest_repeatable_unsubmitted_stream(m))) {
+ while (APR_EAGAIN == (rv = assess_task_to_throttle(&task, m))) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%s): unschedule, resetting task for redo later",
- stream->task->id);
- h2_task_rst(stream->task, H2_ERR_CANCEL);
- h2_ihash_add(m->sredo, stream);
- --n;
+ task->id);
+ task->redo = 1;
+ h2_task_rst(task, H2_ERR_CANCEL);
}
-
- if ((m->tasks_active - h2_ihash_count(m->sredo)) > m->limit_active) {
- stream = get_timed_out_busy_stream(m);
- if (stream) {
- /* Too many busy workers, unable to cancel enough streams
- * and with a busy, timed out stream, we tell the client
- * to go away... */
- return APR_TIMEUP;
- }
+ return rv;
+}
+
+static apr_status_t mplx_be_happy(h2_mplx *m)
+{
+ apr_time_t now;
+
+ --m->irritations_since;
+ now = apr_time_now();
+ if (m->limit_active < m->max_active
+ && (now - m->last_mood_change >= m->mood_update_interval
+ || m->irritations_since < -m->limit_active)) {
+ m->limit_active = H2MIN(m->limit_active * 2, m->max_active);
+ m->last_mood_change = now;
+ m->irritations_since = 0;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): mood update, increasing worker limit to %d",
+ m->id, m->limit_active);
}
return APR_SUCCESS;
}
-apr_status_t h2_mplx_idle(h2_mplx *m)
+static apr_status_t mplx_be_annoyed(h2_mplx *m)
{
apr_status_t status = APR_SUCCESS;
apr_time_t now;
+
+ ++m->irritations_since;
+ now = apr_time_now();
+ if (m->limit_active > 2 &&
+ ((now - m->last_mood_change >= m->mood_update_interval)
+ || (m->irritations_since >= m->limit_active))) {
+
+ if (m->limit_active > 16) {
+ m->limit_active = 16;
+ }
+ else if (m->limit_active > 8) {
+ m->limit_active = 8;
+ }
+ else if (m->limit_active > 4) {
+ m->limit_active = 4;
+ }
+ else if (m->limit_active > 2) {
+ m->limit_active = 2;
+ }
+ m->last_mood_change = now;
+ m->irritations_since = 0;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): mood update, decreasing worker limit to %d",
+ m->id, m->limit_active);
+ }
+
+ if (m->tasks_active > m->limit_active) {
+ status = unschedule_slow_tasks(m);
+ }
+ return status;
+}
+
+apr_status_t h2_mplx_idle(h2_mplx *m)
+{
+ apr_status_t status = APR_SUCCESS;
apr_size_t scount;
H2_MPLX_ENTER(m);
* of busy workers we allow for this connection until it
* well behaves.
*/
- now = apr_time_now();
- m->last_idle_block = now;
- if (m->limit_active > 2
- && now - m->last_limit_change >= m->limit_change_interval) {
- if (m->limit_active > 16) {
- m->limit_active = 16;
- }
- else if (m->limit_active > 8) {
- m->limit_active = 8;
- }
- else if (m->limit_active > 4) {
- m->limit_active = 4;
- }
- else if (m->limit_active > 2) {
- m->limit_active = 2;
- }
- m->last_limit_change = now;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): decrease worker limit to %d",
- m->id, m->limit_active);
- }
-
- if (m->tasks_active > m->limit_active) {
- status = unschedule_slow_tasks(m);
- }
+ status = mplx_be_annoyed(m);
}
else if (!h2_iq_empty(m->q)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
if (h2_ihash_empty(m->streams)) {
waiting = 0;
}
- else if (!m->tasks_active && !h2_ififo_count(m->readyq)
- && h2_iq_empty(m->q)) {
+ else if (!m->tasks_active && !h2_ififo_count(m->readyq) && h2_iq_empty(m->q)) {
waiting = 0;
}
H2_MPLX_LEAVE(m);
return waiting;
}
+
+apr_status_t h2_mplx_client_rst(h2_mplx *m, int stream_id)
+{
+ h2_stream *stream;
+ apr_status_t status = APR_SUCCESS;
+
+ H2_MPLX_ENTER_ALWAYS(m);
+ stream = h2_ihash_get(m->streams, stream_id);
+ if (stream && stream->task) {
+ status = mplx_be_annoyed(m);
+ }
+ H2_MPLX_LEAVE(m);
+ return status;
+}