From 2e4775717151c828daad0111a690e458c97ab5ab Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Wed, 24 Feb 2016 16:20:13 +0000 Subject: [PATCH] limiting the number of threads a http/2 connection may occupy git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1732183 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES | 8 ++ modules/http2/h2_int_queue.c | 5 + modules/http2/h2_int_queue.h | 5 + modules/http2/h2_mplx.c | 119 +++++++++++++--------- modules/http2/h2_mplx.h | 5 +- modules/http2/h2_proxy_session.c | 3 - modules/http2/h2_version.h | 4 +- modules/http2/h2_workers.c | 164 ++++++++++++++++--------------- modules/http2/h2_workers.h | 3 +- 9 files changed, 184 insertions(+), 132 deletions(-) diff --git a/CHANGES b/CHANGES index 82d78253b2..bd0430105d 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,14 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.0 + *) mod_http2: limiting the amount of h2 workers a HTTP/2 connection can + occupy at a time to make DoS life harder. Limiting this to static 6 in + honor of http/1.1 connection limits, considering implementing dynamic + adjustments based on load and throughput. + This does *not* limit the number of streams a client may open, rather the + number of server threads a connection might use. + [Stefan Eissing] + *) mod_auth_digest: Fix compatibility with expression-based Authname. PR59039. [Eric Covener] diff --git a/modules/http2/h2_int_queue.c b/modules/http2/h2_int_queue.c index ba44afb71c..472ae34063 100644 --- a/modules/http2/h2_int_queue.c +++ b/modules/http2/h2_int_queue.c @@ -87,6 +87,11 @@ int h2_iq_remove(h2_int_queue *q, int sid) return 0; } +void h2_iq_clear(h2_int_queue *q) +{ + q->nelts = 0; +} + void h2_iq_sort(h2_int_queue *q, h2_iq_cmp *cmp, void *ctx) { /* Assume that changes in ordering are minimal. This needs, diff --git a/modules/http2/h2_int_queue.h b/modules/http2/h2_int_queue.h index 6cdd84c42b..69f1e1c982 100644 --- a/modules/http2/h2_int_queue.h +++ b/modules/http2/h2_int_queue.h @@ -81,6 +81,11 @@ void h2_iq_add(h2_int_queue *q, int sid, h2_iq_cmp *cmp, void *ctx); */ int h2_iq_remove(h2_int_queue *q, int sid); +/** + * Remove all entries in the queue. + */ +void h2_iq_clear(h2_int_queue *q); + /** * Sort the stream idqueue again. Call if the task ordering * has changed. diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 1302bc12d0..47e222f3d6 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -199,7 +199,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, return NULL; } - status = apr_thread_cond_create(&m->request_done, m->pool); + status = apr_thread_cond_create(&m->task_done, m->pool); if (status != APR_SUCCESS) { h2_mplx_destroy(m); return NULL; @@ -218,6 +218,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); m->stream_timeout = stream_timeout; m->workers = workers; + m->workers_max = 6; m->tx_handles_reserved = 0; m->tx_chunk_size = 4; @@ -250,6 +251,7 @@ static void workers_register(h2_mplx *m) * Therefore: ref counting for h2_workers in not needed, ref counting * for h2_worker using this is critical. */ + m->need_registration = 0; h2_workers_register(m->workers, m); } @@ -354,13 +356,15 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) int acquired; h2_workers_unregister(m->workers, m); + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { int i, wait_secs = 5; /* disable WINDOW_UPDATE callbacks */ h2_mplx_set_consumed_cb(m, NULL, NULL); - apr_thread_cond_broadcast(m->request_done); + h2_iq_clear(m->q); + apr_thread_cond_broadcast(m->task_done); while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) { /* iterate until all ios have been orphaned or destroyed */ } @@ -392,7 +396,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) } } m->aborted = 1; - apr_thread_cond_broadcast(m->request_done); + apr_thread_cond_broadcast(m->task_done); } } ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) @@ -1014,7 +1018,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const h2_request *req, h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status; - int was_empty = 0; + int do_registration = 0; int acquired; AP_DEBUG_ASSERT(m); @@ -1030,7 +1034,8 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const h2_request *req, status = h2_io_in_close(io); } - was_empty = h2_iq_empty(m->q); + m->need_registration = m->need_registration || h2_iq_empty(m->q); + do_registration = (m->need_registration && m->workers_busy < m->workers_max); h2_iq_add(m->q, io->id, cmp, ctx); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, @@ -1039,7 +1044,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const h2_request *req, } leave_mutex(m, acquired); } - if (status == APR_SUCCESS && was_empty) { + if (status == APR_SUCCESS && do_registration) { workers_register(m); } return status; @@ -1049,7 +1054,9 @@ static h2_task *pop_task(h2_mplx *m) { h2_task *task = NULL; int sid; - while (!m->aborted && !task && (sid = h2_iq_shift(m->q)) > 0) { + while (!m->aborted && !task + && (m->workers_busy < m->workers_max) + && (sid = h2_iq_shift(m->q)) > 0) { h2_io *io = h2_io_set_get(m->stream_ios, sid); if (io) { conn_rec *c; @@ -1077,6 +1084,7 @@ static h2_task *pop_task(h2_mplx *m) if (sid > m->max_stream_started) { m->max_stream_started = sid; } + ++m->workers_busy; } } return task; @@ -1097,62 +1105,75 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) task = pop_task(m); *has_more = !h2_iq_empty(m->q); } + + if (has_more && !task) { + m->need_registration = 1; + } leave_mutex(m, acquired); } return task; } -void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) +static void task_done(h2_mplx *m, h2_task *task) { - int acquired; - - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - if (task) { - if (task->frozen) { - /* this task was handed over to an engine for processing */ - h2_task_thaw(task); - /* TODO: can we signal an engine that it can now start on this? */ + if (task) { + if (task->frozen) { + /* this task was handed over to an engine for processing */ + h2_task_thaw(task); + /* TODO: can we signal an engine that it can now start on this? */ + } + else { + h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): task(%s) done", m->id, task->id); + /* clean our references and report request as done. Signal + * that we want another unless we have been aborted */ + /* TODO: this will keep a worker attached to this h2_mplx as + * long as it has requests to handle. Might no be fair to + * other mplx's. Perhaps leave after n requests? */ + + if (task->c) { + apr_pool_destroy(task->c->pool); } - else { - h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): task(%s) done", m->id, task->id); - /* clean our references and report request as done. Signal - * that we want another unless we have been aborted */ - /* TODO: this will keep a worker attached to this h2_mplx as - * long as it has requests to handle. Might no be fair to - * other mplx's. Perhaps leave after n requests? */ - - if (task->c) { - apr_pool_destroy(task->c->pool); - } - task = NULL; - if (io) { - io->processing_done = 1; - h2_mplx_out_close(m, io->id, NULL); - if (io->orphaned) { - io_destroy(m, io, 0); - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } - } - else { - /* hang around until the stream deregisteres */ + task = NULL; + if (io) { + io->processing_done = 1; + h2_mplx_out_close(m, io->id, NULL); + if (io->orphaned) { + io_destroy(m, io, 0); + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); } } - apr_thread_cond_broadcast(m->request_done); + else { + /* hang around until the stream deregisteres */ + } } + apr_thread_cond_broadcast(m->task_done); } - + } + +} + +void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) +{ + int acquired, do_registration = 0; + + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + task_done(m, task); + --m->workers_busy; if (ptask) { /* caller wants another task */ *ptask = pop_task(m); } + do_registration = (m->workers_busy+1 == m->workers_max); leave_mutex(m, acquired); } + if (do_registration) { + workers_register(m); + } } - /******************************************************************************* * HTTP/2 request engines ******************************************************************************/ @@ -1327,7 +1348,7 @@ static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine, *pr = NULL; return APR_EOF; } - apr_thread_cond_timedwait(m->request_done, m->lock, + apr_thread_cond_timedwait(m->task_done, m->lock, apr_time_from_msec(100)); } } @@ -1351,6 +1372,7 @@ apr_status_t h2_mplx_engine_pull(h2_req_engine *pub_engine, static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task, int waslive, int aborted) { + int acquired; ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, "h2_mplx(%ld): task %s %s by %s", m->id, task->id, aborted? "aborted":"done", @@ -1360,7 +1382,10 @@ static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task, if (waslive) engine->no_live--; engine->no_assigned--; if (task->c != engine->c) { /* do not release what the engine runs on */ - h2_mplx_task_done(m, task, NULL); + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + task_done(m, task); + leave_mutex(m, acquired); + } } } @@ -1406,7 +1431,7 @@ void h2_mplx_engine_exit(h2_req_engine *pub_engine) engine_done(m, engine, task, 0, 1); } } - if (engine->no_assigned > 0 || engine->no_live > 0) { + if (engine->no_assigned > 1 || engine->no_live > 1) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, "h2_mplx(%ld): exit engine %s (%s), " "assigned=%ld, live=%ld, finished=%ld", diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 7b8b30251d..aebd672e04 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -74,10 +74,13 @@ struct h2_mplx { struct h2_io_set *ready_ios; int max_stream_started; /* highest stream id that started processing */ + int workers_busy; /* # of workers processing on this mplx */ + int workers_max; /* max # of workers occupied by this mplx */ + int need_registration; apr_thread_mutex_t *lock; struct apr_thread_cond_t *added_output; - struct apr_thread_cond_t *request_done; + struct apr_thread_cond_t *task_done; struct apr_thread_cond_t *join_wait; apr_socket_t *dummy_socket; diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c index 83755da711..9d37cbade1 100644 --- a/modules/http2/h2_proxy_session.c +++ b/modules/http2/h2_proxy_session.c @@ -154,7 +154,6 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame, { h2_proxy_session *session = user_data; h2_proxy_stream *stream; - int eos; if (APLOGcdebug(session->c)) { char buffer[256]; @@ -168,8 +167,6 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame, switch (frame->hd.type) { case NGHTTP2_HEADERS: stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id); - eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM); - break; case NGHTTP2_PUSH_PROMISE: break; diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index 40b06cc7b4..f02560cedb 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.3.0-DEV" +#define MOD_HTTP2_VERSION "1.3.1-DEV" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010300 +#define MOD_HTTP2_VERSION_NUM 0x010301 #endif /* mod_h2_h2_version_h */ diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index 7423707e5c..6b6897cbe5 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -61,6 +61,43 @@ static void cleanup_zombies(h2_workers *workers, int lock) } } +static h2_task *next_task(h2_workers *workers) +{ + h2_task *task = NULL; + h2_mplx *last = NULL; + int has_more; + + /* Get the next h2_mplx to process that has a task to hand out. + * If it does, place it at the end of the queu and return the + * task to the worker. + * If it (currently) has no tasks, remove it so that it needs + * to register again for scheduling. + * If we run out of h2_mplx in the queue, we need to wait for + * new mplx to arrive. Depending on how many workers do exist, + * we do a timed wait or block indefinitely. + */ + while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) { + h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs); + + if (last == m) { + break; + } + H2_MPLX_REMOVE(m); + --workers->mplx_count; + + task = h2_mplx_pop_task(m, &has_more); + + if (has_more) { + H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m); + ++workers->mplx_count; + if (!last) { + last = m; + } + } + } + return task; +} + /** * Get the next task for the given worker. Will block until a task arrives * or the max_wait timer expires and more than min workers exist. @@ -69,8 +106,8 @@ static apr_status_t get_mplx_next(h2_worker *worker, void *ctx, h2_task **ptask, int *psticky) { apr_status_t status; - apr_time_t max_wait, start_wait = 0; - h2_workers *workers = (h2_workers *)ctx; + apr_time_t wait_until = 0, now; + h2_workers *workers = ctx; h2_task *task = NULL; *ptask = NULL; @@ -82,75 +119,51 @@ static apr_status_t get_mplx_next(h2_worker *worker, void *ctx, ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, "h2_worker(%d): looking for work", h2_worker_get_id(worker)); - while (!task && !h2_worker_is_aborted(worker) && !workers->aborted) { - - /* Get the next h2_mplx to process that has a task to hand out. - * If it does, place it at the end of the queu and return the - * task to the worker. - * If it (currently) has no tasks, remove it so that it needs - * to register again for scheduling. - * If we run out of h2_mplx in the queue, we need to wait for - * new mplx to arrive. Depending on how many workers do exist, - * we do a timed wait or block indefinitely. - */ - while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) { - h2_mplx *m; - int has_more = 0; - - m = H2_MPLX_LIST_FIRST(&workers->mplxs); - H2_MPLX_REMOVE(m); - --workers->mplx_count; - - task = h2_mplx_pop_task(m, &has_more); + while (!h2_worker_is_aborted(worker) && !workers->aborted + && !(task = next_task(workers))) { + + /* Need to wait for a new tasks to arrive. If we are above + * minimum workers, we do a timed wait. When timeout occurs + * and we have still more workers, we shut down one after + * the other. */ + cleanup_zombies(workers, 0); + if (workers->worker_count > workers->min_workers) { + now = apr_time_now(); + if (now >= wait_until) { + wait_until = now + apr_time_from_sec(workers->max_idle_secs); + } - if (task) { - if (has_more) { - H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m); - ++workers->mplx_count; - } + ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, + "h2_worker(%d): waiting signal, " + "workers=%d, idle=%d", worker->id, + (int)workers->worker_count, + workers->idle_workers); + status = apr_thread_cond_timedwait(workers->mplx_added, + workers->lock, + wait_until - now); + if (status == APR_TIMEUP + && workers->worker_count > workers->min_workers) { + /* waited long enough without getting a task and + * we are above min workers, abort this one. */ + ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, + workers->s, + "h2_workers: aborting idle worker"); + h2_worker_abort(worker); break; } } - - if (!task) { - /* Need to wait for a new mplx to arrive. - */ - cleanup_zombies(workers, 0); - - if (workers->worker_count > workers->min_workers) { - if (start_wait == 0) { - start_wait = apr_time_now(); - max_wait = apr_time_from_sec(apr_atomic_read32(&workers->max_idle_secs)); - } - else if (apr_time_now() >= (start_wait + max_wait)) { - /* waited long enough without getting a task. */ - if (workers->worker_count > workers->min_workers) { - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, - workers->s, - "h2_workers: aborting idle worker"); - h2_worker_abort(worker); - break; - } - } - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_worker(%d): waiting signal, " - "worker_count=%d", worker->id, - (int)workers->worker_count); - apr_thread_cond_timedwait(workers->mplx_added, - workers->lock, max_wait); - } - else { - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_worker(%d): waiting signal (eternal), " - "worker_count=%d", worker->id, - (int)workers->worker_count); - apr_thread_cond_wait(workers->mplx_added, workers->lock); - } + else { + ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, + "h2_worker(%d): waiting signal (eternal), " + "worker_count=%d, idle=%d", worker->id, + (int)workers->worker_count, + workers->idle_workers); + apr_thread_cond_wait(workers->mplx_added, workers->lock); } } - /* Here, we either have gotten task and mplx for the worker or - * needed to give up with more than enough workers. + /* Here, we either have gotten task or decided to shut down + * the calling worker. */ if (task) { /* Ok, we got something to give back to the worker for execution. @@ -158,30 +171,28 @@ static apr_status_t get_mplx_next(h2_worker *worker, void *ctx, * we let the worker be sticky, e.g. making it poll the task's * h2_mplx instance for more work before asking back here. * This avoids entering our global lock as long as enough idle - * workers remain. + * workers remain. Stickiness of a worker ends when the connection + * has no new tasks to process, so the worker will get back here + * eventually. */ *ptask = task; - *psticky = (workers->idle_workers - 1 > workers->mplx_count); + *psticky = (workers->max_workers >= workers->mplx_count); if (workers->mplx_count && workers->idle_workers > 1) { apr_thread_cond_signal(workers->mplx_added); } - status = APR_SUCCESS; - } - else { - status = APR_EOF; } --workers->idle_workers; apr_thread_mutex_unlock(workers->lock); } - return status; + return *ptask? APR_SUCCESS : APR_EOF; } static void worker_done(h2_worker *worker, void *ctx) { - h2_workers *workers = (h2_workers *)ctx; + h2_workers *workers = ctx; apr_status_t status = apr_thread_mutex_lock(workers->lock); if (status == APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, @@ -249,7 +260,7 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, workers->pool = pool; workers->min_workers = min_workers; workers->max_workers = max_workers; - apr_atomic_set32(&workers->max_idle_secs, 10); + workers->max_idle_secs = 10; workers->max_tx_handles = max_tx_handles; workers->spare_tx_handles = workers->max_tx_handles; @@ -324,10 +335,9 @@ apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m) apr_status_t status = apr_thread_mutex_lock(workers->lock); if (status == APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s, - "h2_workers: register mplx(%ld)", m->id); + "h2_workers: register mplx(%ld), idle=%d", + m->id, workers->idle_workers); if (in_list(workers, m)) { - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_workers: already registered mplx(%ld)", m->id); status = APR_EAGAIN; } else { @@ -373,7 +383,7 @@ void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs) " is not valid, ignored.", idle_secs); return; } - apr_atomic_set32(&workers->max_idle_secs, idle_secs); + workers->max_idle_secs = idle_secs; } apr_size_t h2_workers_tx_reserve(h2_workers *workers, apr_size_t count) diff --git a/modules/http2/h2_workers.h b/modules/http2/h2_workers.h index 5a19e8e27e..ae7b4d8969 100644 --- a/modules/http2/h2_workers.h +++ b/modules/http2/h2_workers.h @@ -39,6 +39,7 @@ struct h2_workers { int max_workers; int worker_count; int idle_workers; + int max_idle_secs; apr_size_t max_tx_handles; apr_size_t spare_tx_handles; @@ -52,8 +53,6 @@ struct h2_workers { APR_RING_HEAD(h2_mplx_list, h2_mplx) mplxs; int mplx_count; - volatile apr_uint32_t max_idle_secs; - struct apr_thread_mutex_t *lock; struct apr_thread_cond_t *mplx_added; -- 2.40.0