From e4611af52b8aed2dc5af6b7dccb9e6e7551bc077 Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Wed, 5 Apr 2017 14:49:25 +0000 Subject: [PATCH] On the trunk: mod_http2: less and more granular mutex use for improved performance. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1790284 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES | 6 ++- modules/http2/h2_mplx.c | 102 +++++++++++++++++-------------------- modules/http2/h2_mplx.h | 4 +- modules/http2/h2_stream.c | 2 +- modules/http2/h2_util.c | 43 +++++++++++++--- modules/http2/h2_util.h | 28 ++++++++-- modules/http2/h2_workers.c | 8 +-- modules/http2/h2_workers.h | 4 +- 8 files changed, 121 insertions(+), 76 deletions(-) diff --git a/CHANGES b/CHANGES index 716eaa5cfc..5f05476f64 100644 --- a/CHANGES +++ b/CHANGES @@ -1,8 +1,10 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.0 - *) mod_http2/mod_proxy_http2: less read attempts on bucket beams that already - delivered EOS/headers. Fixed bug in re-attempting proxy request after + *) mod_http2: less and more granular mutex use for improved performance. + [Stefan Eissing] + + *) mod_proxy_http2: Fixed bug in re-attempting proxy requests after connection error. 
[Stefan Eissing] *) core: Disallow multiple Listen on the same IP:port when listener buckets diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index d044c19eab..40af1c6000 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -65,14 +65,19 @@ apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s) return rv;\ } } while(0) -#define H2_MPLX_ENTER_ALWAYS(m) \ - apr_thread_mutex_lock(m->lock) - #define H2_MPLX_LEAVE(m) \ apr_thread_mutex_unlock(m->lock) +#define H2_MPLX_ENTER_ALWAYS(m) \ + apr_thread_mutex_lock(m->lock) + +#define H2_MPLX_ENTER_MAYBE(m, lock) \ + if (lock) apr_thread_mutex_lock(m->lock) -static void check_data_for(h2_mplx *m, int stream_id); +#define H2_MPLX_LEAVE_MAYBE(m, lock) \ + if (lock) apr_thread_mutex_unlock(m->lock) + +static void check_data_for(h2_mplx *m, h2_stream *stream, int lock); static void stream_output_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) @@ -121,6 +126,7 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream) h2_stream_cleanup(stream); h2_iq_remove(m->q, stream->id); + h2_fifo_remove(m->readyq, stream); h2_ihash_remove(m->streams, stream->id); h2_ihash_add(m->shold, stream); @@ -206,7 +212,12 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->q = h2_iq_create(m->pool, m->max_streams); - m->readyq = h2_iq_create(m->pool, m->max_streams); + + status = h2_fifo_set_create(&m->readyq, m->pool, m->max_streams); + if (status != APR_SUCCESS) { + apr_pool_destroy(m->pool); + return NULL; + } m->workers = workers; m->max_active = workers->max_workers; @@ -481,15 +492,10 @@ h2_stream *h2_mplx_stream_get(h2_mplx *m, int id) static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) { - h2_mplx *m = ctx; + h2_stream *stream = ctx; + h2_mplx *m = stream->session->mplx; - H2_MPLX_ENTER_ALWAYS(m); - - ap_log_cerror(APLOG_MARK, 
APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id); - check_data_for(m, beam->id); - - H2_MPLX_LEAVE(m); + check_data_for(m, stream, 1); } static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) @@ -513,7 +519,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) } h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream); - h2_beam_on_produced(stream->output, output_produced, m); + h2_beam_on_produced(stream->output, output_produced, stream); if (stream->task->output.copy_files) { h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL); } @@ -523,7 +529,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) /* we might see some file buckets in the output, see * if we have enough handles reserved. */ - check_data_for(m, stream->id); + check_data_for(m, stream, 0); return status; } @@ -563,7 +569,7 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task) status = h2_beam_close(task->output.beam); h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close"); output_consumed_signal(m, task); - check_data_for(m, task->stream_id); + check_data_for(m, stream, 0); return status; } @@ -577,7 +583,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, if (m->aborted) { status = APR_ECONNABORTED; } - else if (apr_atomic_read32(&m->event_pending) > 0) { + else if (h2_mplx_has_master_events(m)) { status = APR_SUCCESS; } else { @@ -597,13 +603,15 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, return status; } -static void check_data_for(h2_mplx *m, int stream_id) +static void check_data_for(h2_mplx *m, h2_stream *stream, int lock) { - ap_assert(m); - h2_iq_append(m->readyq, stream_id); - apr_atomic_set32(&m->event_pending, 1); - if (m->added_output) { - apr_thread_cond_signal(m->added_output); + if (h2_fifo_push(m->readyq, stream) == APR_SUCCESS) { + apr_atomic_set32(&m->event_pending, 1); + 
H2_MPLX_ENTER_MAYBE(m, lock); + if (m->added_output) { + apr_thread_cond_signal(m->added_output); + } + H2_MPLX_LEAVE_MAYBE(m, lock); } } @@ -656,7 +664,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, h2_ihash_add(m->streams, stream); if (h2_stream_is_ready(stream)) { /* already have a response */ - check_data_for(m, stream->id); + check_data_for(m, stream, 0); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, H2_STRM_MSG(stream, "process, add to readyq")); } @@ -828,7 +836,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) if (stream->output) { h2_beam_mutex_disable(stream->output); } - check_data_for(m, stream->id); + check_data_for(m, stream, 0); } else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, @@ -1161,48 +1169,33 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, stream_ev_callback *on_resume, void *on_ctx) { - int ids[100]; h2_stream *stream; - size_t i, n; + int n; - H2_MPLX_ENTER(m); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, "h2_mplx(%ld): dispatch events", m->id); apr_atomic_set32(&m->event_pending, 0); - purge_streams(m); - + /* update input windows for streams */ h2_ihash_iter(m->streams, report_consumption_iter, m); - if (!h2_iq_empty(m->readyq)) { - n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids)); - for (i = 0; i < n; ++i) { - stream = h2_ihash_get(m->streams, ids[i]); - if (stream) { - H2_MPLX_LEAVE(m); - - on_resume(on_ctx, stream); - - H2_MPLX_ENTER(m); - } - } + n = h2_fifo_count(m->readyq); + while (n > 0 + && (h2_fifo_try_pull(m->readyq, (void**)&stream) == APR_SUCCESS)) { + --n; + on_resume(on_ctx, stream); } - if (!h2_iq_empty(m->readyq)) { - apr_atomic_set32(&m->event_pending, 1); - } - + + H2_MPLX_ENTER(m); + purge_streams(m); H2_MPLX_LEAVE(m); + return APR_SUCCESS; } -apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id) +apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream) { - 
H2_MPLX_ENTER(m); - - check_data_for(m, stream_id); - - H2_MPLX_LEAVE(m); + check_data_for(m, stream, 1); return APR_SUCCESS; } @@ -1215,7 +1208,8 @@ int h2_mplx_awaits_data(h2_mplx *m) if (h2_ihash_empty(m->streams)) { waiting = 0; } - if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) { + if ((h2_fifo_count(m->readyq) == 0) + && h2_iq_empty(m->q) && !m->tasks_active) { waiting = 0; } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 82a98fce0a..ed332c8bc3 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -68,7 +68,7 @@ struct h2_mplx { struct h2_ihash_t *spurge; /* all streams done, ready for destroy */ struct h2_iqueue *q; /* all stream ids that need to be started */ - struct h2_iqueue *readyq; /* all stream ids ready for output */ + struct h2_fifo *readyq; /* all streams ready for output */ struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */ @@ -158,7 +158,7 @@ apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, struct h2_stream *stream); apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, struct apr_thread_cond_t *iowait); -apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id); +apr_status_t h2_mplx_keep_active(h2_mplx *m, struct h2_stream *stream); /******************************************************************************* * Stream processing. 
diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 9d416cb7d7..9784b4ec28 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -859,7 +859,7 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, if (status == APR_EAGAIN) { /* TODO: ugly, someone needs to retrieve the response first */ - h2_mplx_keep_active(stream->session->mplx, stream->id); + h2_mplx_keep_active(stream->session->mplx, stream); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, H2_STRM_MSG(stream, "prep, response eagain")); return status; diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 0389193e88..a0b81fa0a8 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -438,12 +438,12 @@ int h2_iq_count(h2_iqueue *q) } -void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx) +int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx) { int i; if (h2_iq_contains(q, sid)) { - return; + return 0; } if (q->nelts >= q->nalloc) { iq_grow(q, q->nalloc * 2); @@ -456,11 +456,12 @@ void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx) /* bubble it to the front of the queue */ iq_bubble_up(q, i, q->head, cmp, ctx); } + return 1; } -void h2_iq_append(h2_iqueue *q, int sid) +int h2_iq_append(h2_iqueue *q, int sid) { - h2_iq_add(q, sid, NULL, NULL); + return h2_iq_add(q, sid, NULL, NULL); } int h2_iq_remove(h2_iqueue *q, int sid) @@ -612,6 +613,7 @@ int h2_iq_contains(h2_iqueue *q, int sid) struct h2_fifo { void **elems; int nelems; + int set; int head; int count; int aborted; @@ -636,7 +638,20 @@ static apr_status_t fifo_destroy(void *data) return APR_SUCCESS; } -apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity) +static int index_of(h2_fifo *fifo, void *elem) +{ + int i; + + for (i = 0; i < fifo->count; ++i) { + if (elem == fifo->elems[nth_index(fifo, i)]) { + return i; + } + } + return -1; +} + +static apr_status_t create_int(h2_fifo **pfifo, apr_pool_t *pool, + int 
capacity, int as_set) { apr_status_t rv; h2_fifo *fifo; @@ -667,6 +682,7 @@ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity) return APR_ENOMEM; } fifo->nelems = capacity; + fifo->set = as_set; *pfifo = fifo; apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null); @@ -674,6 +690,16 @@ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity) return APR_SUCCESS; } +apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity) +{ + return create_int(pfifo, pool, capacity, 0); +} + +apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity) +{ + return create_int(pfifo, pool, capacity, 1); +} + apr_status_t h2_fifo_term(h2_fifo *fifo) { apr_status_t rv; @@ -725,7 +751,12 @@ static apr_status_t fifo_push(h2_fifo *fifo, void *elem, int block) } if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { - if (fifo->count == fifo->nelems) { + if (fifo->set && index_of(fifo, elem) >= 0) { + /* set mode, elem already member */ + apr_thread_mutex_unlock(fifo->lock); + return APR_EEXIST; + } + else if (fifo->count == fifo->nelems) { if (block) { while (fifo->count == fifo->nelems) { if (fifo->aborted) { diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index f6a4b9a43d..9b408fad3d 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -119,17 +119,19 @@ int h2_iq_count(h2_iqueue *q); * @param q the queue to append the id to * @param sid the stream id to add * @param cmp the comparator for sorting - * @param ctx user data for comparator + * @param ctx user data for comparator + * @return != 0 iff id was not already there */ -void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx); +int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx); /** * Append the id to the queue if not already present. 
* * @param q the queue to append the id to * @param sid the id to append + * @return != 0 iff id was not already there */ -void h2_iq_append(h2_iqueue *q, int sid); +int h2_iq_append(h2_iqueue *q, int sid); /** * Remove the stream id from the queue. Return != 0 iff task @@ -193,12 +195,32 @@ int h2_iq_contains(h2_iqueue *q, int sid); */ typedef struct h2_fifo h2_fifo; +/** + * Create a FIFO queue that can hold up to capacity elements. Elements can + * appear several times. + */ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity); + +/** + * Create a FIFO set that can hold up to capacity elements. Elements only + * appear once. Pushing an element already present does not change the + * queue and returns APR_EEXIST instead of APR_SUCCESS. + */ +apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity); + apr_status_t h2_fifo_term(h2_fifo *fifo); apr_status_t h2_fifo_interrupt(h2_fifo *fifo); int h2_fifo_count(h2_fifo *fifo); +/** + * Push an element into the queue. Blocks if there is no capacity left. + * + * @param fifo the FIFO queue + * @param elem the element to push + * @return APR_SUCCESS on push, APR_EAGAIN on try_push on a full queue, + * APR_EEXIST when in set mode and elem already there. 
+ */ apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem); apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem); diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index 8eea35caea..d14502341a 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -107,7 +107,7 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) return APR_ENOMEM; } - ++workers->worker_count; + apr_atomic_inc32(&workers->worker_count); return APR_SUCCESS; } @@ -142,7 +142,7 @@ static void cleanup_zombies(h2_workers *workers) apr_thread_join(&status, slot->thread); slot->thread = NULL; } - --workers->worker_count; + apr_atomic_dec32(&workers->worker_count); push_slot(&workers->free, slot); } } @@ -199,14 +199,10 @@ static apr_status_t get_next(h2_slot *slot) cleanup_zombies(workers); - ++workers->idle_workers; - apr_thread_mutex_lock(slot->lock); push_slot(&workers->idle, slot); apr_thread_cond_wait(slot->not_idle, slot->lock); apr_thread_mutex_unlock(slot->lock); - - --workers->idle_workers; } return APR_EOF; } diff --git a/modules/http2/h2_workers.h b/modules/http2/h2_workers.h index 30a7514cd0..7964b3c3aa 100644 --- a/modules/http2/h2_workers.h +++ b/modules/http2/h2_workers.h @@ -40,8 +40,6 @@ struct h2_workers { int next_worker_id; int min_workers; int max_workers; - int worker_count; - int idle_workers; int max_idle_secs; int aborted; @@ -51,6 +49,8 @@ struct h2_workers { int nslots; struct h2_slot *slots; + volatile apr_uint32_t worker_count; + struct h2_slot *free; struct h2_slot *idle; struct h2_slot *zombies; -- 2.50.1