From: Stefan Eissing Date: Mon, 19 Sep 2016 16:31:15 +0000 (+0000) Subject: Merge of 1761434,1761477 from trunk: X-Git-Tag: 2.4.24~232 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=c5813d32eea7f3bcfa0d1151cc448d01fc1decfe;p=apache Merge of 1761434,1761477 from trunk: mod_http2: fix for output blocking race condition git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1761478 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/CHANGES b/CHANGES index 55d4434f0d..2ad62ffc6a 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,9 @@ Changes with Apache 2.4.24 + *) mod_http2: fix suspended handling for streams. Output could become + blocked in rare cases. + *) mpm_winnt: Prevent a denial of service when the 'data' AcceptFilter is in use by replacing it with the 'connect' filter. PR 59970. [Jacob Champion] diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index 1338ba68b0..b1d698e796 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -223,6 +223,28 @@ static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl) } } +static void report_consumption(h2_bucket_beam *beam, int force) +{ + if (force || beam->received_bytes != beam->reported_consumed_bytes) { + if (beam->consumed_fn) { + beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes + - beam->reported_consumed_bytes); + } + beam->reported_consumed_bytes = beam->received_bytes; + } +} + +static void report_production(h2_bucket_beam *beam, int force) +{ + if (force || beam->sent_bytes != beam->reported_produced_bytes) { + if (beam->produced_fn) { + beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes + - beam->reported_produced_bytes); + } + beam->reported_produced_bytes = beam->sent_bytes; + } +} + static apr_off_t calc_buffered(h2_bucket_beam *beam) { apr_off_t len = 0; @@ -279,7 +301,9 @@ static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block, *premain = calc_space_left(beam); while (!beam->aborted && *premain <= 0 && (block == APR_BLOCK_READ) && pbl->mutex) { - apr_status_t status = wait_cond(beam, pbl->mutex); + apr_status_t status; + report_production(beam, 1); + status = wait_cond(beam, pbl->mutex); if (APR_STATUS_IS_TIMEUP(status)) { return status; } @@ -356,28 +380,6 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) } } -static void report_consumption(h2_bucket_beam *beam, int force) -{ - if (force || beam->received_bytes != beam->reported_consumed_bytes) { - if (beam->consumed_fn) { - beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes - - beam->reported_consumed_bytes); - } - beam->reported_consumed_bytes = beam->received_bytes; - } -} - -static void report_production(h2_bucket_beam *beam, int force) -{ - if (force || beam->sent_bytes != beam->reported_produced_bytes) { - if (beam->produced_fn) { - beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes - - beam->reported_produced_bytes); - } - beam->reported_produced_bytes = beam->sent_bytes; - } -} - static void h2_blist_cleanup(h2_blist *bl) { apr_bucket *e; @@ -877,6 +879,9 @@ transfer: } if (transferred) { + if (beam->m_cond) { + apr_thread_cond_broadcast(beam->m_cond); + } status = APR_SUCCESS; } else if (beam->closed) { @@ -890,6 +895,9 @@ transfer: goto transfer; } else { + if (beam->m_cond) { + apr_thread_cond_broadcast(beam->m_cond); + } status = APR_EAGAIN; } leave: diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index e340c9a678..8c3a59280c 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -168,7 +168,7 @@ static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file) return 0; } -static void have_out_data_for(h2_mplx *m, int stream_id); +static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response); static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master); static void check_tx_reservation(h2_mplx *m) @@ -545,7 +545,7 @@ static int task_abort_connection(void *ctx, void *val) if (task->input.beam) { h2_beam_abort(task->input.beam); } - if (task->output.beam) { + if (task->worker_started && !task->worker_done && task->output.beam) { h2_beam_abort(task->output.beam); } return 1; @@ -713,6 +713,23 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) m->input_consumed_ctx = ctx; } +static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) +{ + h2_mplx *m = ctx; + apr_status_t status; + h2_stream *stream; + int acquired; + + AP_DEBUG_ASSERT(m); + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + stream = h2_ihash_get(m->streams, beam->id); + if (stream) { + have_out_data_for(m, stream, 0); + } + leave_mutex(m, acquired); + } +} + static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response) { apr_status_t status = APR_SUCCESS; @@ -735,6 +752,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response) h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem); h2_beam_timeout_set(task->output.beam, m->stream_timeout); h2_beam_on_consumed(task->output.beam, stream_output_consumed, task); + h2_beam_on_produced(task->output.beam, output_produced, m); m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam); if (!task->output.copy_files) { h2_beam_on_file_beam(task->output.beam, can_beam_file, m); @@ -743,13 +761,12 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response) task->output.opened = 1; } - h2_ihash_add(m->sready, stream); if (response && response->http_status < 300) { /* we might see some file buckets in the output, see * if we have enough handles reserved. */ check_tx_reservation(m); } - have_out_data_for(m, stream_id); + have_out_data_for(m, stream, 1); return status; } @@ -803,7 +820,7 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task) APLOG_TRACE2); } output_consumed_signal(m, task); - have_out_data_for(m, task->stream_id); + have_out_data_for(m, stream, 0); return status; } @@ -837,12 +854,18 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, return status; } -static void have_out_data_for(h2_mplx *m, int stream_id) +static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response) { - (void)stream_id; - AP_DEBUG_ASSERT(m); - if (m->added_output) { - apr_thread_cond_signal(m->added_output); + h2_ihash_t *set; + ap_assert(m); + ap_assert(stream); + + set = response? m->sready : m->sresume; + if (!h2_ihash_get(set, stream->id)) { + h2_ihash_add(set, stream); + if (m->added_output) { + apr_thread_cond_signal(m->added_output); + } } } @@ -1071,11 +1094,8 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, "h2_mplx(%s): task_done, stream still open", task->id); - if (h2_stream_is_suspended(stream)) { - /* more data will not arrive, resume the stream */ - h2_ihash_add(m->sresume, stream); - have_out_data_for(m, stream->id); - } + /* more data will not arrive, resume the stream */ + have_out_data_for(m, stream, 0); } else { /* stream no longer active, was it placed in hold? */ @@ -1463,6 +1483,10 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, "h2_mplx(%ld-%d): on_resume", m->id, stream->id); + task = h2_ihash_get(m->tasks, stream->id); + if (task && task->rst_error) { + h2_stream_rst(stream, task->rst_error); + } h2_stream_set_suspended(stream, 0); status = on_resume(on_ctx, stream->id); } @@ -1473,25 +1497,6 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, return status; } -static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) -{ - h2_mplx *m = ctx; - apr_status_t status; - h2_stream *stream; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - stream = h2_ihash_get(m->streams, beam->id); - if (stream && h2_stream_is_suspended(stream)) { - h2_ihash_add(m->sresume, stream); - h2_beam_on_produced(beam, NULL, NULL); - have_out_data_for(m, beam->id); - } - leave_mutex(m, acquired); - } -} - apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id) { apr_status_t status; @@ -1502,18 +1507,32 @@ apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id) AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { stream = h2_ihash_get(m->streams, stream_id); - if (stream) { + if (stream && !h2_ihash_get(m->sresume, stream->id)) { + /* not marked for resume again already */ h2_stream_set_suspended(stream, 1); task = h2_ihash_get(m->tasks, stream->id); if (stream->started && (!task || task->worker_done)) { h2_ihash_add(m->sresume, stream); } - else if (task->output.beam) { - /* register callback so that we can resume on new output */ - h2_beam_on_produced(task->output.beam, output_produced, m); - } } leave_mutex(m, acquired); } return status; } + +int h2_mplx_is_busy(h2_mplx *m) +{ + apr_status_t status; + int acquired, busy = 1; + + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + if (h2_ihash_empty(m->streams)) { + busy = 0; + } + if (h2_iq_empty(m->q) && h2_ihash_empty(m->tasks)) { + busy = 0; + } + leave_mutex(m, acquired); + } + return busy; +} diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 4af0ba3c13..b2873e8827 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -158,6 +158,8 @@ void h2_mplx_task_done(h2_mplx *m, struct h2_task *task, struct h2_task **ptask) */ apr_uint32_t h2_mplx_shutdown(h2_mplx *m); +int h2_mplx_is_busy(h2_mplx *m); + /******************************************************************************* * IO lifetime of streams. ******************************************************************************/ diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 31ae303c9d..fc91214967 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -1168,8 +1168,6 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, return NGHTTP2_ERR_CALLBACK_FAILURE; } - AP_DEBUG_ASSERT(!h2_stream_is_suspended(stream)); - status = h2_stream_out_prepare(stream, &nread, &eos); if (nread) { *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY; @@ -1179,9 +1177,10 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, case APR_SUCCESS: break; + case APR_ECONNABORTED: case APR_ECONNRESET: return nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE, - stream->id, stream->rst_error); + stream->id, H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR)); case APR_EAGAIN: /* If there is no data available, our session will automatically @@ -1431,7 +1430,17 @@ static apr_status_t on_stream_resume(void *ctx, int stream_id) ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, "h2_stream(%ld-%d): on_resume", session->id, stream_id); if (stream) { - int rv = nghttp2_session_resume_data(session->ngh2, stream_id); + int rv; + if (stream->rst_error) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO() + "h2_stream(%ld-%d): RST_STREAM, err=%d", + session->id, stream->id, stream->rst_error); + rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, + stream->id, stream->rst_error); + } + else { + rv = nghttp2_session_resume_data(session->ngh2, stream_id); + } session->have_written = 1; ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)? APLOG_ERR : APLOG_DEBUG, 0, session->c, @@ -1459,7 +1468,7 @@ static apr_status_t on_stream_response(void *ctx, int stream_id) if (!stream) { return APR_NOTFOUND; } - else if (!stream->response) { + else if (stream->rst_error || !stream->response) { int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074) @@ -1652,40 +1661,6 @@ static apr_status_t h2_session_read(h2_session *session, int block) return rstatus; } -static int unsubmitted_iter(void *ctx, void *val) -{ - h2_stream *stream = val; - if (h2_stream_needs_submit(stream)) { - *((int *)ctx) = 1; - return 0; - } - return 1; -} - -static int has_unsubmitted_streams(h2_session *session) -{ - int has_unsubmitted = 0; - h2_ihash_iter(session->streams, unsubmitted_iter, &has_unsubmitted); - return has_unsubmitted; -} - -static int suspended_iter(void *ctx, void *val) -{ - h2_stream *stream = val; - if (h2_stream_is_suspended(stream)) { - *((int *)ctx) = 1; - return 0; - } - return 1; -} - -static int has_suspended_streams(h2_session *session) -{ - int has_suspended = 0; - h2_ihash_iter(session->streams, suspended_iter, &has_suspended); - return has_suspended; -} - static const char *StateNames[] = { "INIT", /* H2_SESSION_ST_INIT */ "DONE", /* H2_SESSION_ST_DONE */ @@ -1830,8 +1805,7 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg) session->id, session->open_streams); h2_conn_io_flush(&session->io); if (session->open_streams > 0) { - if (has_unsubmitted_streams(session) - || has_suspended_streams(session)) { + if (h2_mplx_is_busy(session->mplx)) { /* waiting for at least one stream to produce data */ transit(session, "no io", H2_SESSION_ST_WAIT); } diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index f457eb49ff..eaa3d1c924 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -503,6 +503,9 @@ static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount) } status = h2_beam_receive(stream->output, stream->buffer, APR_NONBLOCK_READ, amount); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, + "h2_stream(%ld-%d): beam_received", + stream->session->id, stream->id); /* The buckets we reveive are using the stream->buffer pool as * lifetime which is exactly what we want since this is stream->pool. * diff --git a/modules/http2/h2_stream_set.c b/modules/http2/h2_stream_set.c deleted file mode 100644 index aa0f8c6501..0000000000 --- a/modules/http2/h2_stream_set.c +++ /dev/null @@ -1,145 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include -#include - -#include -#include - -#include "h2_private.h" -#include "h2_stream.h" -#include "h2_stream_set.h" - - -struct h2_stream_set { - apr_hash_t *hash; -}; - -static unsigned int stream_hash(const char *key, apr_ssize_t *klen) -{ - return (unsigned int)(*((int*)key)); -} - -h2_stream_set *h2_stream_set_create(apr_pool_t *pool, int max) -{ - h2_stream_set *sp = apr_pcalloc(pool, sizeof(h2_stream_set)); - sp->hash = apr_hash_make_custom(pool, stream_hash); - - return sp; -} - -void h2_stream_set_destroy(h2_stream_set *sp) -{ - (void)sp; -} - -h2_stream *h2_stream_set_get(h2_stream_set *sp, int stream_id) -{ - return apr_hash_get(sp->hash, &stream_id, sizeof(stream_id)); -} - -void h2_stream_set_add(h2_stream_set *sp, h2_stream *stream) -{ - apr_hash_set(sp->hash, &stream->id, sizeof(stream->id), stream); -} - -void h2_stream_set_remove(h2_stream_set *sp, int stream_id) -{ - apr_hash_set(sp->hash, &stream_id, sizeof(stream_id), NULL); -} - -int h2_stream_set_is_empty(h2_stream_set *sp) -{ - return apr_hash_count(sp->hash) == 0; -} - -apr_size_t h2_stream_set_size(h2_stream_set *sp) -{ - return apr_hash_count(sp->hash); -} - -typedef struct { - h2_stream_set_iter_fn *iter; - void *ctx; -} iter_ctx; - -static int hash_iter(void *ctx, const void *key, apr_ssize_t klen, - const void *val) -{ - iter_ctx *ictx = ctx; - return ictx->iter(ictx->ctx, (h2_stream*)val); -} - -void h2_stream_set_iter(h2_stream_set *sp, - h2_stream_set_iter_fn *iter, void *ctx) -{ - iter_ctx ictx; - ictx.iter = iter; - ictx.ctx = ctx; - apr_hash_do(hash_iter, &ictx, sp->hash); -} - -static int unsubmitted_iter(void *ctx, h2_stream *stream) -{ - if (h2_stream_needs_submit(stream)) { - *((int *)ctx) = 1; - return 0; - } - return 1; -} - -int h2_stream_set_has_unsubmitted(h2_stream_set *sp) -{ - int has_unsubmitted = 0; - h2_stream_set_iter(sp, unsubmitted_iter, &has_unsubmitted); - return has_unsubmitted; -} - -static int input_open_iter(void *ctx, h2_stream *stream) -{ - if (h2_stream_input_is_open(stream)) { - *((int *)ctx) = 1; - return 0; - } - return 1; -} - -int h2_stream_set_has_open_input(h2_stream_set *sp) -{ - int has_input_open = 0; - h2_stream_set_iter(sp, input_open_iter, &has_input_open); - return has_input_open; -} - -static int suspended_iter(void *ctx, h2_stream *stream) -{ - if (h2_stream_is_suspended(stream)) { - *((int *)ctx) = 1; - return 0; - } - return 1; -} - -int h2_stream_set_has_suspended(h2_stream_set *sp) -{ - int has_suspended = 0; - h2_stream_set_iter(sp, suspended_iter, &has_suspended); - return has_suspended; -} - diff --git a/modules/http2/h2_stream_set.h b/modules/http2/h2_stream_set.h deleted file mode 100644 index d0041c4843..0000000000 --- a/modules/http2/h2_stream_set.h +++ /dev/null @@ -1,51 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __mod_h2__h2_stream_set__ -#define __mod_h2__h2_stream_set__ - -/** - * A set of h2_stream instances. Allows lookup by stream id - * and other criteria. - */ - -typedef h2_stream *h2_stream_set_match_fn(void *ctx, h2_stream *stream); -typedef int h2_stream_set_iter_fn(void *ctx, h2_stream *stream); - -typedef struct h2_stream_set h2_stream_set; - - -h2_stream_set *h2_stream_set_create(apr_pool_t *pool, int max); - -void h2_stream_set_destroy(h2_stream_set *sp); - -void h2_stream_set_add(h2_stream_set *sp, h2_stream *stream); - -h2_stream *h2_stream_set_get(h2_stream_set *sp, int stream_id); - -void h2_stream_set_remove(h2_stream_set *sp, int stream_id); - -void h2_stream_set_iter(h2_stream_set *sp, - h2_stream_set_iter_fn *iter, void *ctx); - -int h2_stream_set_is_empty(h2_stream_set *sp); - -apr_size_t h2_stream_set_size(h2_stream_set *sp); - -int h2_stream_set_has_unsubmitted(h2_stream_set *sp); -int h2_stream_set_has_open_input(h2_stream_set *sp); -int h2_stream_set_has_suspended(h2_stream_set *sp); - -#endif /* defined(__mod_h2__h2_stream_set__) */ diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 318943ae4a..c240d4aa5a 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -383,6 +383,11 @@ static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb) h2_task_logio_add_bytes_out(task->c, written); } } + else { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, + "h2_task(%s): send_out (%ld bytes)", + task->id, (long)written); + } return status; } @@ -542,8 +547,14 @@ static apr_status_t h2_filter_stream_output(ap_filter_t* filter, apr_bucket_brigade* brigade) { h2_task *task = h2_ctx_cget_task(filter->c); - AP_DEBUG_ASSERT(task); - return output_write(task, filter, brigade); + apr_status_t status; + + ap_assert(task); + status = output_write(task, filter, brigade); + if (status != APR_SUCCESS) { + h2_task_rst(task, H2_ERR_INTERNAL_ERROR); + } + return status; } static apr_status_t h2_filter_read_response(ap_filter_t* filter, diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index e65ea7aa1f..90241b85c9 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.6.1" +#define MOD_HTTP2_VERSION "1.6.2" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010601 +#define MOD_HTTP2_VERSION_NUM 0x010602 #endif /* mod_h2_h2_version_h */