From: Stefan Eissing Date: Mon, 27 Feb 2017 14:30:50 +0000 (+0000) Subject: On the trunk: X-Git-Tag: 2.5.0-alpha~597 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=5b6d27b08f5bd7de7508aaeed7801509dca2273c;p=apache On the trunk: mod_http2: separate mutex instances for each bucket beam, resulting in less lock contention. input beams only created when necessary. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1784571 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/CHANGES b/CHANGES index 2affdfb7e8..cf143aa541 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,10 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.0 + *) mod_http2: separate mutex instances for each bucket beam, resulting in + less lock contention. input beams only created when necessary. + [Stefan Eissing] + *) mod_syslog: Support use of optional "tag" in syslog entries. PR 60525. [Ben Rubson , Jim Jagielski] diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index 3d644fd68e..46919b9285 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -195,6 +195,19 @@ static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam, * bucket beam that can transport buckets across threads ******************************************************************************/ +static void mutex_leave(void *ctx, apr_thread_mutex_t *lock) +{ + apr_thread_mutex_unlock(lock); +} + +static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl) +{ + h2_bucket_beam *beam = ctx; + pbl->mutex = beam->lock; + pbl->leave = mutex_leave; + return apr_thread_mutex_lock(pbl->mutex); +} + static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl) { h2_beam_mutex_enter *enter = beam->m_enter; @@ -227,26 +240,37 @@ static apr_off_t bucket_mem_used(apr_bucket *b) } } -static int report_consumption(h2_bucket_beam *beam) +static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl) { int rv = 0; - if (beam->cons_io_cb) { - beam->cons_io_cb(beam->cons_ctx, beam, beam->received_bytes - - beam->cons_bytes_reported); + apr_off_t len = beam->received_bytes - beam->cons_bytes_reported; + h2_beam_io_callback *cb = beam->cons_io_cb; + + if (cb) { + void *ctx = beam->cons_ctx; + + if (pbl) leave_yellow(beam, pbl); + cb(ctx, beam, len); + if (pbl) enter_yellow(beam, pbl); rv = 1; } - beam->cons_bytes_reported = beam->received_bytes; + beam->cons_bytes_reported += len; return rv; } -static void report_prod_io(h2_bucket_beam *beam, int force) +static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl) { - if (force || beam->prod_bytes_reported != beam->sent_bytes) { - if (beam->prod_io_cb) { - beam->prod_io_cb(beam->prod_ctx, beam, beam->sent_bytes - - beam->prod_bytes_reported); + apr_off_t len = beam->sent_bytes - beam->prod_bytes_reported; + if (force || len > 0) { + h2_beam_io_callback *cb = beam->prod_io_cb; + if (cb) { + void *ctx = beam->prod_ctx; + + leave_yellow(beam, pbl); + cb(ctx, beam, len); + enter_yellow(beam, pbl); } - beam->prod_bytes_reported = beam->sent_bytes; + beam->prod_bytes_reported += len; } } @@ -293,10 +317,10 @@ static apr_size_t calc_space_left(h2_bucket_beam *beam) static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock) { if (beam->timeout > 0) { - return apr_thread_cond_timedwait(beam->m_cond, lock, beam->timeout); + return apr_thread_cond_timedwait(beam->cond, lock, beam->timeout); } else { - return apr_thread_cond_wait(beam->m_cond, lock); + return apr_thread_cond_wait(beam->cond, lock); } } @@ -307,7 +331,7 @@ static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block, while (!beam->aborted && *premain <= 0 && (block == APR_BLOCK_READ) && pbl->mutex) { apr_status_t status; - report_prod_io(beam, 1); + report_prod_io(beam, 1, pbl); status = wait_cond(beam, pbl->mutex); if (APR_STATUS_IS_TIMEUP(status)) { return status; @@ -378,8 +402,8 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) if (!bl.mutex) { r_purge_sent(beam); } - else if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + else if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } leave_yellow(beam, &bl); } @@ -399,8 +423,8 @@ static apr_status_t beam_close(h2_bucket_beam *beam) { if (!beam->closed) { beam->closed = 1; - if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } } return APR_SUCCESS; @@ -445,7 +469,7 @@ static apr_status_t beam_send_cleanup(void *data) /* sender is going away, clear up all references to its memory */ r_purge_sent(beam); h2_blist_cleanup(&beam->send_list); - report_consumption(beam); + report_consumption(beam, NULL); while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) { h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies); H2_BPROXY_REMOVE(proxy); @@ -555,10 +579,16 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, H2_BPROXY_LIST_INIT(&beam->proxies); beam->tx_mem_limits = 1; beam->max_buf_size = max_buf_size; - apr_pool_pre_cleanup_register(pool, beam, beam_cleanup); - *pbeam = beam; - + status = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, + pool); + if (status == APR_SUCCESS) { + status = apr_thread_cond_create(&beam->cond, pool); + if (status == APR_SUCCESS) { + apr_pool_pre_cleanup_register(pool, beam, beam_cleanup); + *pbeam = beam; + } + } return status; } @@ -586,7 +616,6 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam) void h2_beam_mutex_set(h2_bucket_beam *beam, h2_beam_mutex_enter m_enter, - apr_thread_cond_t *cond, void *m_ctx) { h2_beam_lock bl; @@ -594,11 +623,20 @@ void h2_beam_mutex_set(h2_bucket_beam *beam, if (enter_yellow(beam, &bl) == APR_SUCCESS) { beam->m_enter = m_enter; beam->m_ctx = m_ctx; - beam->m_cond = cond; leave_yellow(beam, &bl); } } +void h2_beam_mutex_enable(h2_bucket_beam *beam) +{ + h2_beam_mutex_set(beam, mutex_enter, beam); +} + +void h2_beam_mutex_disable(h2_bucket_beam *beam) +{ + h2_beam_mutex_set(beam, NULL, NULL); +} + void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout) { h2_beam_lock bl; @@ -630,10 +668,10 @@ void h2_beam_abort(h2_bucket_beam *beam) beam->aborted = 1; r_purge_sent(beam); h2_blist_cleanup(&beam->send_list); - report_consumption(beam); + report_consumption(beam, &bl); } - if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } leave_yellow(beam, &bl); } @@ -646,7 +684,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam) if (enter_yellow(beam, &bl) == APR_SUCCESS) { r_purge_sent(beam); beam_close(beam); - report_consumption(beam); + report_consumption(beam, &bl); leave_yellow(beam, &bl); } return beam->aborted? APR_ECONNABORTED : APR_SUCCESS; @@ -680,8 +718,8 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block) status = APR_EAGAIN; break; } - if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } status = wait_cond(beam, bl.mutex); } @@ -868,12 +906,12 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, b = APR_BRIGADE_FIRST(sender_bb); status = append_bucket(beam, b, block, &bl); } - report_prod_io(beam, force_report); - if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + report_prod_io(beam, force_report, &bl); + if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } } - report_consumption(beam); + report_consumption(beam, &bl); leave_yellow(beam, &bl); } return status; @@ -1040,15 +1078,15 @@ transfer: } if (transferred) { - if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } status = APR_SUCCESS; } else if (beam->closed) { status = APR_EOF; } - else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) { + else if (block == APR_BLOCK_READ && bl.mutex && beam->cond) { status = wait_cond(beam, bl.mutex); if (status != APR_SUCCESS) { goto leave; @@ -1056,8 +1094,8 @@ transfer: goto transfer; } else { - if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); + if (beam->cond) { + apr_thread_cond_broadcast(beam->cond); } status = APR_EAGAIN; } @@ -1198,7 +1236,7 @@ int h2_beam_report_consumption(h2_bucket_beam *beam) h2_beam_lock bl; int rv = 0; if (enter_yellow(beam, &bl) == APR_SUCCESS) { - rv = report_consumption(beam); + rv = report_consumption(beam, &bl); leave_yellow(beam, &bl); } return rv; diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h index 2b54eee8b0..0984d7b3e1 100644 --- a/modules/http2/h2_bucket_beam.h +++ b/modules/http2/h2_bucket_beam.h @@ -190,9 +190,10 @@ struct h2_bucket_beam { unsigned int close_sent : 1; unsigned int tx_mem_limits : 1; /* only memory size counts on transfers */ + struct apr_thread_mutex_t *lock; + struct apr_thread_cond_t *cond; void *m_ctx; h2_beam_mutex_enter *m_enter; - struct apr_thread_cond_t *m_cond; apr_off_t cons_bytes_reported; /* amount of bytes reported as consumed */ h2_beam_ev_callback *cons_ev_cb; @@ -315,9 +316,11 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block); void h2_beam_mutex_set(h2_bucket_beam *beam, h2_beam_mutex_enter m_enter, - struct apr_thread_cond_t *cond, void *m_ctx); +void h2_beam_mutex_enable(h2_bucket_beam *beam); +void h2_beam_mutex_disable(h2_bucket_beam *beam); + /** * Set/get the timeout for blocking read/write operations. Only works * if a mutex has been set for the beam. diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 465eb9bb4f..1b6680d696 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -100,33 +100,19 @@ static void leave_mutex(h2_mplx *m, int acquired) } } -static void beam_leave(void *ctx, apr_thread_mutex_t *lock) -{ - leave_mutex(ctx, 1); -} - -static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl) -{ - h2_mplx *m = ctx; - int acquired; - apr_status_t status; - - status = enter_mutex(m, &acquired); - if (status == APR_SUCCESS) { - pbl->mutex = m->lock; - pbl->leave = acquired? beam_leave : NULL; - pbl->leave_ctx = m; - } - return status; -} - static void stream_output_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) { h2_stream *stream = ctx; + h2_mplx *m = stream->session->mplx; h2_task *task = stream->task; + int acquired; + if (length > 0 && task && task->assigned) { - h2_req_engine_out_consumed(task->assigned, task->c, length); + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + h2_req_engine_out_consumed(task->assigned, task->c, length); + leave_mutex(m, acquired); + } } } @@ -139,9 +125,16 @@ static void stream_input_ev(void *ctx, h2_bucket_beam *beam) static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) { - h2_mplx *m = ctx; - if (m->input_consumed && length) { - m->input_consumed(m->input_consumed_ctx, beam->id, length); + if (length > 0) { + h2_mplx *m = ctx; + int acquired; + + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + if (m->input_consumed) { + m->input_consumed(m->input_consumed_ctx, beam->id, length); + } + leave_mutex(m, acquired); + } } } @@ -190,17 +183,21 @@ static void stream_joined(h2_mplx *m, h2_stream *stream) h2_ihash_remove(m->shold, stream->id); h2_ihash_add(m->spurge, stream); - m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input); + if (stream->input) { + m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input); + } m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output); } static void stream_cleanup(h2_mplx *m, h2_stream *stream) { ap_assert(stream->state == H2_SS_CLEANUP); - + + if (stream->input) { + h2_beam_on_consumed(stream->input, NULL, NULL, NULL); + h2_beam_abort(stream->input); + } h2_beam_on_produced(stream->output, NULL, NULL); - h2_beam_on_consumed(stream->input, NULL, NULL, NULL); - h2_beam_abort(stream->input); h2_beam_leave(stream->output); h2_stream_cleanup(stream); @@ -376,18 +373,21 @@ static int stream_destroy_iter(void *ctx, void *val) h2_ihash_remove(m->spurge, stream->id); ap_assert(stream->state == H2_SS_CLEANUP); - if (stream->input == NULL || stream->output == NULL) { + if (stream->output == NULL) { ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, m->c, H2_STRM_MSG(stream, "already with beams==NULL")); return 0; } - /* Process outstanding events before destruction */ - input_consumed_signal(m, stream); - h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy"); + if (stream->input) { + /* Process outstanding events before destruction */ + input_consumed_signal(m, stream); + h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy"); + h2_beam_destroy(stream->input); + stream->input = NULL; + } + h2_beam_log(stream->output, m->c, APLOG_TRACE2, "stream_destroy"); - h2_beam_destroy(stream->input); - stream->input = NULL; h2_beam_destroy(stream->output); stream->output = NULL; if (stream->task) { @@ -628,7 +628,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) } /* time to protect the beam against multi-threaded use */ - h2_beam_mutex_set(stream->output, beam_enter, stream->task->cond, m); + h2_beam_mutex_enable(stream->output); /* we might see some file buckets in the output, see * if we have enough handles reserved. */ @@ -812,11 +812,13 @@ static h2_task *next_stream_task(h2_mplx *m) m->max_stream_started = sid; } - h2_beam_timeout_set(stream->input, m->stream_timeout); - h2_beam_on_consumed(stream->input, stream_input_ev, - stream_input_consumed, m); - h2_beam_on_file_beam(stream->input, can_beam_file, m); - h2_beam_mutex_set(stream->input, beam_enter, stream->task->cond, m); + if (stream->input) { + h2_beam_timeout_set(stream->input, m->stream_timeout); + h2_beam_on_consumed(stream->input, stream_input_ev, + stream_input_consumed, m); + h2_beam_on_file_beam(stream->input, can_beam_file, m); + h2_beam_mutex_enable(stream->input); + } h2_beam_buffer_size_set(stream->output, m->stream_max_mem); h2_beam_timeout_set(stream->output, m->stream_timeout); @@ -931,18 +933,22 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, H2_STRM_MSG(stream, "task_done, stream open")); /* more data will not arrive, resume the stream */ - h2_beam_mutex_set(stream->input, NULL, NULL, NULL); - h2_beam_mutex_set(stream->output, NULL, NULL, NULL); - h2_beam_leave(stream->input); + if (stream->input) { + h2_beam_mutex_disable(stream->input); + h2_beam_leave(stream->input); + } + h2_beam_mutex_disable(stream->output); have_out_data_for(m, stream); } else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, H2_STRM_MSG(stream, "task_done, in hold")); /* stream was just waiting for us. */ - h2_beam_mutex_set(stream->input, NULL, NULL, NULL); - h2_beam_mutex_set(stream->output, NULL, NULL, NULL); - h2_beam_leave(stream->input); + if (stream->input) { + h2_beam_mutex_disable(stream->input); + h2_beam_leave(stream->input); + } + h2_beam_mutex_disable(stream->output); stream_joined(m, stream); } else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) { diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 99a9c094d7..62ba81d2f9 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -1738,12 +1738,7 @@ static void ev_stream_open(h2_session *session, h2_stream *stream) } ap_assert(!stream->scheduled); - if (stream->request) { - const h2_request *r = stream->request; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, - H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"), - r->method, r->scheme, r->authority, r->path, r->chunked); - stream->scheduled = 1; + if (h2_stream_prep_processing(stream) == APR_SUCCESS) { h2_mplx_process(session->mplx, stream, stream_pri_cmp, session); } else { diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index c4b1227d6a..14010da4e5 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -170,14 +170,22 @@ static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag) } } +static apr_status_t setup_input(h2_stream *stream) { + if (stream->input == NULL && !stream->input_eof) { + h2_beam_create(&stream->input, stream->pool, stream->id, + "input", H2_BEAM_OWNER_SEND, 0); + h2_beam_send_from(stream->input, stream->pool); + } + return APR_SUCCESS; +} + static apr_status_t close_input(h2_stream *stream) { conn_rec *c = stream->session->c; - apr_status_t status; - apr_bucket_brigade *tmp; - apr_bucket *b; + apr_status_t status = APR_SUCCESS; - if (h2_beam_is_closed(stream->input)) { + stream->input_eof = 1; + if (stream->input && h2_beam_is_closed(stream->input)) { return APR_SUCCESS; } @@ -187,22 +195,30 @@ static apr_status_t close_input(h2_stream *stream) return APR_ECONNRESET; } - tmp = apr_brigade_create(stream->pool, c->bucket_alloc); if (stream->trailers && !apr_is_empty_table(stream->trailers)) { - h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers, - NULL, stream->pool); + apr_bucket_brigade *tmp; + apr_bucket *b; + h2_headers *r; + + tmp = apr_brigade_create(stream->pool, c->bucket_alloc); + + r = h2_headers_create(HTTP_OK, stream->trailers, NULL, stream->pool); + stream->trailers = NULL; b = h2_bucket_headers_create(c->bucket_alloc, r); APR_BRIGADE_INSERT_TAIL(tmp, b); - stream->trailers = NULL; + + b = apr_bucket_eos_create(c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(tmp, b); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, H2_STRM_MSG(stream, "added trailers")); + setup_input(stream); + status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); + apr_brigade_destroy(tmp); + } + if (stream->input) { + return h2_beam_close(stream->input); } - - b = apr_bucket_eos_create(c->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(tmp, b); - status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); - apr_brigade_destroy(tmp); - h2_beam_close(stream->input); return status; } @@ -440,18 +456,16 @@ apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags, apr_bucket_brigade *tmp; ap_assert(stream); - if (!stream->input) { - return APR_EOF; + if (len > 0) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, + H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len); + + tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc); + apr_brigade_write(tmp, NULL, NULL, (const char *)data, len); + setup_input(stream); + status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); + apr_brigade_destroy(tmp); } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len); - - tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc); - apr_brigade_write(tmp, NULL, NULL, (const char *)data, len); - status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); - apr_brigade_destroy(tmp); - stream->in_data_frames++; stream->in_data_octets += len; return status; @@ -478,8 +492,6 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session, stream->monitor = monitor; stream->max_mem = session->max_stream_mem; - h2_beam_create(&stream->input, pool, id, "input", H2_BEAM_OWNER_SEND, 0); - h2_beam_send_from(stream->input, stream->pool); h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, @@ -498,14 +510,16 @@ void h2_stream_cleanup(h2_stream *stream) * references into request pools */ apr_brigade_cleanup(stream->out_buffer); } - h2_beam_abort(stream->input); - status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ); - if (status == APR_EAGAIN) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, - H2_STRM_MSG(stream, "wait on input drain")); - status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, - H2_STRM_MSG(stream, "input drain returned")); + if (stream->input) { + h2_beam_abort(stream->input); + status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ); + if (status == APR_EAGAIN) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, + H2_STRM_MSG(stream, "wait on input drain")); + status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, + H2_STRM_MSG(stream, "input drain returned")); + } } } @@ -527,10 +541,26 @@ apr_pool_t *h2_stream_detach_pool(h2_stream *stream) return pool; } +apr_status_t h2_stream_prep_processing(h2_stream *stream) +{ + if (stream->request) { + const h2_request *r = stream->request; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, + H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"), + r->method, r->scheme, r->authority, r->path, r->chunked); + setup_input(stream); + stream->scheduled = 1; + return APR_SUCCESS; + } + return APR_EINVAL; +} + void h2_stream_rst(h2_stream *stream, int error_code) { stream->rst_error = error_code; - h2_beam_abort(stream->input); + if (stream->input) { + h2_beam_abort(stream->input); + } h2_beam_leave(stream->output); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, H2_STRM_MSG(stream, "reset, error=%d"), error_code); diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index 86b6da898f..241c4fefe5 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -77,6 +77,7 @@ struct h2_stream { unsigned int aborted : 1; /* was aborted */ unsigned int scheduled : 1; /* stream has been scheduled */ unsigned int has_response : 1; /* response headers are known */ + unsigned int input_eof : 1; /* no more request data coming */ unsigned int push_policy; /* which push policy to use for this request */ struct h2_task *task; /* assigned task to fullfill request */ @@ -110,6 +111,8 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, */ void h2_stream_destroy(h2_stream *stream); +apr_status_t h2_stream_prep_processing(h2_stream *stream); + /* * Set a new monitor for this stream, replacing any existing one. Can * be called with NULL to have no monitor installed. diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 0190f8a889..19cbca1b2d 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -17,7 +17,6 @@ #include #include -#include #include #include @@ -386,7 +385,7 @@ static apr_status_t h2_filter_parse_h1(ap_filter_t* f, apr_bucket_brigade* bb) ******************************************************************************/ int h2_task_can_redo(h2_task *task) { - if (h2_beam_was_received(task->input.beam)) { + if (task->input.beam && h2_beam_was_received(task->input.beam)) { /* cannot repeat that. */ return 0; } @@ -403,7 +402,9 @@ void h2_task_redo(h2_task *task) void h2_task_rst(h2_task *task, int error) { task->rst_error = error; - h2_beam_leave(task->input.beam); + if (task->input.beam) { + h2_beam_leave(task->input.beam); + } if (!task->worker_done) { h2_beam_abort(task->output.beam); } @@ -508,7 +509,6 @@ h2_task *h2_task_create(h2_stream *stream, conn_rec *slave) task->output.beam = stream->output; h2_beam_send_from(stream->output, task->pool); - apr_thread_cond_create(&task->cond, pool); h2_ctx_create_for(slave, task); return task; diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index f004a4f92b..e0a426b0ad 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -37,7 +37,6 @@ * of our own to disble those. */ -struct apr_thread_cond_t; struct h2_bucket_beam; struct h2_conn; struct h2_mplx; @@ -76,7 +75,6 @@ struct h2_task { } output; struct h2_mplx *mplx; - struct apr_thread_cond_t *cond; unsigned int filters_set : 1; unsigned int frozen : 1; diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index a3f3f8528d..066487a798 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.9.2-DEV" +#define MOD_HTTP2_VERSION "1.9.3-DEV" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010901 +#define MOD_HTTP2_VERSION_NUM 0x010903 #endif /* mod_h2_h2_version_h */