m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->stream_timeout = stream_timeout;
m->workers = workers;
- m->workers_max = 6;
+ m->workers_max = h2_config_geti(conf, H2_CONF_MAX_WORKERS);
+ m->workers_def_limit = 4;
+ m->workers_limit = m->workers_def_limit;
+ m->last_limit_change = m->last_idle_block = apr_time_now();
+ m->limit_change_interval = apr_time_from_msec(200);
m->tx_handles_reserved = 0;
m->tx_chunk_size = 4;
h2_io_set_remove(m->stream_ios, io);
h2_io_set_remove(m->ready_ios, io);
+ if (m->redo_ios) {
+ h2_io_set_remove(m->redo_ios, io);
+ }
if (pool) {
apr_pool_clear(pool);
{
/* Remove io from ready set, we will never submit it */
h2_io_set_remove(m->ready_ios, io);
- if (!io->processing_started || io->processing_done) {
+ if (!io->worker_started || io->worker_done) {
/* already finished or not even started yet */
h2_iq_remove(m->q, io->id);
io_destroy(m, io, 1);
io->request->method, io->request->authority, io->request->path,
io->response? "http" : (io->rst_error? "reset" : "?"),
io->response? io->response->http_status : io->rst_error,
- io->orphaned, io->processing_started, io->processing_done,
+ io->orphaned, io->worker_started, io->worker_done,
io->eos_in, io->eos_out);
}
else if (io) {
m->id, io->id,
io->response? "http" : (io->rst_error? "reset" : "?"),
io->response? io->response->http_status : io->rst_error,
- io->orphaned, io->processing_started, io->processing_done,
+ io->orphaned, io->worker_started, io->worker_done,
io->eos_in, io->eos_out);
}
else {
if (io && !m->aborted) {
stream = h2_ihash_get(streams, io->id);
if (stream) {
+ io->submitted = 1;
if (io->rst_error) {
h2_stream_rst(stream, io->rst_error);
}
"resetting io to close request processing",
m->id, io->id);
h2_io_make_orphaned(io, H2_ERR_STREAM_CLOSED);
- if (!io->processing_started || io->processing_done) {
+ if (!io->worker_started || io->worker_done) {
io_destroy(m, io, 1);
}
else {
return status;
}
-static h2_io *open_io(h2_mplx *m, int stream_id)
+static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request)
{
apr_pool_t *io_pool = m->spare_pool;
h2_io *io;
m->spare_pool = NULL;
}
- io = h2_io_create(stream_id, io_pool);
+ io = h2_io_create(stream_id, io_pool, request);
h2_io_set_add(m->stream_ios, io);
return io;
status = APR_ECONNABORTED;
}
else {
- h2_io *io = open_io(m, stream_id);
- io->request = req;
+ h2_io *io = open_io(m, stream_id, req);
if (!io->request->body) {
status = h2_io_in_close(io);
h2_task *task = NULL;
int sid;
while (!m->aborted && !task
- && (m->workers_busy < m->workers_max)
+ && (m->workers_busy < m->workers_limit)
&& (sid = h2_iq_shift(m->q)) > 0) {
h2_io *io = h2_io_set_get(m->stream_ios, sid);
- if (io) {
+ if (io && io->orphaned) {
+ io_destroy(m, io, 0);
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
+ }
+ else if (io) {
conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator);
m->spare_allocator = NULL;
task = h2_task_create(m->id, io->request, slave, m);
-
- io->processing_started = 1;
+ io->worker_started = 1;
+ io->started_at = apr_time_now();
if (sid > m->max_stream_started) {
m->max_stream_started = sid;
}
}
else {
h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): task(%s) done", m->id, task->id);
/* clean our references and report request as done. Signal
h2_slave_destroy(task->c, &m->spare_allocator);
task = NULL;
if (io) {
- io->processing_done = 1;
+ apr_time_t now = apr_time_now();
+ if (!io->orphaned && m->redo_ios
+ && h2_io_set_get(m->redo_ios, io->id)) {
+ /* reset and schedule again */
+ h2_io_redo(io);
+ h2_io_set_remove(m->redo_ios, io);
+ h2_iq_add(m->q, io->id, NULL, NULL);
+ }
+ else {
+ io->worker_done = 1;
+ io->done_at = now;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): request(%d) done, %f ms"
+ " elapsed", m->id, io->id,
+ (io->done_at - io->started_at) / 1000.0);
+ if (io->started_at > m->last_idle_block) {
+ /* this task finished without causing an 'idle block', e.g.
+ * a block by flow control.
+ */
+ if (now - m->last_limit_change >= m->limit_change_interval
+ && m->workers_limit < m->workers_max) {
+ /* Well behaving stream, allow it more workers */
+ m->workers_limit = H2MIN(m->workers_limit * 2,
+ m->workers_max);
+ m->last_limit_change = now;
+ m->need_registration = 1;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): increase worker limit to %d",
+ m->id, m->workers_limit);
+ }
+ }
+ }
+
if (io->orphaned) {
io_destroy(m, io, 0);
if (m->join_wait) {
apr_thread_cond_broadcast(m->task_done);
}
}
-
}
void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
{
- int acquired, do_registration = 0;
+ int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
task_done(m, task);
/* caller wants another task */
*ptask = pop_task(m);
}
- do_registration = (m->workers_busy+1 == m->workers_max);
leave_mutex(m, acquired);
}
- if (do_registration) {
- workers_register(m);
+}
+
+/*******************************************************************************
+ * h2_mplx DoS protection
+ ******************************************************************************/
+
+typedef struct {
+ h2_mplx *m;
+ h2_io *io;
+ apr_time_t now;
+} io_iter_ctx;
+
+static int latest_repeatable_busy_unsubmitted_iter(void *data, h2_io *io)
+{
+ io_iter_ctx *ctx = data;
+ if (io->worker_started && !io->worker_done
+ && h2_io_is_repeatable(io)
+ && !h2_io_set_get(ctx->m->redo_ios, io->id)) {
+ /* this io occupies a worker, the response has not been submitted yet,
+ * not been cancelled and it is a repeatable request
+ * -> it can be re-scheduled later */
+ if (!ctx->io || ctx->io->started_at < io->started_at) {
+ /* we did not have one or this one was started later */
+ ctx->io = io;
+ }
+ }
+ return 1;
+}
+
+static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m)
+{
+ io_iter_ctx ctx;
+ ctx.m = m;
+ ctx.io = NULL;
+ h2_io_set_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx);
+ return ctx.io;
+}
+
+static int timed_out_busy_iter(void *data, h2_io *io)
+{
+ io_iter_ctx *ctx = data;
+ if (io->worker_started && !io->worker_done
+ && (ctx->now - io->started_at) > ctx->m->stream_timeout) {
+ /* timed out stream occupying a worker, found */
+ ctx->io = io;
+ return 0;
}
+ return 1;
+}
+static h2_io *get_timed_out_busy_stream(h2_mplx *m)
+{
+ io_iter_ctx ctx;
+ ctx.m = m;
+ ctx.io = NULL;
+ ctx.now = apr_time_now();
+ h2_io_set_iter(m->stream_ios, timed_out_busy_iter, &ctx);
+ return ctx.io;
+}
+
+static apr_status_t unschedule_slow_ios(h2_mplx *m)
+{
+ h2_io *io;
+ int n;
+
+ if (!m->redo_ios) {
+ m->redo_ios = h2_io_set_create(m->pool);
+ }
+ /* Try to get rid of streams that occupy workers. Look for safe requests
+ * that are repeatable. If none found, fail the connection.
+ */
+ n = (m->workers_busy - m->workers_limit - h2_io_set_size(m->redo_ios));
+ while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) {
+ h2_io_set_add(m->redo_ios, io);
+ h2_io_rst(io, H2_ERR_CANCEL);
+ --n;
+ }
+
+ if ((m->workers_busy - h2_io_set_size(m->redo_ios)) > m->workers_limit) {
+ io = get_timed_out_busy_stream(m);
+ if (io) {
+ /* Too many busy workers, unable to cancel enough streams
+ * and with a busy, timed out stream, we tell the client
+ * to go away... */
+ return APR_TIMEUP;
+ }
+ }
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_mplx_idle(h2_mplx *m)
+{
+ apr_status_t status = APR_SUCCESS;
+ apr_time_t now;
+ int acquired;
+
+ if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ apr_size_t scount = h2_io_set_size(m->stream_ios);
+ if (scount > 0 && m->workers_busy) {
+ /* If we have streams in connection state 'IDLE', meaning
+ * all streams are ready to sent data out, but lack
+ * WINDOW_UPDATEs.
+ *
+ * This is ok, unless we have streams that still occupy
+ * h2 workers. As worker threads are a scarce resource,
+ * we need to take measures that we do not get DoSed.
+ *
+ * This is what we call an 'idle block'. Limit the amount
+ * of busy workers we allow for this connection until it
+ * well behaves.
+ */
+ now = apr_time_now();
+ m->last_idle_block = now;
+ if (m->workers_limit > 2
+ && now - m->last_limit_change >= m->limit_change_interval) {
+ if (m->workers_limit > 16) {
+ m->workers_limit = 16;
+ }
+ else if (m->workers_limit > 8) {
+ m->workers_limit = 8;
+ }
+ else if (m->workers_limit > 4) {
+ m->workers_limit = 4;
+ }
+ else if (m->workers_limit > 2) {
+ m->workers_limit = 2;
+ }
+ m->last_limit_change = now;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): decrease worker limit to %d",
+ m->id, m->workers_limit);
+ }
+
+ if (m->workers_busy > m->workers_limit) {
+ status = unschedule_slow_ios(m);
+ }
+ }
+ leave_mutex(m, acquired);
+ }
+ return status;
}
/*******************************************************************************