From bc40444b57f25164cebf11441c82092faf8af500 Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Sun, 12 Mar 2017 14:20:29 +0000 Subject: [PATCH] On the 2.4.x branch: Merged /httpd/httpd/trunk:r1784571,1785672,1785683,1786512,1786575-1786576 mod_http2/mod_proxy_http2 backport git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1786582 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES | 15 ++ modules/http2/h2_bucket_beam.c | 127 ++++++++---- modules/http2/h2_bucket_beam.h | 11 +- modules/http2/h2_conn.c | 38 ++-- modules/http2/h2_h2.c | 3 +- modules/http2/h2_mplx.c | 143 +++++++------- modules/http2/h2_mplx.h | 2 - modules/http2/h2_session.c | 335 ++++++++++++++++++-------------- modules/http2/h2_session.h | 12 +- modules/http2/h2_stream.c | 106 ++++++---- modules/http2/h2_stream.h | 3 + modules/http2/h2_switch.c | 1 + modules/http2/h2_task.c | 18 +- modules/http2/h2_task.h | 3 +- modules/http2/h2_version.h | 4 +- modules/http2/mod_proxy_http2.c | 11 +- 16 files changed, 499 insertions(+), 333 deletions(-) diff --git a/CHANGES b/CHANGES index 6aa5c6badf..003ffe897c 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,21 @@ Changes with Apache 2.4.26 + *) mod_http2: moving session cleanup to pre_close hook to avoid races with + modules already shut down and slave connections still operating. + [Stefan Eissing] + + *) mod_http2: stream timeouts now change to vhost values once the request + is parsed and processing starts. Initial values are taken from base + server or SNI host as before. [Stefan Eissing] + + *) mod_proxy_http2: fixed retry behaviour when frontend connection uses + http/1.1. [Stefan Eissing] + + *) mod_http2: separate mutex instances for each bucket beam, resulting in + less lock contention. input beams only created when necessary. + [Stefan Eissing] + *) mod_lua: Support for Lua 5.3 *) mod_proxy_http2: support for ProxyPreserverHost directive. [Stefan Eissing] diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index 3d644fd68e..53cc36f46f 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -195,6 +195,19 @@ static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam, * bucket beam that can transport buckets across threads ******************************************************************************/ +static void mutex_leave(void *ctx, apr_thread_mutex_t *lock) +{ + apr_thread_mutex_unlock(lock); +} + +static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl) +{ + h2_bucket_beam *beam = ctx; + pbl->mutex = beam->lock; + pbl->leave = mutex_leave; + return apr_thread_mutex_lock(pbl->mutex); +} + static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl) { h2_beam_mutex_enter *enter = beam->m_enter; @@ -227,26 +240,37 @@ static apr_off_t bucket_mem_used(apr_bucket *b) } } -static int report_consumption(h2_bucket_beam *beam) +static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl) { int rv = 0; - if (beam->cons_io_cb) { - beam->cons_io_cb(beam->cons_ctx, beam, beam->received_bytes - - beam->cons_bytes_reported); + apr_off_t len = beam->received_bytes - beam->cons_bytes_reported; + h2_beam_io_callback *cb = beam->cons_io_cb; + + if (cb) { + void *ctx = beam->cons_ctx; + + if (pbl) leave_yellow(beam, pbl); + cb(ctx, beam, len); + if (pbl) enter_yellow(beam, pbl); rv = 1; } - beam->cons_bytes_reported = beam->received_bytes; + beam->cons_bytes_reported += len; return rv; } -static void report_prod_io(h2_bucket_beam *beam, int force) +static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl) { - if (force || beam->prod_bytes_reported != beam->sent_bytes) { - if (beam->prod_io_cb) { - beam->prod_io_cb(beam->prod_ctx, beam, beam->sent_bytes - - beam->prod_bytes_reported); + apr_off_t len = beam->sent_bytes - beam->prod_bytes_reported; + if (force || len > 0) { + h2_beam_io_callback *cb = beam->prod_io_cb; + if (cb) { + void *ctx = beam->prod_ctx; + + leave_yellow(beam, pbl); + cb(ctx, beam, len); + enter_yellow(beam, pbl); } - beam->prod_bytes_reported = beam->sent_bytes; + beam->prod_bytes_reported += len; } } @@ -293,10 +317,10 @@ static apr_size_t calc_space_left(h2_bucket_beam *beam) static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock) { if (beam->timeout > 0) { - return apr_thread_cond_timedwait(beam->m_cond, lock, beam->timeout); + return apr_thread_cond_timedwait(beam->cond, lock, beam->timeout); } else { - return apr_thread_cond_wait(beam->m_cond, lock); + return apr_thread_cond_wait(beam->cond, lock); } } @@ -307,7 +331,7 @@ static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block, while (!beam->aborted && *premain <= 0 && (block == APR_BLOCK_READ) && pbl->mutex) { apr_status_t status; - report_prod_io(beam, 1); + report_prod_io(beam, 1, pbl); status = wait_cond(beam, pbl->mutex); if (APR_STATUS_IS_TIMEUP(status)) { return status; @@ -378,8 +402,8 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) if (!bl.mutex) { r_purge_sent(beam); } - else if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + else if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } leave_yellow(beam, &bl); } @@ -399,8 +423,8 @@ static apr_status_t beam_close(h2_bucket_beam *beam) { if (!beam->closed) { beam->closed = 1; - if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } } return APR_SUCCESS; @@ -445,7 +469,7 @@ static apr_status_t beam_send_cleanup(void *data) /* sender is going away, clear up all references to its memory */ r_purge_sent(beam); h2_blist_cleanup(&beam->send_list); - report_consumption(beam); + report_consumption(beam, NULL); while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) { h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies); H2_BPROXY_REMOVE(proxy); @@ -535,7 +559,8 @@ apr_status_t h2_beam_destroy(h2_bucket_beam *beam) apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, int id, const char *tag, h2_beam_owner_t owner, - apr_size_t max_buf_size) + apr_size_t max_buf_size, + apr_interval_time_t timeout) { h2_bucket_beam *beam; apr_status_t status = APR_SUCCESS; @@ -555,10 +580,17 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, H2_BPROXY_LIST_INIT(&beam->proxies); beam->tx_mem_limits = 1; beam->max_buf_size = max_buf_size; - apr_pool_pre_cleanup_register(pool, beam, beam_cleanup); + beam->timeout = timeout; - *pbeam = beam; - + status = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, + pool); + if (status == APR_SUCCESS) { + status = apr_thread_cond_create(&beam->cond, pool); + if (status == APR_SUCCESS) { + apr_pool_pre_cleanup_register(pool, beam, beam_cleanup); + *pbeam = beam; + } + } return status; } @@ -586,7 +618,6 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam) void h2_beam_mutex_set(h2_bucket_beam *beam, h2_beam_mutex_enter m_enter, - apr_thread_cond_t *cond, void *m_ctx) { h2_beam_lock bl; @@ -594,11 +625,20 @@ void h2_beam_mutex_set(h2_bucket_beam *beam, if (enter_yellow(beam, &bl) == APR_SUCCESS) { beam->m_enter = m_enter; beam->m_ctx = m_ctx; - beam->m_cond = cond; leave_yellow(beam, &bl); } } +void h2_beam_mutex_enable(h2_bucket_beam *beam) +{ + h2_beam_mutex_set(beam, mutex_enter, beam); +} + +void h2_beam_mutex_disable(h2_bucket_beam *beam) +{ + h2_beam_mutex_set(beam, NULL, NULL); +} + void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout) { h2_beam_lock bl; @@ -630,10 +670,10 @@ void h2_beam_abort(h2_bucket_beam *beam) beam->aborted = 1; r_purge_sent(beam); h2_blist_cleanup(&beam->send_list); - report_consumption(beam); + report_consumption(beam, &bl); } - if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } leave_yellow(beam, &bl); } @@ -646,7 +686,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam) if (enter_yellow(beam, &bl) == APR_SUCCESS) { r_purge_sent(beam); beam_close(beam); - report_consumption(beam); + report_consumption(beam, &bl); leave_yellow(beam, &bl); } return beam->aborted? APR_ECONNABORTED : APR_SUCCESS; @@ -680,8 +720,8 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block) status = APR_EAGAIN; break; } - if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } status = wait_cond(beam, bl.mutex); } @@ -868,12 +908,12 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, b = APR_BRIGADE_FIRST(sender_bb); status = append_bucket(beam, b, block, &bl); } - report_prod_io(beam, force_report); - if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + report_prod_io(beam, force_report, &bl); + if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } } - report_consumption(beam); + report_consumption(beam, &bl); leave_yellow(beam, &bl); } return status; @@ -940,6 +980,11 @@ transfer: bb->p, bb->bucket_alloc); } } + else if (bsender->length == 0) { + APR_BUCKET_REMOVE(bsender); + H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender); + continue; + } else if (APR_BUCKET_IS_FILE(bsender)) { /* This is set aside into the target brigade pool so that * any read operation messes with that pool and not @@ -1040,15 +1085,15 @@ transfer: } if (transferred) { - if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } status = APR_SUCCESS; } else if (beam->closed) { status = APR_EOF; } - else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) { + else if (block == APR_BLOCK_READ && bl.mutex && beam->cond) { status = wait_cond(beam, bl.mutex); if (status != APR_SUCCESS) { goto leave; @@ -1056,8 +1101,8 @@ transfer: goto transfer; } else { - if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } status = APR_EAGAIN; } @@ -1198,7 +1243,7 @@ int h2_beam_report_consumption(h2_bucket_beam *beam) h2_beam_lock bl; int rv = 0; if (enter_yellow(beam, &bl) == APR_SUCCESS) { - rv = report_consumption(beam); + rv = report_consumption(beam, &bl); leave_yellow(beam, &bl); } return rv; diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h index 2b54eee8b0..18bc32629f 100644 --- a/modules/http2/h2_bucket_beam.h +++ b/modules/http2/h2_bucket_beam.h @@ -190,9 +190,10 @@ struct h2_bucket_beam { unsigned int close_sent : 1; unsigned int tx_mem_limits : 1; /* only memory size counts on transfers */ + struct apr_thread_mutex_t *lock; + struct apr_thread_cond_t *cond; void *m_ctx; h2_beam_mutex_enter *m_enter; - struct apr_thread_cond_t *m_cond; apr_off_t cons_bytes_reported; /* amount of bytes reported as consumed */ h2_beam_ev_callback *cons_ev_cb; @@ -222,12 +223,14 @@ struct h2_bucket_beam { * the pool owner is using this beam for sending or receiving * @param buffer_size maximum memory footprint of buckets buffered in beam, or * 0 for no limitation + * @param timeout timeout for blocking operations */ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, int id, const char *tag, h2_beam_owner_t owner, - apr_size_t buffer_size); + apr_size_t buffer_size, + apr_interval_time_t timeout); /** * Destroys the beam immediately without cleanup. @@ -315,9 +318,11 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block); void h2_beam_mutex_set(h2_bucket_beam *beam, h2_beam_mutex_enter m_enter, - struct apr_thread_cond_t *cond, void *m_ctx); +void h2_beam_mutex_enable(h2_bucket_beam *beam); +void h2_beam_mutex_disable(h2_bucket_beam *beam); + /** * Set/get the timeout for blocking read/write operations. Only works * if a mutex has been set for the beam. diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index 0057c3ee6b..220387db2b 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -183,6 +183,7 @@ static module *h2_conn_mpm_module(void) apr_status_t h2_conn_setup(h2_ctx *ctx, conn_rec *c, request_rec *r) { h2_session *session; + apr_status_t status; if (!workers) { ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, c, APLOGNO(02911) @@ -191,15 +192,16 @@ apr_status_t h2_conn_setup(h2_ctx *ctx, conn_rec *c, request_rec *r) } if (r) { - session = h2_session_rcreate(r, ctx, workers); + status = h2_session_rcreate(&session, r, ctx, workers); } else { - session = h2_session_create(c, ctx, workers); + status = h2_session_create(&session, c, ctx, workers); } - h2_ctx_session_set(ctx, session); - - return APR_SUCCESS; + if (status == APR_SUCCESS) { + h2_ctx_session_set(ctx, session); + } + return status; } apr_status_t h2_conn_run(struct h2_ctx *ctx, conn_rec *c) @@ -235,7 +237,20 @@ apr_status_t h2_conn_run(struct h2_ctx *ctx, conn_rec *c) && mpm_state != AP_MPMQ_STOPPING); if (c->cs) { - c->cs->state = CONN_STATE_WRITE_COMPLETION; + switch (session->state) { + case H2_SESSION_ST_INIT: + case H2_SESSION_ST_CLEANUP: + case H2_SESSION_ST_DONE: + case H2_SESSION_ST_IDLE: + c->cs->state = CONN_STATE_WRITE_COMPLETION; + break; + case H2_SESSION_ST_BUSY: + case H2_SESSION_ST_WAIT: + default: + c->cs->state = CONN_STATE_HANDLER; + break; + + } } return DONE; @@ -243,13 +258,12 @@ apr_status_t h2_conn_run(struct h2_ctx *ctx, conn_rec *c) apr_status_t h2_conn_pre_close(struct h2_ctx *ctx, conn_rec *c) { - apr_status_t status; - - status = h2_session_pre_close(h2_ctx_session_get(ctx), async_mpm); - if (status == APR_SUCCESS) { - return DONE; /* This is the same, right? */ + h2_session *session = h2_ctx_session_get(ctx); + if (session) { + apr_status_t status = h2_session_pre_close(session, async_mpm); + return (status == APR_SUCCESS)? DONE : status; } - return status; + return DONE; } conn_rec *h2_slave_create(conn_rec *master, int slave_id, apr_pool_t *parent) diff --git a/modules/http2/h2_h2.c b/modules/http2/h2_h2.c index d1743386f8..ce0247c480 100644 --- a/modules/http2/h2_h2.c +++ b/modules/http2/h2_h2.c @@ -652,6 +652,7 @@ int h2_h2_process_conn(conn_rec* c) status = h2_conn_setup(ctx, c, NULL); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, "conn_setup"); if (status != APR_SUCCESS) { + h2_ctx_clear(c); return status; } } @@ -674,7 +675,7 @@ static int h2_h2_pre_close_conn(conn_rec *c) ctx = h2_ctx_get(c, 0); if (ctx) { /* If the session has been closed correctly already, we will not - * fiond a h2_ctx here. The presence indicates that the session + * find a h2_ctx here. The presence indicates that the session * is still ongoing. */ return h2_conn_pre_close(ctx, c); } diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 465eb9bb4f..b8244b0a77 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -100,48 +100,43 @@ static void leave_mutex(h2_mplx *m, int acquired) } } -static void beam_leave(void *ctx, apr_thread_mutex_t *lock) -{ - leave_mutex(ctx, 1); -} - -static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl) -{ - h2_mplx *m = ctx; - int acquired; - apr_status_t status; - - status = enter_mutex(m, &acquired); - if (status == APR_SUCCESS) { - pbl->mutex = m->lock; - pbl->leave = acquired? beam_leave : NULL; - pbl->leave_ctx = m; - } - return status; -} +static void check_data_for(h2_mplx *m, int stream_id); static void stream_output_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) { h2_stream *stream = ctx; + h2_mplx *m = stream->session->mplx; h2_task *task = stream->task; + int acquired; + if (length > 0 && task && task->assigned) { - h2_req_engine_out_consumed(task->assigned, task->c, length); + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + h2_req_engine_out_consumed(task->assigned, task->c, length); + leave_mutex(m, acquired); + } } } static void stream_input_ev(void *ctx, h2_bucket_beam *beam) { h2_mplx *m = ctx; - apr_atomic_set32(&m->event_pending, 1); + apr_atomic_set32(&m->event_pending, 1); } static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) { - h2_mplx *m = ctx; - if (m->input_consumed && length) { - m->input_consumed(m->input_consumed_ctx, beam->id, length); + if (length > 0) { + h2_mplx *m = ctx; + int acquired; + + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + if (m->input_consumed) { + m->input_consumed(m->input_consumed_ctx, beam->id, length); + } + leave_mutex(m, acquired); + } } } @@ -161,8 +156,6 @@ static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file) return 0; } -static void have_out_data_for(h2_mplx *m, h2_stream *stream); - static void check_tx_reservation(h2_mplx *m) { if (m->tx_handles_reserved <= 0) { @@ -190,17 +183,21 @@ static void stream_joined(h2_mplx *m, h2_stream *stream) h2_ihash_remove(m->shold, stream->id); h2_ihash_add(m->spurge, stream); - m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input); + if (stream->input) { + m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input); + } m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output); } static void stream_cleanup(h2_mplx *m, h2_stream *stream) { ap_assert(stream->state == H2_SS_CLEANUP); - + + if (stream->input) { + h2_beam_on_consumed(stream->input, NULL, NULL, NULL); + h2_beam_abort(stream->input); + } h2_beam_on_produced(stream->output, NULL, NULL); - h2_beam_on_consumed(stream->input, NULL, NULL, NULL); - h2_beam_abort(stream->input); h2_beam_leave(stream->output); h2_stream_cleanup(stream); @@ -231,7 +228,6 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream) */ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, const h2_config *conf, - apr_interval_time_t stream_timeout, h2_workers *workers) { apr_status_t status = APR_SUCCESS; @@ -295,7 +291,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->q = h2_iq_create(m->pool, m->max_streams); m->readyq = h2_iq_create(m->pool, m->max_streams); - m->stream_timeout = stream_timeout; m->workers = workers; m->workers_max = workers->max_workers; m->workers_limit = 6; /* the original h1 max parallel connections */ @@ -376,18 +371,21 @@ static int stream_destroy_iter(void *ctx, void *val) h2_ihash_remove(m->spurge, stream->id); ap_assert(stream->state == H2_SS_CLEANUP); - if (stream->input == NULL || stream->output == NULL) { + if (stream->output == NULL) { ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, m->c, H2_STRM_MSG(stream, "already with beams==NULL")); return 0; } - /* Process outstanding events before destruction */ - input_consumed_signal(m, stream); - h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy"); + if (stream->input) { + /* Process outstanding events before destruction */ + input_consumed_signal(m, stream); + h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy"); + h2_beam_destroy(stream->input); + stream->input = NULL; + } + h2_beam_log(stream->output, m->c, APLOG_TRACE2, "stream_destroy"); - h2_beam_destroy(stream->input); - stream->input = NULL; h2_beam_destroy(stream->output); stream->output = NULL; if (stream->task) { @@ -581,17 +579,12 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) { h2_mplx *m = ctx; - apr_status_t status; - h2_stream *stream; int acquired; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - stream = h2_ihash_get(m->streams, beam->id); - if (stream) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_mplx(%s): output_produced", stream->task->id); - have_out_data_for(m, stream); - } + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id); + check_data_for(m, beam->id); leave_mutex(m, acquired); } } @@ -628,12 +621,12 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) } /* time to protect the beam against multi-threaded use */ - h2_beam_mutex_set(stream->output, beam_enter, stream->task->cond, m); + h2_beam_mutex_enable(stream->output); /* we might see some file buckets in the output, see * if we have enough handles reserved. */ check_tx_reservation(m); - have_out_data_for(m, stream); + check_data_for(m, stream->id); return status; } @@ -673,7 +666,7 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task) status = h2_beam_close(task->output.beam); h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close"); output_consumed_signal(m, task); - have_out_data_for(m, stream); + check_data_for(m, task->stream_id); return status; } @@ -706,11 +699,10 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, return status; } -static void have_out_data_for(h2_mplx *m, h2_stream *stream) +static void check_data_for(h2_mplx *m, int stream_id) { ap_assert(m); - ap_assert(stream); - h2_iq_append(m->readyq, stream->id); + h2_iq_append(m->readyq, stream_id); apr_atomic_set32(&m->event_pending, 1); if (m->added_output) { apr_thread_cond_signal(m->added_output); @@ -751,8 +743,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, h2_ihash_add(m->streams, stream); if (h2_stream_is_ready(stream)) { /* already have a response */ - apr_atomic_set32(&m->event_pending, 1); - h2_iq_append(m->readyq, stream->id); + check_data_for(m, stream->id); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, H2_STRM_MSG(stream, "process, add to readyq")); } @@ -812,14 +803,14 @@ static h2_task *next_stream_task(h2_mplx *m) m->max_stream_started = sid; } - h2_beam_timeout_set(stream->input, m->stream_timeout); - h2_beam_on_consumed(stream->input, stream_input_ev, - stream_input_consumed, m); - h2_beam_on_file_beam(stream->input, can_beam_file, m); - h2_beam_mutex_set(stream->input, beam_enter, stream->task->cond, m); + if (stream->input) { + h2_beam_on_consumed(stream->input, stream_input_ev, + stream_input_consumed, m); + h2_beam_on_file_beam(stream->input, can_beam_file, m); + h2_beam_mutex_enable(stream->input); + } h2_beam_buffer_size_set(stream->output, m->stream_max_mem); - h2_beam_timeout_set(stream->output, m->stream_timeout); } stream->task->worker_started = 1; stream->task->started_at = apr_time_now(); @@ -931,18 +922,22 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, H2_STRM_MSG(stream, "task_done, stream open")); /* more data will not arrive, resume the stream */ - h2_beam_mutex_set(stream->input, NULL, NULL, NULL); - h2_beam_mutex_set(stream->output, NULL, NULL, NULL); - h2_beam_leave(stream->input); - have_out_data_for(m, stream); + if (stream->input) { + h2_beam_mutex_disable(stream->input); + h2_beam_leave(stream->input); + } + h2_beam_mutex_disable(stream->output); + check_data_for(m, stream->id); } else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, H2_STRM_MSG(stream, "task_done, in hold")); /* stream was just waiting for us. */ - h2_beam_mutex_set(stream->input, NULL, NULL, NULL); - h2_beam_mutex_set(stream->output, NULL, NULL, NULL); - h2_beam_leave(stream->input); + if (stream->input) { + h2_beam_mutex_disable(stream->input); + h2_beam_leave(stream->input); + } + h2_beam_mutex_disable(stream->output); stream_joined(m, stream); } else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) { @@ -1016,7 +1011,7 @@ static int timed_out_busy_iter(void *data, void *val) stream_iter_ctx *ctx = data; h2_stream *stream = val; if (stream->task && !stream->task->worker_done - && (ctx->now - stream->task->started_at) > ctx->m->stream_timeout) { + && (ctx->now - stream->task->started_at) > stream->task->timeout) { /* timed out stream occupying a worker, found */ ctx->stream = stream; return 0; @@ -1273,10 +1268,6 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, h2_stream *stream; size_t i, n; - if (!h2_mplx_has_master_events(m)) { - return APR_EAGAIN; - } - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, "h2_mplx(%ld): dispatch events", m->id); @@ -1308,11 +1299,7 @@ apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id) int acquired; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_stream *s = h2_ihash_get(m->streams, stream_id); - if (s) { - h2_iq_append(m->readyq, stream_id); - apr_atomic_set32(&m->event_pending, 1); - } + check_data_for(m, stream_id); leave_mutex(m, acquired); } return status; diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 7ef9af5962..992c24cbe4 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -96,7 +96,6 @@ struct h2_mplx { struct apr_thread_cond_t *join_wait; apr_size_t stream_max_mem; - apr_interval_time_t stream_timeout; apr_pool_t *spare_io_pool; apr_array_header_t *spare_slaves; /* spare slave connections */ @@ -125,7 +124,6 @@ apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s); */ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *master, const struct h2_config *conf, - apr_interval_time_t stream_timeout, struct h2_workers *workers); /** diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 99a9c094d7..dd16820776 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -31,6 +31,7 @@ #include "h2_private.h" #include "h2.h" +#include "h2_bucket_beam.h" #include "h2_bucket_eos.h" #include "h2_config.h" #include "h2_ctx.h" @@ -73,10 +74,14 @@ static int h2_session_status_from_apr_status(apr_status_t rv) static void update_window(void *ctx, int stream_id, apr_off_t bytes_read) { h2_session *session = (h2_session*)ctx; - nghttp2_session_consume(session->ngh2, stream_id, bytes_read); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_stream(%ld-%d): consumed %ld bytes", - session->id, stream_id, (long)bytes_read); + while (bytes_read > 0) { + int len = (bytes_read > INT_MAX)? INT_MAX : bytes_read; + nghttp2_session_consume(session->ngh2, stream_id, (int)bytes_read); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_stream(%ld-%d): consumed %d bytes", + session->id, stream_id, len); + bytes_read -= len; + } } static apr_status_t h2_session_receive(void *ctx, @@ -656,26 +661,29 @@ static apr_status_t h2_session_shutdown(h2_session *session, int error, * we have, but no longer accept new ones. Report the max stream * we have received and discard all new ones. */ } - nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, - session->local.accepted_max, - error, (uint8_t*)msg, msg? strlen(msg):0); + session->local.accepting = 0; session->local.shutdown = 1; - status = nghttp2_session_send(session->ngh2); - if (status == APR_SUCCESS) { - status = h2_conn_io_flush(&session->io); + if (!session->c->aborted) { + nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, + session->local.accepted_max, + error, (uint8_t*)msg, msg? strlen(msg):0); + status = nghttp2_session_send(session->ngh2); + if (status == APR_SUCCESS) { + status = h2_conn_io_flush(&session->io); + } + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + H2_SSSN_LOG(APLOGNO(03069), session, + "sent GOAWAY, err=%d, msg=%s"), error, msg? msg : ""); } - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - H2_SSSN_LOG(APLOGNO(03069), session, - "sent GOAWAY, err=%d, msg=%s"), error, msg? msg : ""); dispatch_event(session, H2_SESSION_EV_LOCAL_GOAWAY, error, msg); return status; } -static apr_status_t session_pool_cleanup(void *data) +static apr_status_t session_cleanup(h2_session *session, const char *trigger) { - h2_session *session = data; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + conn_rec *c = session->c; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, H2_SSSN_MSG(session, "pool_cleanup")); if (session->state != H2_SESSION_ST_DONE @@ -688,13 +696,13 @@ static apr_status_t session_pool_cleanup(void *data) * connection when sending the next request, this has the effect * that at least this one request will fail. */ - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c, H2_SSSN_LOG(APLOGNO(03199), session, "connection disappeared without proper " "goodbye, clients will be confused, should not happen")); } - transit(session, "pool cleanup", H2_SESSION_ST_CLEANUP); + transit(session, trigger, H2_SESSION_ST_CLEANUP); h2_mplx_set_consumed_cb(session->mplx, NULL, NULL); h2_mplx_release_and_join(session->mplx, session->iowait); session->mplx = NULL; @@ -702,14 +710,39 @@ static apr_status_t session_pool_cleanup(void *data) ap_assert(session->ngh2); nghttp2_session_del(session->ngh2); session->ngh2 = NULL; + h2_ctx_clear(c); + + + return APR_SUCCESS; +} +static apr_status_t session_pool_cleanup(void *data) +{ + conn_rec *c = data; + h2_session *session; + h2_ctx *ctx = h2_ctx_get(c, 0); + + if (ctx && (session = h2_ctx_session_get(ctx))) { + /* if the session is still there, now is the last chance + * to perform cleanup. Normally, cleanup should have happened + * earlier in the connection pre_close. Main reason is that + * any ongoing requests on slave connections might still access + * data which has, at this time, already been freed. An example + * is mod_ssl that uses request hooks. */ + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c, + H2_SSSN_LOG(APLOGNO(10020), session, + "session cleanup triggered by pool cleanup. " + "this should have happened earlier already.")); + return session_cleanup(session, "pool cleanup"); + } return APR_SUCCESS; } -static h2_session *h2_session_create_int(conn_rec *c, - request_rec *r, - h2_ctx *ctx, - h2_workers *workers) +static apr_status_t h2_session_create_int(h2_session **psession, + conn_rec *c, + request_rec *r, + h2_ctx *ctx, + h2_workers *workers) { nghttp2_session_callbacks *callbacks = NULL; nghttp2_option *options = NULL; @@ -718,136 +751,146 @@ static h2_session *h2_session_create_int(conn_rec *c, uint32_t n; apr_pool_t *pool = NULL; h2_session *session; - - apr_status_t status = apr_allocator_create(&allocator); + apr_status_t status; + int rv; + + *psession = NULL; + status = apr_allocator_create(&allocator); if (status != APR_SUCCESS) { - return NULL; + return status; } apr_allocator_max_free_set(allocator, ap_max_mem_free); apr_pool_create_ex(&pool, c->pool, NULL, allocator); if (!pool) { apr_allocator_destroy(allocator); - return NULL; + return APR_ENOMEM; } apr_pool_tag(pool, "h2_session"); apr_allocator_owner_set(allocator, pool); status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, pool); if (status != APR_SUCCESS) { apr_pool_destroy(pool); - return NULL; + return APR_ENOMEM; } apr_allocator_mutex_set(allocator, mutex); - /* get h2_session a lifetime beyond its pool and everything - * connected to it. */ session = apr_pcalloc(pool, sizeof(h2_session)); - if (session) { - int rv; - session->id = c->id; - session->c = c; - session->r = r; - session->s = h2_ctx_server_get(ctx); - session->pool = pool; - session->config = h2_config_sget(session->s); - session->workers = workers; - - session->state = H2_SESSION_ST_INIT; - session->local.accepting = 1; - session->remote.accepting = 1; - - apr_pool_pre_cleanup_register(pool, session, session_pool_cleanup); - - session->max_stream_count = h2_config_geti(session->config, - H2_CONF_MAX_STREAMS); - session->max_stream_mem = h2_config_geti(session->config, - H2_CONF_STREAM_MAX_MEM); - - status = apr_thread_cond_create(&session->iowait, session->pool); - if (status != APR_SUCCESS) { - return NULL; - } - - session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor)); - if (session->monitor == NULL) { - return NULL; - } - session->monitor->ctx = session; - session->monitor->on_state_enter = on_stream_state_enter; - session->monitor->on_state_event = on_stream_state_event; - - session->mplx = h2_mplx_create(c, session->pool, session->config, - session->s->timeout, workers); - - h2_mplx_set_consumed_cb(session->mplx, update_window, session); - - /* Install the connection input filter that feeds the session */ - session->cin = h2_filter_cin_create(session->pool, - h2_session_receive, session); - ap_add_input_filter("H2_IN", session->cin, r, c); - - h2_conn_io_init(&session->io, c, session->config); - session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc); - - status = init_callbacks(c, &callbacks); - if (status != APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, status, c, APLOGNO(02927) - "nghttp2: error in init_callbacks"); - return NULL; - } - - rv = nghttp2_option_new(&options); - if (rv != 0) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c, - APLOGNO(02928) "nghttp2_option_new: %s", - nghttp2_strerror(rv)); - return NULL; - } - nghttp2_option_set_peer_max_concurrent_streams( - options, (uint32_t)session->max_stream_count); - /* We need to handle window updates ourself, otherwise we - * get flooded by nghttp2. */ - nghttp2_option_set_no_auto_window_update(options, 1); - - rv = nghttp2_session_server_new2(&session->ngh2, callbacks, - session, options); - nghttp2_session_callbacks_del(callbacks); - nghttp2_option_del(options); - - if (rv != 0) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c, - APLOGNO(02929) "nghttp2_session_server_new: %s", - nghttp2_strerror(rv)); - return NULL; - } - - n = h2_config_geti(session->config, H2_CONF_PUSH_DIARY_SIZE); - session->push_diary = h2_push_diary_create(session->pool, n); - - if (APLOGcdebug(c)) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, - H2_SSSN_LOG(APLOGNO(03200), session, - "created, max_streams=%d, stream_mem=%d, " - "workers_limit=%d, workers_max=%d, " - "push_diary(type=%d,N=%d)"), - (int)session->max_stream_count, - (int)session->max_stream_mem, - session->mplx->workers_limit, - session->mplx->workers_max, - session->push_diary->dtype, - (int)session->push_diary->N); - } + if (!session) { + return APR_ENOMEM; + } + + *psession = session; + session->id = c->id; + session->c = c; + session->r = r; + session->s = h2_ctx_server_get(ctx); + session->pool = pool; + session->config = h2_config_sget(session->s); + session->workers = workers; + + session->state = H2_SESSION_ST_INIT; + session->local.accepting = 1; + session->remote.accepting = 1; + + session->max_stream_count = h2_config_geti(session->config, + H2_CONF_MAX_STREAMS); + session->max_stream_mem = h2_config_geti(session->config, + H2_CONF_STREAM_MAX_MEM); + + status = apr_thread_cond_create(&session->iowait, session->pool); + if (status != APR_SUCCESS) { + apr_pool_destroy(pool); + return status; + } + + session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor)); + if (session->monitor == NULL) { + apr_pool_destroy(pool); + return status; + } + session->monitor->ctx = session; + session->monitor->on_state_enter = on_stream_state_enter; + session->monitor->on_state_event = on_stream_state_event; + + session->mplx = h2_mplx_create(c, session->pool, session->config, + workers); + + h2_mplx_set_consumed_cb(session->mplx, update_window, session); + + /* Install the connection input filter that feeds the session */ + session->cin = h2_filter_cin_create(session->pool, + h2_session_receive, session); + ap_add_input_filter("H2_IN", session->cin, r, c); + + h2_conn_io_init(&session->io, c, session->config); + session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc); + + status = init_callbacks(c, &callbacks); + if (status != APR_SUCCESS) { + ap_log_cerror(APLOG_MARK, APLOG_ERR, status, c, APLOGNO(02927) + "nghttp2: error in init_callbacks"); + apr_pool_destroy(pool); + return status; + } + + rv = nghttp2_option_new(&options); + if (rv != 0) { + ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c, + APLOGNO(02928) "nghttp2_option_new: %s", + nghttp2_strerror(rv)); + apr_pool_destroy(pool); + return status; + } + nghttp2_option_set_peer_max_concurrent_streams( + options, (uint32_t)session->max_stream_count); + /* We need to handle window updates ourself, otherwise we + * get flooded by nghttp2. */ + nghttp2_option_set_no_auto_window_update(options, 1); + + rv = nghttp2_session_server_new2(&session->ngh2, callbacks, + session, options); + nghttp2_session_callbacks_del(callbacks); + nghttp2_option_del(options); + + if (rv != 0) { + ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c, + APLOGNO(02929) "nghttp2_session_server_new: %s", + nghttp2_strerror(rv)); + apr_pool_destroy(pool); + return APR_ENOMEM; + } + + n = h2_config_geti(session->config, H2_CONF_PUSH_DIARY_SIZE); + session->push_diary = h2_push_diary_create(session->pool, n); + + if (APLOGcdebug(c)) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, + H2_SSSN_LOG(APLOGNO(03200), session, + "created, max_streams=%d, stream_mem=%d, " + "workers_limit=%d, workers_max=%d, " + "push_diary(type=%d,N=%d)"), + (int)session->max_stream_count, + (int)session->max_stream_mem, + session->mplx->workers_limit, + session->mplx->workers_max, + session->push_diary->dtype, + (int)session->push_diary->N); } - return session; + + apr_pool_pre_cleanup_register(pool, c, session_pool_cleanup); + return APR_SUCCESS; } -h2_session *h2_session_create(conn_rec *c, h2_ctx *ctx, h2_workers *workers) +apr_status_t h2_session_create(h2_session **psession, + conn_rec *c, h2_ctx *ctx, h2_workers *workers) { - return h2_session_create_int(c, NULL, ctx, workers); + return h2_session_create_int(psession, c, NULL, ctx, workers); } -h2_session *h2_session_rcreate(request_rec *r, h2_ctx *ctx, h2_workers *workers) +apr_status_t h2_session_rcreate(h2_session **psession, + request_rec *r, h2_ctx *ctx, h2_workers *workers) { - return h2_session_create_int(r->connection, r, ctx, workers); + return h2_session_create_int(psession, r->connection, r, ctx, workers); } static apr_status_t h2_session_start(h2_session *session, int *rv) @@ -1369,7 +1412,7 @@ static apr_status_t on_stream_resume(void *ctx, h2_stream *stream) ap_assert(stream); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, H2_STRM_MSG(stream, "on_resume")); - + send_headers: headers = NULL; status = h2_stream_out_prepare(stream, &len, &eos, &headers); @@ -1738,12 +1781,7 @@ static void ev_stream_open(h2_session *session, h2_stream *stream) } ap_assert(!stream->scheduled); - if (stream->request) { - const h2_request *r = stream->request; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, - H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"), - r->method, r->scheme, r->authority, r->path, r->chunked); - stream->scheduled = 1; + if (h2_stream_prep_processing(stream) == APR_SUCCESS) { h2_mplx_process(session->mplx, stream, stream_pri_cmp, session); } else { @@ -1919,7 +1957,6 @@ apr_status_t h2_session_process(h2_session *session, int async) } while (session->state != H2_SESSION_ST_DONE) { - trace = APLOGctrace3(c); session->have_read = session->have_written = 0; if (session->local.accepting @@ -1957,8 +1994,6 @@ apr_status_t h2_session_process(h2_session *session, int async) break; case H2_SESSION_ST_IDLE: - /* make certain, we send everything before we idle */ - h2_conn_io_flush(&session->io); /* We trust our connection into the default timeout/keepalive * handling of the core filters/mpm iff: * - keep_sync_until is not set @@ -1975,6 +2010,7 @@ apr_status_t h2_session_process(h2_session *session, int async) "nonblock read, %d streams open"), session->open_streams); } + h2_conn_io_flush(&session->io); status = h2_session_read(session, 0); if (status == APR_SUCCESS) { @@ -2001,6 +2037,8 @@ apr_status_t h2_session_process(h2_session *session, int async) } } else { + /* make certain, we send everything before we idle */ + h2_conn_io_flush(&session->io); if (trace) { ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c, H2_SSSN_MSG(session, @@ -2187,19 +2225,24 @@ out: dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); } - status = APR_SUCCESS; - if (session->state == H2_SESSION_ST_DONE) { - status = APR_EOF; - } - - return status; + return (session->state == H2_SESSION_ST_DONE)? APR_EOF : APR_SUCCESS; } apr_status_t h2_session_pre_close(h2_session *session, int async) { + apr_status_t status; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, H2_SSSN_MSG(session, "pre_close")); dispatch_event(session, H2_SESSION_EV_PRE_CLOSE, 0, (session->state == H2_SESSION_ST_IDLE)? "timeout" : NULL); - return APR_SUCCESS; + status = session_cleanup(session, "pre_close"); + if (status == APR_SUCCESS) { + /* no one should hold a reference to this session any longer and + * the h2_ctx was removed from the connection. + * Take the pool (and thus all subpools etc. down now, instead of + * during cleanup of main connection pool. */ + apr_pool_destroy(session->pool); + } + return status; } diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index 76a03f3c41..fb3cd3d573 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -132,24 +132,28 @@ const char *h2_session_state_str(h2_session_state state); /** * Create a new h2_session for the given connection. * The session will apply the configured parameter. + * @param psession pointer receiving the created session on success or NULL * @param c the connection to work on * @param cfg the module config to apply * @param workers the worker pool to use * @return the created session */ -h2_session *h2_session_create(conn_rec *c, struct h2_ctx *ctx, - struct h2_workers *workers); +apr_status_t h2_session_create(h2_session **psession, + conn_rec *c, struct h2_ctx *ctx, + struct h2_workers *workers); /** * Create a new h2_session for the given request. * The session will apply the configured parameter. + * @param psession pointer receiving the created session on success or NULL * @param r the request that was upgraded * @param cfg the module config to apply * @param workers the worker pool to use * @return the created session */ -h2_session *h2_session_rcreate(request_rec *r, struct h2_ctx *ctx, - struct h2_workers *workers); +apr_status_t h2_session_rcreate(h2_session **psession, + request_rec *r, struct h2_ctx *ctx, + struct h2_workers *workers); /** * Process the given HTTP/2 session until it is ended or a fatal diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index c4b1227d6a..1108e4d10d 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -170,14 +170,23 @@ static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag) } } +static apr_status_t setup_input(h2_stream *stream) { + if (stream->input == NULL && !stream->input_eof) { + h2_beam_create(&stream->input, stream->pool, stream->id, + "input", H2_BEAM_OWNER_SEND, 0, + stream->session->s->timeout); + h2_beam_send_from(stream->input, stream->pool); + } + return APR_SUCCESS; +} + static apr_status_t close_input(h2_stream *stream) { conn_rec *c = stream->session->c; - apr_status_t status; - apr_bucket_brigade *tmp; - apr_bucket *b; + apr_status_t status = APR_SUCCESS; - if (h2_beam_is_closed(stream->input)) { + stream->input_eof = 1; + if (stream->input && h2_beam_is_closed(stream->input)) { return APR_SUCCESS; } @@ -187,22 +196,30 @@ static apr_status_t close_input(h2_stream *stream) return APR_ECONNRESET; } - tmp = apr_brigade_create(stream->pool, c->bucket_alloc); if (stream->trailers && !apr_is_empty_table(stream->trailers)) { - h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers, - NULL, stream->pool); + apr_bucket_brigade *tmp; + apr_bucket *b; + h2_headers *r; + + tmp = apr_brigade_create(stream->pool, c->bucket_alloc); + + r = h2_headers_create(HTTP_OK, stream->trailers, NULL, stream->pool); + stream->trailers = NULL; b = h2_bucket_headers_create(c->bucket_alloc, r); APR_BRIGADE_INSERT_TAIL(tmp, b); - stream->trailers = NULL; + + b = apr_bucket_eos_create(c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(tmp, b); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, H2_STRM_MSG(stream, "added trailers")); + setup_input(stream); + status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); + apr_brigade_destroy(tmp); + } + if (stream->input) { + return h2_beam_close(stream->input); } - - b = apr_bucket_eos_create(c->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(tmp, b); - status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); - apr_brigade_destroy(tmp); - h2_beam_close(stream->input); return status; } @@ -440,18 +457,16 @@ apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags, apr_bucket_brigade *tmp; ap_assert(stream); - if (!stream->input) { - return APR_EOF; + if (len > 0) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, + H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len); + + tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc); + apr_brigade_write(tmp, NULL, NULL, (const char *)data, len); + setup_input(stream); + status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); + apr_brigade_destroy(tmp); } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len); - - tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc); - apr_brigade_write(tmp, NULL, NULL, (const char *)data, len); - status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); - apr_brigade_destroy(tmp); - stream->in_data_frames++; stream->in_data_octets += len; return status; @@ -478,9 +493,8 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session, stream->monitor = monitor; stream->max_mem = session->max_stream_mem; - h2_beam_create(&stream->input, pool, id, "input", H2_BEAM_OWNER_SEND, 0); - h2_beam_send_from(stream->input, stream->pool); - h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0); + h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0, + session->s->timeout); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, H2_STRM_LOG(APLOGNO(03082), stream, "created")); @@ -498,14 +512,16 @@ void h2_stream_cleanup(h2_stream *stream) * references into request pools */ apr_brigade_cleanup(stream->out_buffer); } - h2_beam_abort(stream->input); - status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ); - if (status == APR_EAGAIN) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, - H2_STRM_MSG(stream, "wait on input drain")); - status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, - H2_STRM_MSG(stream, "input drain returned")); + if (stream->input) { + h2_beam_abort(stream->input); + status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ); + if (status == APR_EAGAIN) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, + H2_STRM_MSG(stream, "wait on input drain")); + status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, + H2_STRM_MSG(stream, "input drain returned")); + } } } @@ -527,10 +543,26 @@ apr_pool_t *h2_stream_detach_pool(h2_stream *stream) return pool; } +apr_status_t h2_stream_prep_processing(h2_stream *stream) +{ + if (stream->request) { + const h2_request *r = stream->request; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, + H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"), + r->method, r->scheme, r->authority, r->path, r->chunked); + setup_input(stream); + stream->scheduled = 1; + return APR_SUCCESS; + } + return APR_EINVAL; +} + void h2_stream_rst(h2_stream *stream, int error_code) { stream->rst_error = error_code; - h2_beam_abort(stream->input); + if (stream->input) { + h2_beam_abort(stream->input); + } h2_beam_leave(stream->output); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, H2_STRM_MSG(stream, "reset, error=%d"), error_code); diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index 86b6da898f..241c4fefe5 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -77,6 +77,7 @@ struct h2_stream { unsigned int aborted : 1; /* was aborted */ unsigned int scheduled : 1; /* stream has been scheduled */ unsigned int has_response : 1; /* response headers are known */ + unsigned int input_eof : 1; /* no more request data coming */ unsigned int push_policy; /* which push policy to use for this request */ struct h2_task *task; /* assigned task to fullfill request */ @@ -110,6 +111,8 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, */ void h2_stream_destroy(h2_stream *stream); +apr_status_t h2_stream_prep_processing(h2_stream *stream); + /* * Set a new monitor for this stream, replacing any existing one. Can * be called with NULL to have no monitor installed. diff --git a/modules/http2/h2_switch.c b/modules/http2/h2_switch.c index d1d4a60f5d..8a8d56e59e 100644 --- a/modules/http2/h2_switch.c +++ b/modules/http2/h2_switch.c @@ -160,6 +160,7 @@ static int h2_protocol_switch(conn_rec *c, request_rec *r, server_rec *s, if (status != APR_SUCCESS) { ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03088) "session setup"); + h2_ctx_clear(c); return status; } diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 0190f8a889..3c2810a294 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -17,7 +17,6 @@ #include #include -#include #include #include @@ -386,7 +385,7 @@ static apr_status_t h2_filter_parse_h1(ap_filter_t* f, apr_bucket_brigade* bb) ******************************************************************************/ int h2_task_can_redo(h2_task *task) { - if (h2_beam_was_received(task->input.beam)) { + if (task->input.beam && h2_beam_was_received(task->input.beam)) { /* cannot repeat that. */ return 0; } @@ -403,7 +402,9 @@ void h2_task_redo(h2_task *task) void h2_task_rst(h2_task *task, int error) { task->rst_error = error; - h2_beam_leave(task->input.beam); + if (task->input.beam) { + h2_beam_leave(task->input.beam); + } if (!task->worker_done) { h2_beam_abort(task->output.beam); } @@ -506,9 +507,9 @@ h2_task *h2_task_create(h2_stream *stream, conn_rec *slave) task->request = stream->request; task->input.beam = stream->input; task->output.beam = stream->output; + task->timeout = stream->session->s->timeout; h2_beam_send_from(stream->output, task->pool); - apr_thread_cond_create(&task->cond, pool); h2_ctx_create_for(slave, task); return task; @@ -601,6 +602,15 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c) "h2_task(%s): create request_rec", task->id); r = h2_request_create_rec(req, c); if (r && (r->status == HTTP_OK)) { + /* set timeouts for virtual host of request */ + if (task->timeout != r->server->timeout) { + task->timeout = r->server->timeout; + h2_beam_timeout_set(task->output.beam, task->timeout); + if (task->input.beam) { + h2_beam_timeout_set(task->input.beam, task->timeout); + } + } + ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, r); if (cs) { diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index f004a4f92b..b2aaf80777 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -37,7 +37,6 @@ * of our own to disble those. */ -struct apr_thread_cond_t; struct h2_bucket_beam; struct h2_conn; struct h2_mplx; @@ -57,6 +56,7 @@ struct h2_task { apr_pool_t *pool; const struct h2_request *request; + apr_interval_time_t timeout; int rst_error; /* h2 related stream abort error */ struct { @@ -76,7 +76,6 @@ struct h2_task { } output; struct h2_mplx *mplx; - struct apr_thread_cond_t *cond; unsigned int filters_set : 1; unsigned int frozen : 1; diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index 0d7508ba01..6dcee88135 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.9.2" +#define MOD_HTTP2_VERSION "1.9.3" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010902 +#define MOD_HTTP2_VERSION_NUM 0x010903 #endif /* mod_h2_h2_version_h */ diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c index 91f09ec83a..ef23d0c428 100644 --- a/modules/http2/mod_proxy_http2.c +++ b/modules/http2/mod_proxy_http2.c @@ -276,6 +276,9 @@ static void request_done(h2_proxy_session *session, request_rec *r, h2_proxy_ctx *ctx = session->user_data; const char *task_id = apr_table_get(r->connection->notes, H2_TASK_ID_NOTE); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection, + "h2_proxy_session(%s): request done %s, touched=%d", + ctx->engine_id, task_id, touched); if (status != APR_SUCCESS) { if (!touched) { /* untouched request, need rescheduling */ @@ -289,6 +292,12 @@ static void request_done(h2_proxy_session *session, request_rec *r, return; } } + else if (!ctx->next) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection, + "h2_proxy_session(%s): retry untouched request", + ctx->engine_id); + ctx->next = r; + } } else { const char *uri; @@ -620,7 +629,7 @@ run_session: } cleanup: - if (!reconnected && ctx->engine && next_request(ctx, 1) == APR_SUCCESS) { + if (!reconnected && next_request(ctx, 1) == APR_SUCCESS) { /* Still more to do, tear down old conn and start over */ if (ctx->p_conn) { ctx->p_conn->close = 1; -- 2.40.0