From 6db47ac4498920b4a9e4ef7bb7ff922b103cba4a Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Mon, 10 Apr 2017 15:04:55 +0000 Subject: [PATCH] On the 2.4.x branch: Merged /httpd/httpd/trunk:r1789740,1790102,1790113,1790284,1790754,1790826-1790827,1790842 git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1790847 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES | 7 +- modules/http2/h2_bucket_beam.c | 4 + modules/http2/h2_filter.c | 31 +- modules/http2/h2_mplx.c | 653 +++++++++++++++---------------- modules/http2/h2_mplx.h | 4 +- modules/http2/h2_ngn_shed.c | 15 +- modules/http2/h2_proxy_session.c | 26 +- modules/http2/h2_proxy_util.c | 281 +++++++++++++ modules/http2/h2_proxy_util.h | 51 +++ modules/http2/h2_session.c | 28 +- modules/http2/h2_session.h | 5 + modules/http2/h2_stream.c | 171 ++++---- modules/http2/h2_task.c | 2 +- modules/http2/h2_util.c | 88 +++-- modules/http2/h2_util.h | 28 +- modules/http2/h2_version.h | 4 +- modules/http2/h2_workers.c | 38 +- modules/http2/h2_workers.h | 4 +- modules/http2/mod_proxy_http2.c | 179 ++++----- 19 files changed, 1011 insertions(+), 608 deletions(-) diff --git a/CHANGES b/CHANGES index 25198ab917..42d1f0b048 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,10 @@ Changes with Apache 2.4.26 + *) mod_proxy_http2: Fixed bug in re-attempting proxy requests after + connection error. Reliability of reconnect handling improved. + [Stefan Eissing] + *) mod_http2: better performance, eliminated need for nested locks and thread privates. Moving request setups from the main connection to the worker threads. Increase number of spare connections kept. @@ -22,9 +26,6 @@ Changes with Apache 2.4.26 format from 2.2 in the Last Modified column. PR60846. [Hank Ibell ] - *) mod_http2: fixed PR60869 by making h2 workers exit explicitly waking up - all threads to exit in a defined way. [Stefan Eissing] - *) core: Add %{REMOTE_PORT} to the expression parser. PR59938 [Hank Ibell ] diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index 17ad3d95f1..872c67544d 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -1035,7 +1035,11 @@ transfer: ++transferred; } else { + /* let outside hook determine how bucket is beamed */ + leave_yellow(beam, &bl); brecv = h2_beam_bucket(beam, bb, bsender); + enter_yellow(beam, &bl); + while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) { ++transferred; remain -= brecv->length; diff --git a/modules/http2/h2_filter.c b/modules/http2/h2_filter.c index 3a8a3b1ad1..c1f1a847d2 100644 --- a/modules/http2/h2_filter.c +++ b/modules/http2/h2_filter.c @@ -428,38 +428,41 @@ static void add_stats(apr_bucket_brigade *bb, h2_session *s, static apr_status_t h2_status_insert(h2_task *task, apr_bucket *b) { - h2_mplx *m = task->mplx; - h2_stream *stream = h2_mplx_stream_get(m, task->stream_id); - h2_session *s; - conn_rec *c; - + conn_rec *c = task->c->master; + h2_ctx *h2ctx = h2_ctx_get(c, 0); + h2_session *session; + h2_stream *stream; apr_bucket_brigade *bb; apr_bucket *e; int32_t connFlowIn, connFlowOut; + + if (!h2ctx || (session = h2_ctx_session_get(h2ctx)) == NULL) { + return APR_SUCCESS; + } + + stream = h2_session_stream_get(session, task->stream_id); if (!stream) { /* stream already done */ return APR_SUCCESS; } - s = stream->session; - c = s->c; bb = apr_brigade_create(stream->pool, c->bucket_alloc); - connFlowIn = nghttp2_session_get_effective_local_window_size(s->ngh2); - connFlowOut = nghttp2_session_get_remote_window_size(s->ngh2); + connFlowIn = nghttp2_session_get_effective_local_window_size(session->ngh2); + connFlowOut = nghttp2_session_get_remote_window_size(session->ngh2); bbout(bb, "{\n"); bbout(bb, " \"version\": \"draft-01\",\n"); - add_settings(bb, s, 0); - add_peer_settings(bb, s, 0); + add_settings(bb, session, 0); + add_peer_settings(bb, session, 0); bbout(bb, " \"connFlowIn\": %d,\n", connFlowIn); bbout(bb, " \"connFlowOut\": %d,\n", connFlowOut); - bbout(bb, " \"sentGoAway\": %d,\n", s->local.shutdown); + bbout(bb, " \"sentGoAway\": %d,\n", session->local.shutdown); - add_streams(bb, s, 0); + add_streams(bb, session, 0); - add_stats(bb, s, stream, 1); + add_stats(bb, session, stream, 1); bbout(bb, "}\n"); while ((e = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) { diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 04fbbd05cb..357bf5eaad 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -55,58 +55,29 @@ typedef struct { apr_time_t now; } stream_iter_ctx; -/* NULL or the mutex hold by this thread, used for recursive calls - */ -static const int nested_lock = 0; - -static apr_threadkey_t *thread_lock; - apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s) { - if (nested_lock) { - return apr_threadkey_private_create(&thread_lock, NULL, pool); - } return APR_SUCCESS; } -static apr_status_t enter_mutex(h2_mplx *m, int *pacquired) -{ - apr_status_t status; - - if (nested_lock) { - void *mutex = NULL; - /* Enter the mutex if this thread already holds the lock or - * if we can acquire it. Only on the later case do we unlock - * onleaving the mutex. - * This allow recursive entering of the mutex from the saem thread, - * which is what we need in certain situations involving callbacks - */ - apr_threadkey_private_get(&mutex, thread_lock); - if (mutex == m->lock) { - *pacquired = 0; - ap_assert(NULL); /* nested, why? */ - return APR_SUCCESS; - } - } - status = apr_thread_mutex_lock(m->lock); - *pacquired = (status == APR_SUCCESS); - if (nested_lock && *pacquired) { - apr_threadkey_private_set(m->lock, thread_lock); - } - return status; -} +#define H2_MPLX_ENTER(m) \ + do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\ + return rv;\ + } } while(0) -static void leave_mutex(h2_mplx *m, int acquired) -{ - if (acquired) { - if (nested_lock) { - apr_threadkey_private_set(NULL, thread_lock); - } - apr_thread_mutex_unlock(m->lock); - } -} +#define H2_MPLX_LEAVE(m) \ + apr_thread_mutex_unlock(m->lock) + +#define H2_MPLX_ENTER_ALWAYS(m) \ + apr_thread_mutex_lock(m->lock) -static void check_data_for(h2_mplx *m, int stream_id); +#define H2_MPLX_ENTER_MAYBE(m, lock) \ + if (lock) apr_thread_mutex_lock(m->lock) + +#define H2_MPLX_LEAVE_MAYBE(m, lock) \ + if (lock) apr_thread_mutex_unlock(m->lock) + +static void check_data_for(h2_mplx *m, h2_stream *stream, int lock); static void stream_output_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) @@ -155,6 +126,7 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream) h2_stream_cleanup(stream); h2_iq_remove(m->q, stream->id); + h2_fifo_remove(m->readyq, stream); h2_ihash_remove(m->streams, stream->id); h2_ihash_add(m->shold, stream); @@ -240,7 +212,12 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->q = h2_iq_create(m->pool, m->max_streams); - m->readyq = h2_iq_create(m->pool, m->max_streams); + + status = h2_fifo_set_create(&m->readyq, m->pool, m->max_streams); + if (status != APR_SUCCESS) { + apr_pool_destroy(m->pool); + return NULL; + } m->workers = workers; m->max_active = workers->max_workers; @@ -259,14 +236,15 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, int h2_mplx_shutdown(h2_mplx *m) { - int acquired, max_stream_started = 0; + int max_stream_started = 0; - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - max_stream_started = m->max_stream_started; - /* Clear schedule queue, disabling existing streams from starting */ - h2_iq_clear(m->q); - leave_mutex(m, acquired); - } + H2_MPLX_ENTER(m); + + max_stream_started = m->max_stream_started; + /* Clear schedule queue, disabling existing streams from starting */ + h2_iq_clear(m->q); + + H2_MPLX_LEAVE(m); return max_stream_started; } @@ -341,12 +319,14 @@ static int stream_destroy_iter(void *ctx, void *val) return 0; } -static void purge_streams(h2_mplx *m) +static void purge_streams(h2_mplx *m, int lock) { if (!h2_ihash_empty(m->spurge)) { + H2_MPLX_ENTER_MAYBE(m, lock); while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) { /* repeat until empty */ } + H2_MPLX_LEAVE_MAYBE(m, lock); } } @@ -363,18 +343,16 @@ static int stream_iter_wrap(void *ctx, void *stream) apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx) { - apr_status_t status; - int acquired; + stream_iter_ctx_t x; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - stream_iter_ctx_t x; - x.cb = cb; - x.ctx = ctx; - h2_ihash_iter(m->streams, stream_iter_wrap, &x); + H2_MPLX_ENTER(m); + + x.cb = cb; + x.ctx = ctx; + h2_ihash_iter(m->streams, stream_iter_wrap, &x); - leave_mutex(m, acquired); - } - return status; + H2_MPLX_LEAVE(m); + return APR_SUCCESS; } static int report_stream_iter(void *ctx, void *val) { @@ -430,14 +408,13 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) { apr_status_t status; int i, wait_secs = 60; - int acquired; /* How to shut down a h2 connection: * 0. abort and tell the workers that no more tasks will come from us */ m->aborted = 1; h2_workers_unregister(m->workers, m); - enter_mutex(m, &acquired); + H2_MPLX_ENTER_ALWAYS(m); /* How to shut down a h2 connection: * 1. cancel all streams still active */ @@ -482,7 +459,7 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) h2_ihash_iter(m->shold, unexpected_stream_iter, m); } - leave_mutex(m, acquired); + H2_MPLX_LEAVE(m); /* 5. unregister again, now that our workers are done */ h2_workers_unregister(m->workers, m); @@ -493,41 +470,34 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream) { - apr_status_t status = APR_SUCCESS; - int acquired; + H2_MPLX_ENTER(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - H2_STRM_MSG(stream, "cleanup")); - stream_cleanup(m, stream); - leave_mutex(m, acquired); - } - return status; + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + H2_STRM_MSG(stream, "cleanup")); + stream_cleanup(m, stream); + + H2_MPLX_LEAVE(m); + return APR_SUCCESS; } h2_stream *h2_mplx_stream_get(h2_mplx *m, int id) { h2_stream *s = NULL; - int acquired; - if ((enter_mutex(m, &acquired)) == APR_SUCCESS) { - s = h2_ihash_get(m->streams, id); - leave_mutex(m, acquired); - } + H2_MPLX_ENTER_ALWAYS(m); + + s = h2_ihash_get(m->streams, id); + + H2_MPLX_LEAVE(m); return s; } static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) { - h2_mplx *m = ctx; - int acquired; + h2_stream *stream = ctx; + h2_mplx *m = stream->session->mplx; - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id); - check_data_for(m, beam->id); - leave_mutex(m, acquired); - } + check_data_for(m, stream, 1); } static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) @@ -551,7 +521,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) } h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream); - h2_beam_on_produced(stream->output, output_produced, m); + h2_beam_on_produced(stream->output, output_produced, stream); if (stream->task->output.copy_files) { h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL); } @@ -561,24 +531,24 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) /* we might see some file buckets in the output, see * if we have enough handles reserved. */ - check_data_for(m, stream->id); + check_data_for(m, stream, 0); return status; } apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) { apr_status_t status; - int acquired; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (m->aborted) { - status = APR_ECONNABORTED; - } - else { - status = out_open(m, stream_id, beam); - } - leave_mutex(m, acquired); + H2_MPLX_ENTER(m); + + if (m->aborted) { + status = APR_ECONNABORTED; } + else { + status = out_open(m, stream_id, beam); + } + + H2_MPLX_LEAVE(m); return status; } @@ -601,7 +571,7 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task) status = h2_beam_close(task->output.beam); h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close"); output_consumed_signal(m, task); - check_data_for(m, task->stream_id); + check_data_for(m, stream, 0); return status; } @@ -609,58 +579,61 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, apr_thread_cond_t *iowait) { apr_status_t status; - int acquired; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (m->aborted) { - status = APR_ECONNABORTED; - } - else if (apr_atomic_read32(&m->event_pending) > 0) { - status = APR_SUCCESS; - } - else { - purge_streams(m); - h2_ihash_iter(m->streams, report_consumption_iter, m); - m->added_output = iowait; - status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout); - if (APLOGctrace2(m->c)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): trywait on data for %f ms)", - m->id, timeout/1000.0); - } - m->added_output = NULL; + H2_MPLX_ENTER(m); + + if (m->aborted) { + status = APR_ECONNABORTED; + } + else if (h2_mplx_has_master_events(m)) { + status = APR_SUCCESS; + } + else { + purge_streams(m, 0); + h2_ihash_iter(m->streams, report_consumption_iter, m); + m->added_output = iowait; + status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout); + if (APLOGctrace2(m->c)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): trywait on data for %f ms)", + m->id, timeout/1000.0); } - leave_mutex(m, acquired); + m->added_output = NULL; } + + H2_MPLX_LEAVE(m); return status; } -static void check_data_for(h2_mplx *m, int stream_id) +static void check_data_for(h2_mplx *m, h2_stream *stream, int lock) { - ap_assert(m); - h2_iq_append(m->readyq, stream_id); - apr_atomic_set32(&m->event_pending, 1); - if (m->added_output) { - apr_thread_cond_signal(m->added_output); + if (h2_fifo_push(m->readyq, stream) == APR_SUCCESS) { + apr_atomic_set32(&m->event_pending, 1); + H2_MPLX_ENTER_MAYBE(m, lock); + if (m->added_output) { + apr_thread_cond_signal(m->added_output); + } + H2_MPLX_LEAVE_MAYBE(m, lock); } } apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status; - int acquired; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (m->aborted) { - status = APR_ECONNABORTED; - } - else { - h2_iq_sort(m->q, cmp, ctx); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): reprioritize tasks", m->id); - } - leave_mutex(m, acquired); + H2_MPLX_ENTER(m); + + if (m->aborted) { + status = APR_ECONNABORTED; + } + else { + h2_iq_sort(m->q, cmp, ctx); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): reprioritize tasks", m->id); + status = APR_SUCCESS; } + + H2_MPLX_LEAVE(m); return status; } @@ -682,29 +655,30 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status; - int acquired; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (m->aborted) { - status = APR_ECONNABORTED; + H2_MPLX_ENTER(m); + + if (m->aborted) { + status = APR_ECONNABORTED; + } + else { + status = APR_SUCCESS; + h2_ihash_add(m->streams, stream); + if (h2_stream_is_ready(stream)) { + /* already have a response */ + check_data_for(m, stream, 0); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + H2_STRM_MSG(stream, "process, add to readyq")); } else { - h2_ihash_add(m->streams, stream); - if (h2_stream_is_ready(stream)) { - /* already have a response */ - check_data_for(m, stream->id); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - H2_STRM_MSG(stream, "process, add to readyq")); - } - else { - h2_iq_add(m->q, stream->id, cmp, ctx); - register_if_needed(m); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - H2_STRM_MSG(stream, "process, added to q")); - } + h2_iq_add(m->q, stream->id, cmp, ctx); + register_if_needed(m); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + H2_STRM_MSG(stream, "process, added to q")); } - leave_mutex(m, acquired); } + + H2_MPLX_LEAVE(m); return status; } @@ -762,22 +736,21 @@ static h2_task *next_stream_task(h2_mplx *m) h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) { h2_task *task = NULL; - apr_status_t status; - int acquired; + H2_MPLX_ENTER_ALWAYS(m); + *has_more = 0; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (!m->aborted) { - task = next_stream_task(m); - if (task != NULL && !h2_iq_empty(m->q)) { - *has_more = 1; - } - else { - m->is_registered = 0; /* h2_workers will discard this mplx */ - } + if (!m->aborted) { + task = next_stream_task(m); + if (task != NULL && !h2_iq_empty(m->q)) { + *has_more = 1; + } + else { + m->is_registered = 0; /* h2_workers will discard this mplx */ } - leave_mutex(m, acquired); } + + H2_MPLX_LEAVE(m); return task; } @@ -814,7 +787,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) if (task->engine) { if (!m->aborted && !task->c->aborted && !h2_req_engine_is_shutdown(task->engine)) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(10022) "h2_mplx(%ld): task(%s) has not-shutdown " "engine(%s)", m->id, task->id, h2_req_engine_get_id(task->engine)); @@ -845,35 +818,37 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) } stream = h2_ihash_get(m->streams, task->stream_id); - if (stream && !m->aborted && h2_ihash_get(m->sredo, stream->id)) { - /* reset and schedule again */ - h2_task_redo(task); - h2_ihash_remove(m->sredo, stream->id); - h2_iq_add(m->q, stream->id, NULL, NULL); - return; - } - if (stream) { - /* stream not cleaned up, stay around */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - H2_STRM_MSG(stream, "task_done, stream open")); - /* more data will not arrive, resume the stream */ - if (stream->input) { - h2_beam_mutex_disable(stream->input); - h2_beam_leave(stream->input); + /* stream not done yet. */ + if (!m->aborted && h2_ihash_get(m->sredo, stream->id)) { + /* reset and schedule again */ + h2_task_redo(task); + h2_ihash_remove(m->sredo, stream->id); + h2_iq_add(m->q, stream->id, NULL, NULL); } - if (stream->output) { - h2_beam_mutex_disable(stream->output); + else { + /* stream not cleaned up, stay around */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + H2_STRM_MSG(stream, "task_done, stream open")); + /* more data will not arrive, resume the stream */ + check_data_for(m, stream, 0); + + if (stream->input) { + h2_beam_leave(stream->input); + h2_beam_mutex_disable(stream->input); + } + if (stream->output) { + h2_beam_mutex_disable(stream->output); + } } - check_data_for(m, stream->id); } else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) { + /* stream is done, was just waiting for this. */ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, H2_STRM_MSG(stream, "task_done, in hold")); - /* stream was just waiting for us. */ if (stream->input) { - h2_beam_mutex_disable(stream->input); h2_beam_leave(stream->input); + h2_beam_mutex_disable(stream->input); } if (stream->output) { h2_beam_mutex_disable(stream->output); @@ -895,21 +870,21 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) { - int acquired; + H2_MPLX_ENTER_ALWAYS(m); + + task_done(m, task, NULL); + --m->tasks_active; - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - task_done(m, task, NULL); - --m->tasks_active; - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } - if (ptask) { - /* caller wants another task */ - *ptask = next_stream_task(m); - } - register_if_needed(m); - leave_mutex(m, acquired); + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); + } + if (ptask) { + /* caller wants another task */ + *ptask = next_stream_task(m); } + register_if_needed(m); + + H2_MPLX_LEAVE(m); } /******************************************************************************* @@ -1001,52 +976,53 @@ apr_status_t h2_mplx_idle(h2_mplx *m) { apr_status_t status = APR_SUCCESS; apr_time_t now; - int acquired; + apr_size_t scount; - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - apr_size_t scount = h2_ihash_count(m->streams); - if (scount > 0 && m->tasks_active) { - /* If we have streams in connection state 'IDLE', meaning - * all streams are ready to sent data out, but lack - * WINDOW_UPDATEs. - * - * This is ok, unless we have streams that still occupy - * h2 workers. As worker threads are a scarce resource, - * we need to take measures that we do not get DoSed. - * - * This is what we call an 'idle block'. Limit the amount - * of busy workers we allow for this connection until it - * well behaves. - */ - now = apr_time_now(); - m->last_idle_block = now; - if (m->limit_active > 2 - && now - m->last_limit_change >= m->limit_change_interval) { - if (m->limit_active > 16) { - m->limit_active = 16; - } - else if (m->limit_active > 8) { - m->limit_active = 8; - } - else if (m->limit_active > 4) { - m->limit_active = 4; - } - else if (m->limit_active > 2) { - m->limit_active = 2; - } - m->last_limit_change = now; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): decrease worker limit to %d", - m->id, m->limit_active); + H2_MPLX_ENTER(m); + + scount = h2_ihash_count(m->streams); + if (scount > 0 && m->tasks_active) { + /* If we have streams in connection state 'IDLE', meaning + * all streams are ready to sent data out, but lack + * WINDOW_UPDATEs. + * + * This is ok, unless we have streams that still occupy + * h2 workers. As worker threads are a scarce resource, + * we need to take measures that we do not get DoSed. + * + * This is what we call an 'idle block'. Limit the amount + * of busy workers we allow for this connection until it + * well behaves. + */ + now = apr_time_now(); + m->last_idle_block = now; + if (m->limit_active > 2 + && now - m->last_limit_change >= m->limit_change_interval) { + if (m->limit_active > 16) { + m->limit_active = 16; } - - if (m->tasks_active > m->limit_active) { - status = unschedule_slow_tasks(m); + else if (m->limit_active > 8) { + m->limit_active = 8; + } + else if (m->limit_active > 4) { + m->limit_active = 4; + } + else if (m->limit_active > 2) { + m->limit_active = 2; } + m->last_limit_change = now; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): decrease worker limit to %d", + m->id, m->limit_active); + } + + if (m->tasks_active > m->limit_active) { + status = unschedule_slow_tasks(m); } - register_if_needed(m); - leave_mutex(m, acquired); } + register_if_needed(m); + + H2_MPLX_LEAVE(m); return status; } @@ -1090,7 +1066,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, apr_status_t status; h2_mplx *m; h2_task *task; - int acquired; + h2_stream *stream; task = h2_ctx_rget_task(r); if (!task) { @@ -1098,17 +1074,17 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, } m = task->mplx; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); - - if (stream) { - status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit); - } - else { - status = APR_ECONNABORTED; - } - leave_mutex(m, acquired); + H2_MPLX_ENTER(m); + + stream = h2_ihash_get(m->streams, task->stream_id); + if (stream) { + status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit); + } + else { + status = APR_ECONNABORTED; } + + H2_MPLX_LEAVE(m); return status; } @@ -1120,35 +1096,36 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn, h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn); h2_mplx *m = h2_ngn_shed_get_ctx(shed); apr_status_t status; - int acquired; + int want_shutdown; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - int want_shutdown = (block == APR_BLOCK_READ); + H2_MPLX_ENTER(m); - /* Take this opportunity to update output consummation - * for this engine */ - ngn_out_update_windows(m, ngn); - - if (want_shutdown && !h2_iq_empty(m->q)) { - /* For a blocking read, check first if requests are to be - * had and, if not, wait a short while before doing the - * blocking, and if unsuccessful, terminating read. - */ + want_shutdown = (block == APR_BLOCK_READ); + + /* Take this opportunity to update output consummation + * for this engine */ + ngn_out_update_windows(m, ngn); + + if (want_shutdown && !h2_iq_empty(m->q)) { + /* For a blocking read, check first if requests are to be + * had and, if not, wait a short while before doing the + * blocking, and if unsuccessful, terminating read. + */ + status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr); + if (APR_STATUS_IS_EAGAIN(status)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): start block engine pull", m->id); + apr_thread_cond_timedwait(m->task_thawed, m->lock, + apr_time_from_msec(20)); status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr); - if (APR_STATUS_IS_EAGAIN(status)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): start block engine pull", m->id); - apr_thread_cond_timedwait(m->task_thawed, m->lock, - apr_time_from_msec(20)); - status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr); - } - } - else { - status = h2_ngn_shed_pull_request(shed, ngn, capacity, - want_shutdown, pr); } - leave_mutex(m, acquired); } + else { + status = h2_ngn_shed_pull_request(shed, ngn, capacity, + want_shutdown, pr); + } + + H2_MPLX_LEAVE(m); return status; } @@ -1159,29 +1136,29 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn, if (task) { h2_mplx *m = task->mplx; - int acquired; + h2_stream *stream; - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); - - ngn_out_update_windows(m, ngn); - h2_ngn_shed_done_task(m->ngn_shed, ngn, task); - - if (status != APR_SUCCESS && stream - && h2_task_can_redo(task) - && !h2_ihash_get(m->sredo, stream->id)) { - h2_ihash_add(m->sredo, stream); - } - if (task->engine) { - /* cannot report that as done until engine returns */ - } - else { - task_done(m, task, ngn); - } - /* Take this opportunity to update output consummation - * for this engine */ - leave_mutex(m, acquired); + H2_MPLX_ENTER_ALWAYS(m); + + stream = h2_ihash_get(m->streams, task->stream_id); + + ngn_out_update_windows(m, ngn); + h2_ngn_shed_done_task(m->ngn_shed, ngn, task); + + if (status != APR_SUCCESS && stream + && h2_task_can_redo(task) + && !h2_ihash_get(m->sredo, stream->id)) { + h2_ihash_add(m->sredo, stream); + } + + if (task->engine) { + /* cannot report that as done until engine returns */ } + else { + task_done(m, task, ngn); + } + + H2_MPLX_LEAVE(m); } } @@ -1198,65 +1175,47 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, stream_ev_callback *on_resume, void *on_ctx) { - apr_status_t status; - int acquired; - int ids[100]; h2_stream *stream; - size_t i, n; + int n; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): dispatch events", m->id); - apr_atomic_set32(&m->event_pending, 0); - purge_streams(m); - - /* update input windows for streams */ - h2_ihash_iter(m->streams, report_consumption_iter, m); - - if (!h2_iq_empty(m->readyq)) { - n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids)); - for (i = 0; i < n; ++i) { - stream = h2_ihash_get(m->streams, ids[i]); - if (stream) { - leave_mutex(m, acquired); - on_resume(on_ctx, stream); - enter_mutex(m, &acquired); - } - } - } - if (!h2_iq_empty(m->readyq)) { - apr_atomic_set32(&m->event_pending, 1); - } - leave_mutex(m, acquired); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): dispatch events", m->id); + apr_atomic_set32(&m->event_pending, 0); + + /* update input windows for streams */ + h2_ihash_iter(m->streams, report_consumption_iter, m); + purge_streams(m, 1); + + n = h2_fifo_count(m->readyq); + while (n > 0 + && (h2_fifo_try_pull(m->readyq, (void**)&stream) == APR_SUCCESS)) { + --n; + on_resume(on_ctx, stream); } - return status; + + return APR_SUCCESS; } -apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id) +apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream) { - apr_status_t status; - int acquired; - - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - check_data_for(m, stream_id); - leave_mutex(m, acquired); - } - return status; + check_data_for(m, stream, 1); + return APR_SUCCESS; } int h2_mplx_awaits_data(h2_mplx *m) { - apr_status_t status; - int acquired, waiting = 1; + int waiting = 1; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (h2_ihash_empty(m->streams)) { - waiting = 0; - } - if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) { - waiting = 0; - } - leave_mutex(m, acquired); + H2_MPLX_ENTER_ALWAYS(m); + + if (h2_ihash_empty(m->streams)) { + waiting = 0; } + if ((h2_fifo_count(m->readyq) == 0) + && h2_iq_empty(m->q) && !m->tasks_active) { + waiting = 0; + } + + H2_MPLX_LEAVE(m); return waiting; } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 82a98fce0a..ed332c8bc3 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -68,7 +68,7 @@ struct h2_mplx { struct h2_ihash_t *spurge; /* all streams done, ready for destroy */ struct h2_iqueue *q; /* all stream ids that need to be started */ - struct h2_iqueue *readyq; /* all stream ids ready for output */ + struct h2_fifo *readyq; /* all streams ready for output */ struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */ @@ -158,7 +158,7 @@ apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, struct h2_stream *stream); apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, struct apr_thread_cond_t *iowait); -apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id); +apr_status_t h2_mplx_keep_active(h2_mplx *m, struct h2_stream *stream); /******************************************************************************* * Stream processing. diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c index e0c40cfb23..27474ba22d 100644 --- a/modules/http2/h2_ngn_shed.c +++ b/modules/http2/h2_ngn_shed.c @@ -151,6 +151,7 @@ static void ngn_add_task(h2_req_engine *ngn, h2_task *task, request_rec *r) entry->task = task; entry->r = r; H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry); + ngn->no_assigned++; } @@ -176,6 +177,17 @@ apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type, task->assigned = NULL; } + if (task->engine) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + "h2_ngn_shed(%ld): push task(%s) hosting engine %s " + "already with %d tasks", + shed->c->id, task->id, task->engine->id, + task->engine->no_assigned); + task->assigned = task->engine; + ngn_add_task(task->engine, task, r); + return APR_SUCCESS; + } + ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING); if (ngn && !ngn->shutdown) { /* this task will be processed in another thread, @@ -187,7 +199,6 @@ apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type, h2_task_freeze(task); } ngn_add_task(ngn, task, r); - ngn->no_assigned++; return APR_SUCCESS; } @@ -211,11 +222,11 @@ apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type, status = einit(newngn, newngn->id, newngn->type, newngn->pool, shed->req_buffer_size, r, &newngn->out_consumed, &newngn->out_consumed_ctx); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03395) "h2_ngn_shed(%ld): create engine %s (%s)", shed->c->id, newngn->id, newngn->type); if (status == APR_SUCCESS) { - ap_assert(task->engine == NULL); newngn->task = task; task->engine = newngn; task->assigned = newngn; diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c index 49476e965b..f2fed906b7 100644 --- a/modules/http2/h2_proxy_session.c +++ b/modules/http2/h2_proxy_session.c @@ -242,7 +242,6 @@ static int add_header(void *table, const char *n, const char *v) static void process_proxy_header(h2_proxy_stream *stream, const char *n, const char *v) { - request_rec *r = stream->r; static const struct { const char *name; ap_proxy_header_reverse_map_fn func; @@ -254,23 +253,26 @@ static void process_proxy_header(h2_proxy_stream *stream, const char *n, const c { "Set-Cookie", ap_proxy_cookie_reverse_map }, { NULL, NULL } }; + request_rec *r = stream->r; proxy_dir_conf *dconf; int i; - for (i = 0; transform_hdrs[i].name; ++i) { - if (!ap_cstr_casecmp(transform_hdrs[i].name, n)) { + dconf = ap_get_module_config(r->per_dir_config, &proxy_module); + if (!dconf->preserve_host) { + for (i = 0; transform_hdrs[i].name; ++i) { + if (!ap_cstr_casecmp(transform_hdrs[i].name, n)) { + apr_table_add(r->headers_out, n, + (*transform_hdrs[i].func)(r, dconf, v)); + return; + } + } + if (!ap_cstr_casecmp("Link", n)) { dconf = ap_get_module_config(r->per_dir_config, &proxy_module); apr_table_add(r->headers_out, n, - (*transform_hdrs[i].func)(r, dconf, v)); + h2_proxy_link_reverse_map(r, dconf, + stream->real_server_uri, stream->p_server_uri, v)); return; - } - } - if (!ap_cstr_casecmp("Link", n)) { - dconf = ap_get_module_config(r->per_dir_config, &proxy_module); - apr_table_add(r->headers_out, n, - h2_proxy_link_reverse_map(r, dconf, - stream->real_server_uri, stream->p_server_uri, v)); - return; + } } apr_table_add(r->headers_out, n, v); } diff --git a/modules/http2/h2_proxy_util.c b/modules/http2/h2_proxy_util.c index b92a876f42..206020fd87 100644 --- a/modules/http2/h2_proxy_util.c +++ b/modules/http2/h2_proxy_util.c @@ -16,6 +16,8 @@ #include #include #include +#include +#include #include #include @@ -1053,3 +1055,282 @@ const char *h2_proxy_link_reverse_map(request_rec *r, "link_reverse_map %s --> %s", s, ctx.s); return ctx.s; } + +/******************************************************************************* + * FIFO queue + ******************************************************************************/ + +struct h2_proxy_fifo { + void **elems; + int nelems; + int set; + int head; + int count; + int aborted; + apr_thread_mutex_t *lock; + apr_thread_cond_t *not_empty; + apr_thread_cond_t *not_full; +}; + +static int nth_index(h2_proxy_fifo *fifo, int n) +{ + return (fifo->head + n) % fifo->nelems; +} + +static apr_status_t fifo_destroy(void *data) +{ + h2_proxy_fifo *fifo = data; + + apr_thread_cond_destroy(fifo->not_empty); + apr_thread_cond_destroy(fifo->not_full); + apr_thread_mutex_destroy(fifo->lock); + + return APR_SUCCESS; +} + +static int index_of(h2_proxy_fifo *fifo, void *elem) +{ + int i; + + for (i = 0; i < fifo->count; ++i) { + if (elem == fifo->elems[nth_index(fifo, i)]) { + return i; + } + } + return -1; +} + +static apr_status_t create_int(h2_proxy_fifo **pfifo, apr_pool_t *pool, + int capacity, int as_set) +{ + apr_status_t rv; + h2_proxy_fifo *fifo; + + fifo = apr_pcalloc(pool, sizeof(*fifo)); + if (fifo == NULL) { + return APR_ENOMEM; + } + + rv = apr_thread_mutex_create(&fifo->lock, + APR_THREAD_MUTEX_UNNESTED, pool); + if (rv != APR_SUCCESS) { + return rv; + } + + rv = apr_thread_cond_create(&fifo->not_empty, pool); + if (rv != APR_SUCCESS) { + return rv; + } + + rv = apr_thread_cond_create(&fifo->not_full, pool); + if (rv != APR_SUCCESS) { + return rv; + } + + fifo->elems = apr_pcalloc(pool, capacity * sizeof(void*)); + if (fifo->elems == NULL) { + return APR_ENOMEM; + } + fifo->nelems = capacity; + fifo->set = as_set; + + *pfifo = fifo; + apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null); + + return APR_SUCCESS; +} + +apr_status_t h2_proxy_fifo_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity) +{ + return create_int(pfifo, pool, capacity, 0); +} + +apr_status_t h2_proxy_fifo_set_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity) +{ + return create_int(pfifo, pool, capacity, 1); +} + +apr_status_t h2_proxy_fifo_term(h2_proxy_fifo *fifo) +{ + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + fifo->aborted = 1; + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_proxy_fifo_interrupt(h2_proxy_fifo *fifo) +{ + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + apr_thread_cond_broadcast(fifo->not_empty); + apr_thread_cond_broadcast(fifo->not_full); + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +int h2_proxy_fifo_count(h2_proxy_fifo *fifo) +{ + return fifo->count; +} + +int h2_proxy_fifo_capacity(h2_proxy_fifo *fifo) +{ + return fifo->nelems; +} + +static apr_status_t check_not_empty(h2_proxy_fifo *fifo, int block) +{ + if (fifo->count == 0) { + if (!block) { + return APR_EAGAIN; + } + while (fifo->count == 0) { + if (fifo->aborted) { + return APR_EOF; + } + apr_thread_cond_wait(fifo->not_empty, fifo->lock); + } + } + return APR_SUCCESS; +} + +static apr_status_t fifo_push(h2_proxy_fifo *fifo, void *elem, int block) +{ + apr_status_t rv; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + if (fifo->set && index_of(fifo, elem) >= 0) { + /* set mode, elem already member */ + apr_thread_mutex_unlock(fifo->lock); + return APR_EEXIST; + } + else if (fifo->count == fifo->nelems) { + if (block) { + while (fifo->count == fifo->nelems) { + if (fifo->aborted) { + apr_thread_mutex_unlock(fifo->lock); + return APR_EOF; + } + apr_thread_cond_wait(fifo->not_full, fifo->lock); + } + } + else { + apr_thread_mutex_unlock(fifo->lock); + return APR_EAGAIN; + } + } + + ap_assert(fifo->count < fifo->nelems); + fifo->elems[nth_index(fifo, fifo->count)] = elem; + ++fifo->count; + if (fifo->count == 1) { + apr_thread_cond_broadcast(fifo->not_empty); + } + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_proxy_fifo_push(h2_proxy_fifo *fifo, void *elem) +{ + return fifo_push(fifo, elem, 1); +} + +apr_status_t h2_proxy_fifo_try_push(h2_proxy_fifo *fifo, void *elem) +{ + return fifo_push(fifo, elem, 0); +} + +static void *pull_head(h2_proxy_fifo *fifo) +{ + void *elem; + + ap_assert(fifo->count > 0); + elem = fifo->elems[fifo->head]; + --fifo->count; + if (fifo->count > 0) { + fifo->head = nth_index(fifo, 1); + if (fifo->count+1 == fifo->nelems) { + apr_thread_cond_broadcast(fifo->not_full); + } + } + return elem; +} + +static apr_status_t fifo_pull(h2_proxy_fifo *fifo, void **pelem, int block) +{ + apr_status_t rv; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) { + apr_thread_mutex_unlock(fifo->lock); + *pelem = NULL; + return rv; + } + + ap_assert(fifo->count > 0); + *pelem = pull_head(fifo); + + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_proxy_fifo_pull(h2_proxy_fifo *fifo, void **pelem) +{ + return fifo_pull(fifo, pelem, 1); +} + +apr_status_t h2_proxy_fifo_try_pull(h2_proxy_fifo *fifo, void **pelem) +{ + return fifo_pull(fifo, pelem, 0); +} + +apr_status_t h2_proxy_fifo_remove(h2_proxy_fifo *fifo, void *elem) +{ + apr_status_t rv; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + int i, rc; + void *e; + + rc = 0; + for (i = 0; i < fifo->count; ++i) { + e = fifo->elems[nth_index(fifo, i)]; + if (e == elem) { + ++rc; + } + else if (rc) { + fifo->elems[nth_index(fifo, i-rc)] = e; + } + } + if (rc) { + fifo->count -= rc; + if (fifo->count + rc == fifo->nelems) { + apr_thread_cond_broadcast(fifo->not_full); + } + rv = APR_SUCCESS; + } + else { + rv = APR_EAGAIN; + } + + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} diff --git a/modules/http2/h2_proxy_util.h b/modules/http2/h2_proxy_util.h index f90d14951b..ea44184256 100644 --- a/modules/http2/h2_proxy_util.h +++ b/modules/http2/h2_proxy_util.h @@ -201,4 +201,55 @@ const char *h2_proxy_link_reverse_map(request_rec *r, const char *proxy_server_uri, const char *s); +/******************************************************************************* + * FIFO queue + ******************************************************************************/ + +/** + * A thread-safe FIFO queue with some extra bells and whistles, if you + * do not need anything special, better use 'apr_queue'. + */ +typedef struct h2_proxy_fifo h2_proxy_fifo; + +/** + * Create a FIFO queue that can hold up to capacity elements. Elements can + * appear several times. + */ +apr_status_t h2_proxy_fifo_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity); + +/** + * Create a FIFO set that can hold up to capacity elements. Elements only + * appear once. Pushing an element already present does not change the + * queue and is successful. + */ +apr_status_t h2_proxy_fifo_set_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity); + +apr_status_t h2_proxy_fifo_term(h2_proxy_fifo *fifo); +apr_status_t h2_proxy_fifo_interrupt(h2_proxy_fifo *fifo); + +int h2_proxy_fifo_capacity(h2_proxy_fifo *fifo); +int h2_proxy_fifo_count(h2_proxy_fifo *fifo); + +/** + * Push en element into the queue. Blocks if there is no capacity left. + * + * @param fifo the FIFO queue + * @param elem the element to push + * @return APR_SUCCESS on push, APR_EAGAIN on try_push on a full queue, + * APR_EEXIST when in set mode and elem already there. + */ +apr_status_t h2_proxy_fifo_push(h2_proxy_fifo *fifo, void *elem); +apr_status_t h2_proxy_fifo_try_push(h2_proxy_fifo *fifo, void *elem); + +apr_status_t h2_proxy_fifo_pull(h2_proxy_fifo *fifo, void **pelem); +apr_status_t h2_proxy_fifo_try_pull(h2_proxy_fifo *fifo, void **pelem); + +/** + * Remove the elem from the queue, will remove multiple appearances. + * @param elem the element to remove + * @return APR_SUCCESS iff > 0 elems were removed, APR_EAGAIN otherwise. + */ +apr_status_t h2_proxy_fifo_remove(h2_proxy_fifo *fifo, void *elem); + + #endif /* defined(__mod_h2__h2_proxy_util__) */ diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index f37741b61f..e23cb8d54b 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -72,7 +72,7 @@ static int h2_session_status_from_apr_status(apr_status_t rv) return NGHTTP2_ERR_PROTO; } -static h2_stream *get_stream(h2_session *session, int stream_id) +h2_stream *h2_session_stream_get(h2_session *session, int stream_id) { return nghttp2_session_get_stream_user_data(session->ngh2, stream_id); } @@ -231,7 +231,7 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, h2_stream * stream; int rv = 0; - stream = get_stream(session, stream_id); + stream = h2_session_stream_get(session, stream_id); if (stream) { status = h2_stream_recv_DATA(stream, flags, data, len); } @@ -256,7 +256,7 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id, h2_stream *stream; (void)ngh2; - stream = get_stream(session, stream_id); + stream = h2_session_stream_get(session, stream_id); if (stream) { if (error_code) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, @@ -278,7 +278,7 @@ static int on_begin_headers_cb(nghttp2_session *ngh2, /* We may see HEADERs at the start of a stream or after all DATA * streams to carry trailers. */ (void)ngh2; - s = get_stream(session, frame->hd.stream_id); + s = h2_session_stream_get(session, frame->hd.stream_id); if (s) { /* nop */ } @@ -299,7 +299,7 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame, apr_status_t status; (void)flags; - stream = get_stream(session, frame->hd.stream_id); + stream = h2_session_stream_get(session, frame->hd.stream_id); if (!stream) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(02920) "h2_stream(%ld-%d): on_header unknown stream", @@ -344,13 +344,13 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, /* This can be HEADERS for a new stream, defining the request, * or HEADER may come after DATA at the end of a stream as in * trailers */ - stream = get_stream(session, frame->hd.stream_id); + stream = h2_session_stream_get(session, frame->hd.stream_id); if (stream) { h2_stream_recv_frame(stream, NGHTTP2_HEADERS, frame->hd.flags); } break; case NGHTTP2_DATA: - stream = get_stream(session, frame->hd.stream_id); + stream = h2_session_stream_get(session, frame->hd.stream_id); if (stream) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, H2_STRM_LOG(APLOGNO(02923), stream, @@ -380,7 +380,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, "h2_stream(%ld-%d): RST_STREAM by client, errror=%d", session->id, (int)frame->hd.stream_id, (int)frame->rst_stream.error_code); - stream = get_stream(session, frame->hd.stream_id); + stream = h2_session_stream_get(session, frame->hd.stream_id); if (stream && stream->initiated_on) { ++session->pushes_reset; } @@ -453,7 +453,7 @@ static int on_send_data_cb(nghttp2_session *ngh2, } padlen = (unsigned char)frame->data.padlen; - stream = get_stream(session, stream_id); + stream = h2_session_stream_get(session, stream_id); if (!stream) { ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c, APLOGNO(02924) @@ -542,7 +542,7 @@ static int on_frame_send_cb(nghttp2_session *ngh2, (long)session->frames_sent); } - stream = get_stream(session, stream_id); + stream = h2_session_stream_get(session, stream_id); if (stream) { h2_stream_send_frame(stream, frame->hd.type, frame->hd.flags); } @@ -566,7 +566,7 @@ static int on_invalid_header_cb(nghttp2_session *ngh2, apr_pstrndup(session->pool, (const char *)name, namelen), apr_pstrndup(session->pool, (const char *)value, valuelen)); } - stream = get_stream(session, frame->hd.stream_id); + stream = h2_session_stream_get(session, frame->hd.stream_id); if (stream) { h2_stream_rst(stream, NGHTTP2_PROTOCOL_ERROR); } @@ -1028,7 +1028,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, (void)ng2s; (void)buf; (void)source; - stream = get_stream(session, stream_id); + stream = h2_session_stream_get(session, stream_id); if (!stream) { ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c, APLOGNO(02937) @@ -1449,7 +1449,7 @@ static void h2_session_in_flush(h2_session *session) int id; while ((id = h2_iq_shift(session->in_process)) > 0) { - h2_stream *stream = get_stream(session, id); + h2_stream *stream = h2_session_stream_get(session, id); if (stream) { ap_assert(!stream->scheduled); if (h2_stream_prep_processing(stream) == APR_SUCCESS) { @@ -1462,7 +1462,7 @@ static void h2_session_in_flush(h2_session *session) } while ((id = h2_iq_shift(session->in_pending)) > 0) { - h2_stream *stream = get_stream(session, id); + h2_stream *stream = h2_session_stream_get(session, id); if (stream) { h2_stream_flush_input(stream); } diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index 5751aed7bd..7a3ca3ca38 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -194,6 +194,11 @@ void h2_session_close(h2_session *session); */ int h2_session_push_enabled(h2_session *session); +/** + * Look up the stream in this session with the given id. + */ +struct h2_stream *h2_session_stream_get(h2_session *session, int stream_id); + /** * Submit a push promise on the stream and schedule the new steam for * processing.. diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 7bf35aa3b2..9784b4ec28 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -764,18 +764,77 @@ static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb) return NULL; } +static apr_status_t add_data(h2_stream *stream, apr_off_t requested, + apr_off_t *plen, int *peos, int *complete, + h2_headers **pheaders) +{ + apr_bucket *b, *e; + + *peos = 0; + *plen = 0; + *complete = 0; + if (pheaders) { + *pheaders = NULL; + } + + H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "add_data"); + b = APR_BRIGADE_FIRST(stream->out_buffer); + while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) { + e = APR_BUCKET_NEXT(b); + if (APR_BUCKET_IS_METADATA(b)) { + if (APR_BUCKET_IS_FLUSH(b)) { + APR_BUCKET_REMOVE(b); + apr_bucket_destroy(b); + } + else if (APR_BUCKET_IS_EOS(b)) { + *peos = 1; + return APR_SUCCESS; + } + else if (H2_BUCKET_IS_HEADERS(b)) { + if (*plen > 0) { + /* data before the response, can only return up to here */ + return APR_SUCCESS; + } + else if (pheaders) { + *pheaders = h2_bucket_headers_get(b); + APR_BUCKET_REMOVE(b); + apr_bucket_destroy(b); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, + H2_STRM_MSG(stream, "prep, -> response %d"), + (*pheaders)->status); + return APR_SUCCESS; + } + else { + return APR_EAGAIN; + } + } + } + else if (b->length == 0) { + APR_BUCKET_REMOVE(b); + apr_bucket_destroy(b); + } + else { + ap_assert(b->length != (apr_size_t)-1); + *plen += b->length; + if (*plen >= requested) { + *plen = requested; + return APR_SUCCESS; + } + } + b = e; + } + *complete = 1; + return APR_SUCCESS; +} + apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, - int *peos, h2_headers **presponse) + int *peos, h2_headers **pheaders) { apr_status_t status = APR_SUCCESS; - apr_off_t requested, max_chunk = H2_DATA_CHUNK_SIZE; - apr_bucket *b, *e; + apr_off_t requested, missing, max_chunk = H2_DATA_CHUNK_SIZE; conn_rec *c; + int complete; - if (presponse) { - *presponse = NULL; - } - ap_assert(stream); if (stream->rst_error) { @@ -793,15 +852,34 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, if (stream->session->io.write_size > 0) { max_chunk = stream->session->io.write_size - 9; /* header bits */ } - *plen = requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk; + requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk; + + /* count the buffered data until eos or a headers bucket */ + status = add_data(stream, requested, plen, peos, &complete, pheaders); + + if (status == APR_EAGAIN) { + /* TODO: ugly, someone needs to retrieve the response first */ + h2_mplx_keep_active(stream->session->mplx, stream); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + H2_STRM_MSG(stream, "prep, response eagain")); + return status; + } + else if (status != APR_SUCCESS) { + return status; + } - h2_util_bb_avail(stream->out_buffer, plen, peos); - if (!*peos && *plen < requested && *plen < stream->max_mem) { - H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre"); + if (pheaders && *pheaders) { + return APR_SUCCESS; + } + + missing = H2MIN(requested, stream->max_mem) - *plen; + if (complete && !*peos && missing > 0) { if (stream->output) { + H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre"); status = h2_beam_receive(stream->output, stream->out_buffer, APR_NONBLOCK_READ, stream->max_mem - *plen); + H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post"); } else { status = APR_EOF; @@ -810,79 +888,24 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, if (APR_STATUS_IS_EOF(status)) { apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc); APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos); + *peos = 1; status = APR_SUCCESS; } - else if (status == APR_EAGAIN) { - status = APR_SUCCESS; - } - *plen = requested; - h2_util_bb_avail(stream->out_buffer, plen, peos); - H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post"); - } - else { - H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "ok"); - } - - b = APR_BRIGADE_FIRST(stream->out_buffer); - while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) { - e = APR_BUCKET_NEXT(b); - if (APR_BUCKET_IS_FLUSH(b) - || (!APR_BUCKET_IS_METADATA(b) && b->length == 0)) { - APR_BUCKET_REMOVE(b); - apr_bucket_destroy(b); - } - else { - break; + else if (status == APR_SUCCESS) { + /* do it again, now that we have gotten more */ + status = add_data(stream, requested, plen, peos, &complete, pheaders); } - b = e; } - - b = get_first_headers_bucket(stream->out_buffer); - if (b) { - /* there are HEADERS to submit */ - *peos = 0; - *plen = 0; - if (b == APR_BRIGADE_FIRST(stream->out_buffer)) { - if (presponse) { - *presponse = h2_bucket_headers_get(b); - APR_BUCKET_REMOVE(b); - apr_bucket_destroy(b); - status = APR_SUCCESS; - } - else { - /* someone needs to retrieve the response first */ - h2_mplx_keep_active(stream->session->mplx, stream->id); - status = APR_EAGAIN; - } - } - else { - apr_bucket *e = APR_BRIGADE_FIRST(stream->out_buffer); - while (e != APR_BRIGADE_SENTINEL(stream->out_buffer)) { - if (e == b) { - break; - } - else if (e->length != (apr_size_t)-1) { - *plen += e->length; - } - e = APR_BUCKET_NEXT(e); - } - } - } - + if (status == APR_SUCCESS) { - if (presponse && *presponse) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - H2_STRM_MSG(stream, "prepare, response %d"), - (*presponse)->status); - } - else if (*peos || *plen) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, + if (*peos || *plen) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, H2_STRM_MSG(stream, "prepare, len=%ld eos=%d"), (long)*plen, *peos); } else { status = APR_EAGAIN; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, H2_STRM_MSG(stream, "prepare, no data")); } } diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 5ab485faab..1ef0d9a887 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -383,7 +383,7 @@ static apr_status_t h2_filter_parse_h1(ap_filter_t* f, apr_bucket_brigade* bb) /* There are cases where we need to parse a serialized http/1.1 * response. One example is a 100-continue answer in serialized mode * or via a mod_proxy setup */ - while (!task->output.sent_response) { + while (bb && !task->output.sent_response) { status = h2_from_h1_parse_response(task, f, bb); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c, "h2_task(%s): parsed response", task->id); diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 0389193e88..0ac65ccf65 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -438,12 +438,12 @@ int h2_iq_count(h2_iqueue *q) } -void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx) +int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx) { int i; if (h2_iq_contains(q, sid)) { - return; + return 0; } if (q->nelts >= q->nalloc) { iq_grow(q, q->nalloc * 2); @@ -456,11 +456,12 @@ void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx) /* bubble it to the front of the queue */ iq_bubble_up(q, i, q->head, cmp, ctx); } + return 1; } -void h2_iq_append(h2_iqueue *q, int sid) +int h2_iq_append(h2_iqueue *q, int sid) { - h2_iq_add(q, sid, NULL, NULL); + return h2_iq_add(q, sid, NULL, NULL); } int h2_iq_remove(h2_iqueue *q, int sid) @@ -612,6 +613,7 @@ int h2_iq_contains(h2_iqueue *q, int sid) struct h2_fifo { void **elems; int nelems; + int set; int head; int count; int aborted; @@ -636,7 +638,20 @@ static apr_status_t fifo_destroy(void *data) return APR_SUCCESS; } -apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity) +static int index_of(h2_fifo *fifo, void *elem) +{ + int i; + + for (i = 0; i < fifo->count; ++i) { + if (elem == fifo->elems[nth_index(fifo, i)]) { + return i; + } + } + return -1; +} + +static apr_status_t create_int(h2_fifo **pfifo, apr_pool_t *pool, + int capacity, int as_set) { apr_status_t rv; h2_fifo *fifo; @@ -667,6 +682,7 @@ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity) return APR_ENOMEM; } fifo->nelems = capacity; + fifo->set = as_set; *pfifo = fifo; apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null); @@ -674,6 +690,16 @@ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity) return APR_SUCCESS; } +apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity) +{ + return create_int(pfifo, pool, capacity, 0); +} + +apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity) +{ + return create_int(pfifo, pool, capacity, 1); +} + apr_status_t h2_fifo_term(h2_fifo *fifo) { apr_status_t rv; @@ -725,7 +751,12 @@ static apr_status_t fifo_push(h2_fifo *fifo, void *elem, int block) } if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { - if (fifo->count == fifo->nelems) { + if (fifo->set && index_of(fifo, elem) >= 0) { + /* set mode, elem already member */ + apr_thread_mutex_unlock(fifo->lock); + return APR_EEXIST; + } + else if (fifo->count == fifo->nelems) { if (block) { while (fifo->count == fifo->nelems) { if (fifo->aborted) { @@ -762,6 +793,22 @@ apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem) return fifo_push(fifo, elem, 0); } +static void *pull_head(h2_fifo *fifo) +{ + void *elem; + + ap_assert(fifo->count > 0); + elem = fifo->elems[fifo->head]; + --fifo->count; + if (fifo->count > 0) { + fifo->head = nth_index(fifo, 1); + if (fifo->count+1 == fifo->nelems) { + apr_thread_cond_broadcast(fifo->not_full); + } + } + return elem; +} + static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block) { apr_status_t rv; @@ -778,14 +825,8 @@ static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block) } ap_assert(fifo->count > 0); - *pelem = fifo->elems[fifo->head]; - --fifo->count; - if (fifo->count > 0) { - fifo->head = nth_index(fifo, 1); - if (fifo->count+1 == fifo->nelems) { - apr_thread_cond_broadcast(fifo->not_full); - } - } + *pelem = pull_head(fifo); + apr_thread_mutex_unlock(fifo->lock); } return rv; @@ -817,29 +858,18 @@ static apr_status_t fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx, int } ap_assert(fifo->count > 0); - elem = fifo->elems[fifo->head]; + elem = pull_head(fifo); + apr_thread_mutex_unlock(fifo->lock); + switch (fn(elem, ctx)) { case H2_FIFO_OP_PULL: - --fifo->count; - if (fifo->count > 0) { - fifo->head = nth_index(fifo, 1); - if (fifo->count+1 == fifo->nelems) { - apr_thread_cond_broadcast(fifo->not_full); - } - } break; case H2_FIFO_OP_REPUSH: - if (fifo->count > 1) { - fifo->head = nth_index(fifo, 1); - if (fifo->count < fifo->nelems) { - fifo->elems[nth_index(fifo, fifo->count-1)] = elem; - } - } + return h2_fifo_push(fifo, elem); break; } - apr_thread_mutex_unlock(fifo->lock); } return rv; } diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index f6a4b9a43d..9b408fad3d 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -119,17 +119,19 @@ int h2_iq_count(h2_iqueue *q); * @param q the queue to append the id to * @param sid the stream id to add * @param cmp the comparator for sorting - * @param ctx user data for comparator + * @param ctx user data for comparator + * @return != 0 iff id was not already there */ -void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx); +int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx); /** * Append the id to the queue if not already present. * * @param q the queue to append the id to * @param sid the id to append + * @return != 0 iff id was not already there */ -void h2_iq_append(h2_iqueue *q, int sid); +int h2_iq_append(h2_iqueue *q, int sid); /** * Remove the stream id from the queue. Return != 0 iff task @@ -193,12 +195,32 @@ int h2_iq_contains(h2_iqueue *q, int sid); */ typedef struct h2_fifo h2_fifo; +/** + * Create a FIFO queue that can hold up to capacity elements. Elements can + * appear several times. + */ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity); + +/** + * Create a FIFO set that can hold up to capacity elements. Elements only + * appear once. Pushing an element already present does not change the + * queue and is successful. + */ +apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity); + apr_status_t h2_fifo_term(h2_fifo *fifo); apr_status_t h2_fifo_interrupt(h2_fifo *fifo); int h2_fifo_count(h2_fifo *fifo); +/** + * Push en element into the queue. Blocks if there is no capacity left. + * + * @param fifo the FIFO queue + * @param elem the element to push + * @return APR_SUCCESS on push, APR_EAGAIN on try_push on a full queue, + * APR_EEXIST when in set mode and elem already there. + */ apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem); apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem); diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index e6765e5a03..528d21aed7 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.10.0" +#define MOD_HTTP2_VERSION "1.10.1" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010a00 +#define MOD_HTTP2_VERSION_NUM 0x010a01 #endif /* mod_h2_h2_version_h */ diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index fa395255e9..9c7afc64e6 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -39,6 +39,7 @@ struct h2_slot { int sticks; h2_task *task; apr_thread_t *thread; + apr_thread_mutex_t *lock; apr_thread_cond_t *not_idle; }; @@ -78,6 +79,17 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) slot->workers = workers; slot->aborted = 0; slot->task = NULL; + + if (!slot->lock) { + status = apr_thread_mutex_create(&slot->lock, + APR_THREAD_MUTEX_DEFAULT, + workers->pool); + if (status != APR_SUCCESS) { + push_slot(&workers->free, slot); + return status; + } + } + if (!slot->not_idle) { status = apr_thread_cond_create(&slot->not_idle, workers->pool); if (status != APR_SUCCESS) { @@ -95,7 +107,7 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) return APR_ENOMEM; } - ++workers->worker_count; + apr_atomic_inc32(&workers->worker_count); return APR_SUCCESS; } @@ -112,9 +124,9 @@ static void wake_idle_worker(h2_workers *workers) { h2_slot *slot = pop_slot(&workers->idle); if (slot) { - apr_thread_mutex_lock(workers->lock); + apr_thread_mutex_lock(slot->lock); apr_thread_cond_signal(slot->not_idle); - apr_thread_mutex_unlock(workers->lock); + apr_thread_mutex_unlock(slot->lock); } else if (workers->dynamic) { add_worker(workers); @@ -130,7 +142,7 @@ static void cleanup_zombies(h2_workers *workers) apr_thread_join(&status, slot->thread); slot->thread = NULL; } - --workers->worker_count; + apr_atomic_dec32(&workers->worker_count); push_slot(&workers->free, slot); } } @@ -185,15 +197,12 @@ static apr_status_t get_next(h2_slot *slot) return APR_SUCCESS; } - apr_thread_mutex_lock(workers->lock); cleanup_zombies(workers); - ++workers->idle_workers; + apr_thread_mutex_lock(slot->lock); push_slot(&workers->idle, slot); - apr_thread_cond_wait(slot->not_idle, workers->lock); - --workers->idle_workers; - - apr_thread_mutex_unlock(workers->lock); + apr_thread_cond_wait(slot->not_idle, slot->lock); + apr_thread_mutex_unlock(slot->lock); } return APR_EOF; } @@ -239,24 +248,25 @@ static apr_status_t workers_pool_cleanup(void *data) h2_slot *slot; if (!workers->aborted) { - apr_thread_mutex_lock(workers->lock); workers->aborted = 1; - /* before we go, cleanup any zombies and abort the rest */ - cleanup_zombies(workers); + /* abort all idle slots */ for (;;) { slot = pop_slot(&workers->idle); if (slot) { + apr_thread_mutex_lock(slot->lock); slot->aborted = 1; apr_thread_cond_signal(slot->not_idle); + apr_thread_mutex_unlock(slot->lock); } else { break; } } - apr_thread_mutex_unlock(workers->lock); h2_fifo_term(workers->mplxs); h2_fifo_interrupt(workers->mplxs); + + cleanup_zombies(workers); } return APR_SUCCESS; } diff --git a/modules/http2/h2_workers.h b/modules/http2/h2_workers.h index 30a7514cd0..7964b3c3aa 100644 --- a/modules/http2/h2_workers.h +++ b/modules/http2/h2_workers.h @@ -40,8 +40,6 @@ struct h2_workers { int next_worker_id; int min_workers; int max_workers; - int worker_count; - int idle_workers; int max_idle_secs; int aborted; @@ -51,6 +49,8 @@ struct h2_workers { int nslots; struct h2_slot *slots; + volatile apr_uint32_t worker_count; + struct h2_slot *free; struct h2_slot *idle; struct h2_slot *zombies; diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c index ef23d0c428..5b2a798996 100644 --- a/modules/http2/mod_proxy_http2.c +++ b/modules/http2/mod_proxy_http2.c @@ -26,6 +26,8 @@ #include "h2_version.h" #include "h2_proxy_session.h" +#define H2MIN(x,y) ((x) < (y) ? (x) : (y)) + static void register_hook(apr_pool_t *p); AP_DECLARE_MODULE(proxy_http2) = { @@ -65,7 +67,7 @@ typedef struct h2_proxy_ctx { const char *engine_type; apr_pool_t *engine_pool; apr_size_t req_buffer_size; - request_rec *next; + h2_proxy_fifo *requests; int capacity; unsigned standalone : 1; @@ -218,36 +220,23 @@ static apr_status_t proxy_engine_init(h2_req_engine *engine, { h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config, &proxy_http2_module); - if (ctx) { - conn_rec *c = ctx->owner; - h2_proxy_ctx *nctx; - - /* we need another lifetime for this. If we do not host - * an engine, the context lives in r->pool. Since we expect - * to server more than r, we need to live longer */ - nctx = apr_pcalloc(pool, sizeof(*nctx)); - if (nctx == NULL) { - return APR_ENOMEM; - } - memcpy(nctx, ctx, sizeof(*nctx)); - ctx = nctx; - ctx->pool = pool; - ctx->engine = engine; - ctx->engine_id = id; - ctx->engine_type = type; - ctx->engine_pool = pool; - ctx->req_buffer_size = req_buffer_size; - ctx->capacity = 100; - - ap_set_module_config(c->conn_config, &proxy_http2_module, ctx); - - *pconsumed = out_consumed; - *pctx = ctx; - return APR_SUCCESS; + if (!ctx) { + ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, APLOGNO(03368) + "h2_proxy_session, engine init, no ctx found"); + return APR_ENOTIMPL; } - ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, APLOGNO(03368) - "h2_proxy_session, engine init, no ctx found"); - return APR_ENOTIMPL; + + ctx->pool = pool; + ctx->engine = engine; + ctx->engine_id = id; + ctx->engine_type = type; + ctx->engine_pool = pool; + ctx->req_buffer_size = req_buffer_size; + ctx->capacity = H2MIN(100, h2_proxy_fifo_capacity(ctx->requests)); + + *pconsumed = out_consumed; + *pctx = ctx; + return APR_SUCCESS; } static apr_status_t add_request(h2_proxy_session *session, request_rec *r) @@ -270,10 +259,9 @@ static apr_status_t add_request(h2_proxy_session *session, request_rec *r) return status; } -static void request_done(h2_proxy_session *session, request_rec *r, +static void request_done(h2_proxy_ctx *ctx, request_rec *r, apr_status_t status, int touched) { - h2_proxy_ctx *ctx = session->user_data; const char *task_id = apr_table_get(r->connection->notes, H2_TASK_ID_NOTE); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection, @@ -282,35 +270,26 @@ static void request_done(h2_proxy_session *session, request_rec *r, if (status != APR_SUCCESS) { if (!touched) { /* untouched request, need rescheduling */ - if (req_engine_push && is_h2 && is_h2(ctx->owner)) { - if (req_engine_push(ctx->engine_type, r, NULL) == APR_SUCCESS) { - /* push to engine */ - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, - APLOGNO(03369) - "h2_proxy_session(%s): rescheduled request %s", - ctx->engine_id, task_id); - return; - } - } - else if (!ctx->next) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection, - "h2_proxy_session(%s): retry untouched request", - ctx->engine_id); - ctx->next = r; - } + status = h2_proxy_fifo_push(ctx->requests, r); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection, + APLOGNO(03369) + "h2_proxy_session(%s): rescheduled request %s", + ctx->engine_id, task_id); + return; } else { const char *uri; uri = apr_uri_unparse(r->pool, &r->parsed_uri, 0); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection, APLOGNO(03471) "h2_proxy_session(%s): request %s -> %s " - "not complete, was touched", + "not complete, cannot repeat", ctx->engine_id, task_id, uri); } } if (r == ctx->rbase) { - ctx->r_status = (status == APR_SUCCESS)? APR_SUCCESS : HTTP_SERVICE_UNAVAILABLE; + ctx->r_status = ((status == APR_SUCCESS)? APR_SUCCESS + : HTTP_SERVICE_UNAVAILABLE); } if (req_engine_done && ctx->engine) { @@ -322,21 +301,32 @@ static void request_done(h2_proxy_session *session, request_rec *r, } } +static void session_req_done(h2_proxy_session *session, request_rec *r, + apr_status_t status, int touched) +{ + request_done(session->user_data, r, status, touched); +} + static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave) { - if (ctx->next) { + if (h2_proxy_fifo_count(ctx->requests) > 0) { return APR_SUCCESS; } else if (req_engine_pull && ctx->engine) { apr_status_t status; + request_rec *r = NULL; + status = req_engine_pull(ctx->engine, before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ, - ctx->capacity, &ctx->next); - ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner, - "h2_proxy_engine(%s): pulled request (%s) %s", - ctx->engine_id, - before_leave? "before leave" : "regular", - (ctx->next? ctx->next->the_request : "NULL")); + ctx->capacity, &r); + if (status == APR_SUCCESS && r) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner, + "h2_proxy_engine(%s): pulled request (%s) %s", + ctx->engine_id, + before_leave? "before leave" : "regular", + r->the_request); + h2_proxy_fifo_push(ctx->requests, r); + } return APR_STATUS_IS_EAGAIN(status)? APR_SUCCESS : status; } return APR_EOF; @@ -345,6 +335,7 @@ static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave) static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { apr_status_t status = OK; int h2_front; + request_rec *r; /* Step Four: Send the Request in a new HTTP/2 stream and * loop until we got the response or encounter errors. @@ -355,7 +346,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { ctx->session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf, h2_front, 30, h2_proxy_log2((int)ctx->req_buffer_size), - request_done); + session_req_done); if (!ctx->session) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(03372) "session unavailable"); @@ -366,10 +357,9 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { "eng(%s): run session %s", ctx->engine_id, ctx->session->id); ctx->session->user_data = ctx; - while (1) { - if (ctx->next) { - add_request(ctx->session, ctx->next); - ctx->next = NULL; + while (!ctx->owner->aborted) { + if (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) { + add_request(ctx->session, r); } status = h2_proxy_session_process(ctx->session); @@ -379,7 +369,8 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { /* ongoing processing, call again */ if (ctx->session->remote_max_concurrent > 0 && ctx->session->remote_max_concurrent != ctx->capacity) { - ctx->capacity = (int)ctx->session->remote_max_concurrent; + ctx->capacity = H2MIN((int)ctx->session->remote_max_concurrent, + h2_proxy_fifo_capacity(ctx->requests)); } s2 = next_request(ctx, 0); if (s2 == APR_ECONNABORTED) { @@ -395,7 +386,8 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { status = ctx->r_status = APR_SUCCESS; break; } - if (!ctx->next && h2_proxy_ihash_empty(ctx->session->streams)) { + if ((h2_proxy_fifo_count(ctx->requests) == 0) + && h2_proxy_ihash_empty(ctx->session->streams)) { break; } } @@ -409,7 +401,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { * a) be reopened on the new session iff safe to do so * b) reported as done (failed) otherwise */ - h2_proxy_session_cleanup(ctx->session, request_done); + h2_proxy_session_cleanup(ctx->session, session_req_done); break; } } @@ -420,7 +412,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { return status; } -static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx) +static apr_status_t push_request_somewhere(h2_proxy_ctx *ctx, request_rec *r) { conn_rec *c = ctx->owner; const char *engine_type, *hostname; @@ -430,21 +422,15 @@ static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx) engine_type = apr_psprintf(ctx->pool, "proxy_http2 %s%s", hostname, ctx->server_portstr); - if (c->master && req_engine_push && ctx->next && is_h2 && is_h2(c)) { + if (c->master && req_engine_push && r && is_h2 && is_h2(c)) { /* If we are have req_engine capabilities, push the handling of this * request (e.g. slave connection) to a proxy_http2 engine which * uses the same backend. We may be called to create an engine * ourself. */ - if (req_engine_push(engine_type, ctx->next, proxy_engine_init) - == APR_SUCCESS) { - /* to renew the lifetime, we might have set a new ctx */ - ctx = ap_get_module_config(c->conn_config, &proxy_http2_module); + if (req_engine_push(engine_type, r, proxy_engine_init) == APR_SUCCESS) { if (ctx->engine == NULL) { - /* Another engine instance has taken over processing of this - * request. */ - ctx->r_status = SUSPENDED; - ctx->next = NULL; - return ctx; + /* request has been assigned to an engine in another thread */ + return SUSPENDED; } } } @@ -465,7 +451,8 @@ static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "H2: hosting engine %s", ctx->engine_id); } - return ctx; + + return h2_proxy_fifo_push(ctx->requests, r); } static int proxy_http2_handler(request_rec *r, @@ -482,7 +469,7 @@ static int proxy_http2_handler(request_rec *r, apr_status_t status; h2_proxy_ctx *ctx; apr_uri_t uri; - int reconnected = 0; + int reconnects = 0; /* find the scheme */ if ((url[0] != 'h' && url[0] != 'H') || url[1] != '2') { @@ -507,6 +494,7 @@ static int proxy_http2_handler(request_rec *r, default: return DECLINED; } + ctx = apr_pcalloc(r->pool, sizeof(*ctx)); ctx->owner = r->connection; ctx->pool = r->pool; @@ -518,8 +506,9 @@ static int proxy_http2_handler(request_rec *r, ctx->conf = conf; ctx->flushall = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0; ctx->r_status = HTTP_SERVICE_UNAVAILABLE; - ctx->next = r; - r = NULL; + + h2_proxy_fifo_set_create(&ctx->requests, ctx->pool, 100); + ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, ctx); /* scheme says, this is for us. */ @@ -565,10 +554,11 @@ run_connect: /* If we are not already hosting an engine, try to push the request * to an already existing engine or host a new engine here. */ - if (!ctx->engine) { - ctx = push_request_somewhere(ctx); + if (r && !ctx->engine) { + ctx->r_status = push_request_somewhere(ctx, r); + r = NULL; if (ctx->r_status == SUSPENDED) { - /* request was pushed to another engine */ + /* request was pushed to another thread, leave processing here */ goto cleanup; } } @@ -581,7 +571,7 @@ run_connect: ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(03352) "H2: failed to make connection to backend: %s", ctx->p_conn->hostname); - goto cleanup; + goto reconnect; } /* Step Three: Create conn_rec for the socket we have open now. */ @@ -593,7 +583,7 @@ run_connect: "setup new connection: is_ssl=%d %s %s %s", ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname, locurl, ctx->p_conn->hostname); - goto cleanup; + goto reconnect; } if (!ctx->p_conn->data) { @@ -628,8 +618,8 @@ run_session: ctx->engine = NULL; } -cleanup: - if (!reconnected && next_request(ctx, 1) == APR_SUCCESS) { +reconnect: + if (next_request(ctx, 1) == APR_SUCCESS) { /* Still more to do, tear down old conn and start over */ if (ctx->p_conn) { ctx->p_conn->close = 1; @@ -638,10 +628,16 @@ cleanup: ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server); ctx->p_conn = NULL; } - reconnected = 1; /* we do this only once, then fail */ - goto run_connect; + ++reconnects; + if (reconnects < 5 && !ctx->owner->aborted) { + goto run_connect; + } + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(10023) + "giving up after %d reconnects, %d requests todo", + reconnects, h2_proxy_fifo_count(ctx->requests)); } +cleanup: if (ctx->p_conn) { if (status != APR_SUCCESS) { /* close socket when errors happened or session shut down (EOF) */ @@ -653,6 +649,11 @@ cleanup: ctx->p_conn = NULL; } + /* Any requests will still have need to fail */ + while (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) { + request_done(ctx, r, HTTP_SERVICE_UNAVAILABLE, 1); + } + ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, NULL); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, APLOGNO(03377) "leaving handler"); -- 2.40.0