From: Stefan Eissing Date: Sun, 30 Oct 2016 20:38:50 +0000 (+0000) Subject: mod_http2: using int queue instead of ihash for stream output event handling X-Git-Tag: 2.5.0-alpha~1058 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=673349a575beb79e1ff25c129c6399277eba143b;p=apache mod_http2: using int queue instead of ihash for stream output event handling git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1767180 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/CHANGES b/CHANGES index 12b5f46f77..95befc279b 100644 --- a/CHANGES +++ b/CHANGES @@ -1,7 +1,7 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.0 - *) mod_http2: allocators from slave connections is released earlier, resulting + *) mod_http2: allocators from slave connections are released earlier, resulting in less overall memory use on busy, long lived connections. [Stefan Eissing] diff --git a/modules/http2/NWGNUmod_http2 b/modules/http2/NWGNUmod_http2 index b6880d7a70..10974a7ebc 100644 --- a/modules/http2/NWGNUmod_http2 +++ b/modules/http2/NWGNUmod_http2 @@ -355,18 +355,6 @@ $(OBJDIR)/mod_http2.imp : NWGNUmod_http2 @echo $(DL)GEN $@$(DL) @echo $(DL) (HTTP2)$(DL) > $@ @echo $(DL) http2_module,$(DL) >> $@ - @echo $(DL) h2_ihash_add,$(DL) >> $@ - @echo $(DL) h2_ihash_clear,$(DL) >> $@ - @echo $(DL) h2_ihash_count,$(DL) >> $@ - @echo $(DL) h2_ihash_create,$(DL) >> $@ - @echo $(DL) h2_ihash_empty,$(DL) >> $@ - @echo $(DL) h2_ihash_iter,$(DL) >> $@ - @echo $(DL) h2_ihash_remove,$(DL) >> $@ - @echo $(DL) h2_iq_add,$(DL) >> $@ - @echo $(DL) h2_iq_create,$(DL) >> $@ - @echo $(DL) h2_iq_remove,$(DL) >> $@ - @echo $(DL) h2_log2,$(DL) >> $@ - @echo $(DL) h2_headers_add_h1,$(DL) >> $@ @echo $(DL) nghttp2_is_fatal,$(DL) >> $@ @echo $(DL) nghttp2_option_del,$(DL) >> $@ @echo $(DL) nghttp2_option_new,$(DL) >> $@ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 72f11106ed..8890aa388e 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -301,7 +301,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->q = h2_iq_create(m->pool, m->max_streams); - m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id)); + m->readyq = h2_iq_create(m->pool, m->max_streams); m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id)); m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id)); @@ -435,7 +435,6 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) * stream destruction until the task is done. */ h2_iq_remove(m->q, stream->id); - h2_ihash_remove(m->sready, stream->id); h2_ihash_remove(m->streams, stream->id); h2_stream_cleanup(stream); @@ -786,7 +785,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, if (m->aborted) { status = APR_ECONNABORTED; } - else if (!h2_ihash_empty(m->sready)) { + else if (!h2_iq_empty(m->readyq)) { status = APR_SUCCESS; } else { @@ -809,11 +808,9 @@ static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response) { ap_assert(m); ap_assert(stream); - if (!h2_ihash_get(m->sready, stream->id)) { - h2_ihash_add(m->sready, stream); - if (m->added_output) { - apr_thread_cond_signal(m->added_output); - } + h2_iq_append(m->readyq, stream->id); + if (m->added_output) { + apr_thread_cond_signal(m->added_output); } } @@ -850,7 +847,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, else { h2_ihash_add(m->streams, stream); if (h2_stream_is_ready(stream)) { - h2_ihash_add(m->sready, stream); + h2_iq_append(m->readyq, stream->id); } else { if (!m->need_registration) { @@ -1357,7 +1354,7 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, { apr_status_t status; int acquired; - int streams[32]; + int ids[100]; h2_stream *stream; size_t i, n; @@ -1367,17 +1364,16 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, /* update input windows for streams */ h2_ihash_iter(m->streams, update_window, m); - if (on_resume && !h2_ihash_empty(m->sready)) { - n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams)); + if (on_resume && !h2_iq_empty(m->readyq)) { + n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids)); for (i = 0; i < n; ++i) { - stream = h2_ihash_get(m->streams, streams[i]); - if (!stream) { - continue; + stream = h2_ihash_get(m->streams, ids[i]); + if (stream) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_mplx(%ld-%d): on_resume", + m->id, stream->id); + on_resume(on_ctx, stream); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, - "h2_mplx(%ld-%d): on_resume", - m->id, stream->id); - on_resume(on_ctx, stream); } } @@ -1394,7 +1390,7 @@ apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id) if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_stream *s = h2_ihash_get(m->streams, stream_id); if (s) { - h2_ihash_add(m->sready, s); + h2_iq_append(m->readyq, stream_id); } leave_mutex(m, acquired); } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index f7e3501783..cb48fd2eb0 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -75,7 +75,7 @@ struct h2_mplx { struct h2_ihash_t *spurge; /* all streams done, ready for destroy */ struct h2_iqueue *q; /* all stream ids that need to be started */ - struct h2_ihash_t *sready; /* all streams ready for output */ + struct h2_iqueue *readyq; /* all stream ids ready for output */ struct h2_ihash_t *tasks; /* all tasks started and not destroyed */ struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */ diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index e5e4efc6a0..a1f99bd51b 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -90,7 +90,7 @@ static apr_status_t open_output(h2_task *task) return h2_mplx_out_open(task->mplx, task->stream_id, task->output.beam); } -static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb) +static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb, int block) { apr_off_t written, left; apr_status_t status; @@ -99,8 +99,7 @@ static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb) H2_TASK_OUT_LOG(APLOG_TRACE2, task, bb, "h2_task send_out"); /* engines send unblocking */ status = h2_beam_send(task->output.beam, bb, - task->assigned? APR_NONBLOCK_READ - : APR_BLOCK_READ); + block? APR_BLOCK_READ : APR_NONBLOCK_READ); if (APR_STATUS_IS_EAGAIN(status)) { apr_brigade_length(bb, 0, &left); written -= left; @@ -130,13 +129,7 @@ static apr_status_t slave_out(h2_task *task, ap_filter_t* f, { apr_bucket *b; apr_status_t status = APR_SUCCESS; - int flush = 0; - - if (APR_BRIGADE_EMPTY(bb)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, - "h2_slave_out(%s): empty write", task->id); - return APR_SUCCESS; - } + int flush = 0, blocking; if (task->frozen) { h2_util_bb_log(task->c, task->stream_id, APLOG_TRACE2, @@ -153,57 +146,46 @@ static apr_status_t slave_out(h2_task *task, ap_filter_t* f, } return APR_SUCCESS; } - - /* Attempt to write saved brigade first */ - if (task->output.bb && !APR_BRIGADE_EMPTY(task->output.bb)) { - status = send_out(task, task->output.bb); - if (status != APR_SUCCESS) { - return status; - } - } - - /* If there is nothing saved (anymore), try to write the brigade passed */ - if ((!task->output.bb || APR_BRIGADE_EMPTY(task->output.bb)) - && !APR_BRIGADE_EMPTY(bb)) { - /* check if we have a flush before the end-of-request */ - if (!task->output.opened) { - for (b = APR_BRIGADE_FIRST(bb); - b != APR_BRIGADE_SENTINEL(bb); - b = APR_BUCKET_NEXT(b)) { - if (AP_BUCKET_IS_EOR(b)) { - break; - } - else if (APR_BUCKET_IS_FLUSH(b)) { - flush = 1; - } - } - } - status = send_out(task, bb); - if (status != APR_SUCCESS) { - return status; + /* we send block once we opened the output, so someone is there + * reading it *and* the task is not assigned to a h2_req_engine */ + blocking = (!task->assigned && task->output.opened); + if (!task->output.opened) { + for (b = APR_BRIGADE_FIRST(bb); + b != APR_BRIGADE_SENTINEL(bb); + b = APR_BUCKET_NEXT(b)) { + if (APR_BUCKET_IS_FLUSH(b)) { + flush = 1; + break; + } } } - /* If the passed brigade is not empty, save it before return */ - if (!APR_BRIGADE_EMPTY(bb)) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03405) - "h2_slave_out(%s): could not write all, saving brigade", - task->id); - if (!task->output.bb) { - task->output.bb = apr_brigade_create(task->pool, - task->c->bucket_alloc); + if (task->output.bb && !APR_BRIGADE_EMPTY(task->output.bb)) { + /* still have data buffered from previous attempt. + * setaside and append new data and try to pass the complete data */ + if (!APR_BRIGADE_EMPTY(bb)) { + status = ap_save_brigade(f, &task->output.bb, &bb, task->pool); } - status = ap_save_brigade(f, &task->output.bb, &bb, task->pool); - if (status != APR_SUCCESS) { - return status; + if (status == APR_SUCCESS) { + status = send_out(task, task->output.bb, blocking); + } + } + else { + /* no data buffered here, try to pass the brigade directly */ + status = send_out(task, bb, blocking); + if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) { + /* could not write all, buffer the rest */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03405) + "h2_slave_out(%s): saving brigade", + task->id); + status = ap_save_brigade(f, &task->output.bb, &bb, task->pool); + flush = 1; } } - if (!task->output.opened - && (flush || h2_beam_get_mem_used(task->output.beam) > (32*1024))) { - /* if we have enough buffered or we got a flush bucket, open - * the response now. */ + if (status == APR_SUCCESS && !task->output.opened && flush) { + /* got a flush or could not write all, time to tell someone to read */ status = open_output(task); } ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, task->c, diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 81b94566c5..c528b5f027 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -440,10 +440,12 @@ void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx) { int i; + if (h2_iq_contains(q, sid)) { + return; + } if (q->nelts >= q->nalloc) { iq_grow(q, q->nalloc * 2); } - i = (q->head + q->nelts) % q->nalloc; q->elts[i] = sid; ++q->nelts; @@ -454,6 +456,11 @@ void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx) } } +void h2_iq_append(h2_iqueue *q, int sid) +{ + h2_iq_add(q, sid, NULL, NULL); +} + int h2_iq_remove(h2_iqueue *q, int sid) { int i; @@ -522,6 +529,17 @@ int h2_iq_shift(h2_iqueue *q) return sid; } +size_t h2_iq_mshift(h2_iqueue *q, int *pint, size_t max) +{ + for (int i = 0; i < max; ++i) { + pint[i] = h2_iq_shift(q); + if (pint[i] == 0) { + break; + } + } + return i; +} + static void iq_grow(h2_iqueue *q, int nlen) { if (nlen > q->nalloc) { @@ -573,6 +591,17 @@ static int iq_bubble_down(h2_iqueue *q, int i, int bottom, return i; } +int h2_iq_contains(h2_iqueue *q, int sid) +{ + int i; + for (i = 0; i < q->nelts; ++i) { + if (sid == q->elts[(q->head + i) % q->nalloc]) { + return 1; + } + } + return 0; +} + /******************************************************************************* * h2_util for apt_table_t ******************************************************************************/ diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index 7c9453a6cf..7b92553445 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -116,13 +116,21 @@ int h2_iq_count(h2_iqueue *q); /** * Add a stream id to the queue. * - * @param q the queue to append the task to + * @param q the queue to append the id to * @param sid the stream id to add * @param cmp the comparator for sorting * @param ctx user data for comparator */ void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx); +/** + * Append the id to the queue if not already present. + * + * @param q the queue to append the id to + * @param sid the id to append + */ +void h2_iq_append(h2_iqueue *q, int sid); + /** * Remove the stream id from the queue. Return != 0 iff task * was found in queue. @@ -148,14 +156,33 @@ void h2_iq_clear(h2_iqueue *q); void h2_iq_sort(h2_iqueue *q, h2_iq_cmp *cmp, void *ctx); /** - * Get the first stream id from the queue or NULL if the queue is empty. - * The task will be removed. + * Get the first id from the queue or 0 if the queue is empty. + * The id is being removed. * - * @param q the queue to get the first task from - * @return the first stream id of the queue, 0 if empty + * @param q the queue to get the first id from + * @return the first id of the queue, 0 if empty */ int h2_iq_shift(h2_iqueue *q); +/** + * Get the first max ids from the queue. All these ids will be removed. + * + * @param q the queue to get the first task from + * @param pint the int array to receive the values + * @param max the maximum number of ids to shift + * @return the actual number of ids shifted + */ +size_t h2_iq_mshift(h2_iqueue *q, int *pint, size_t max); + +/** + * Determine if int is in the queue already + * + * @parm q the queue + * @param sid the integer id to check for + * @return != 0 iff sid is already in the queue + */ +int h2_iq_contains(h2_iqueue *q, int sid); + /******************************************************************************* * common helpers ******************************************************************************/