From 1ee834308f7d84bc10b8966d455debcf81a9edb0 Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Thu, 27 Oct 2016 16:53:58 +0000 Subject: [PATCH] Merge of r1765328,1766424,1766691,1766851 from trunk: mod_http2: v1.7.7, connection shutdown revisited, AP_DEBUG_ASSERT transformed to real asserts git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1766856 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES | 7 + modules/http2/h2_bucket_beam.c | 65 +++++---- modules/http2/h2_bucket_beam.h | 13 +- modules/http2/h2_config.c | 2 +- modules/http2/h2_conn.c | 2 +- modules/http2/h2_conn_io.c | 2 +- modules/http2/h2_ctx.c | 4 +- modules/http2/h2_from_h1.c | 2 +- modules/http2/h2_mplx.c | 227 +++++++++++++------------------ modules/http2/h2_ngn_shed.c | 11 +- modules/http2/h2_ngn_shed.h | 2 + modules/http2/h2_proxy_session.c | 2 +- modules/http2/h2_proxy_util.c | 18 +-- modules/http2/h2_session.c | 10 +- modules/http2/h2_stream.c | 68 ++++----- modules/http2/h2_stream.h | 3 +- modules/http2/h2_task.c | 3 +- modules/http2/h2_task.h | 1 - modules/http2/h2_util.c | 10 +- modules/http2/h2_version.h | 4 +- modules/http2/h2_workers.c | 4 +- 21 files changed, 222 insertions(+), 238 deletions(-) diff --git a/CHANGES b/CHANGES index fb02e16103..3e8c26de2f 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,13 @@ Changes with Apache 2.4.24 + *) mod_http2: connection shutdown revisited: corrected edge cases on + shutting down ongoing streams, changed log warnings to be less noisy + when waiting on long running tasks. [Stefan Eissing] + + *) mod_http2: changed all AP_DEBUG_ASSERT to ap_assert to have them + available also in normal deployments. [Stefan Eissing] + *) mod_http2/mod_proxy_http2: 100-continue handling now properly implemented up to the backend. Reused HTTP/2 proxy connections with more than a second not used will block request bodies until a PING answer is received. diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index 6c40687943..22b2e909c9 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -377,7 +377,7 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not " "in hold, n=%d", beam->id, beam->tag, (int)proxy->n); - AP_DEBUG_ASSERT(!proxy->bred); + ap_assert(!proxy->bred); } } /* notify anyone waiting on space to become available */ @@ -413,6 +413,32 @@ static apr_status_t beam_close(h2_bucket_beam *beam) } static void beam_set_red_pool(h2_bucket_beam *beam, apr_pool_t *pool); +static void beam_set_green_pool(h2_bucket_beam *beam, apr_pool_t *pool); + +static apr_status_t beam_green_cleanup(void *data) +{ + h2_bucket_beam *beam = data; + + if (beam->green) { + apr_brigade_destroy(beam->green); + beam->green = NULL; + } + beam->green_pool = NULL; + return APR_SUCCESS; +} + +static void beam_set_green_pool(h2_bucket_beam *beam, apr_pool_t *pool) +{ + if (beam->green_pool != pool) { + if (beam->green_pool) { + apr_pool_cleanup_kill(beam->green_pool, beam, beam_green_cleanup); + } + beam->green_pool = pool; + if (beam->green_pool) { + apr_pool_pre_cleanup_register(beam->green_pool, beam, beam_green_cleanup); + } + } +} static apr_status_t beam_red_cleanup(void *data) { @@ -429,8 +455,7 @@ static apr_status_t beam_red_cleanup(void *data) } h2_blist_cleanup(&beam->purge); h2_blist_cleanup(&beam->hold); - beam_set_red_pool(beam, NULL); - + beam->red_pool = NULL; return APR_SUCCESS; } @@ -453,10 +478,16 @@ static apr_status_t beam_cleanup(void *data) apr_status_t status; beam_close(beam); + if (beam->green_pool) { + apr_pool_cleanup_kill(beam->green_pool, beam, beam_green_cleanup); + status = beam_green_cleanup(beam); + } + if (beam->red_pool) { + apr_pool_cleanup_kill(beam->red_pool, beam, beam_red_cleanup); status = beam_red_cleanup(beam); } - return APR_SUCCESS; + return status; } apr_status_t h2_beam_destroy(h2_bucket_beam *beam) @@ -582,27 +613,15 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam) return beam->aborted? APR_ECONNABORTED : APR_SUCCESS; } -apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block, - int clear_buffers) +apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block) { apr_status_t status; h2_beam_lock bl; if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) { - if (clear_buffers) { - r_purge_reds(beam); - h2_blist_cleanup(&beam->red); - if (!bl.mutex && beam->green) { - /* not protected, may process green in red call */ - apr_brigade_destroy(beam->green); - beam->green = NULL; - } - } - beam_close(beam); - - while (status == APR_SUCCESS - && (!H2_BPROXY_LIST_EMPTY(&beam->proxies) - || (beam->green && !APR_BRIGADE_EMPTY(beam->green)))) { + while (status == APR_SUCCESS + && !H2_BLIST_EMPTY(&beam->red) + && !H2_BPROXY_LIST_EMPTY(&beam->proxies)) { if (block == APR_NONBLOCK_READ || !bl.mutex) { status = APR_EAGAIN; break; @@ -810,6 +829,7 @@ transfer: } /* transfer enough buckets from our green brigade, if we have one */ + beam_set_green_pool(beam, bb->p); while (beam->green && !APR_BRIGADE_EMPTY(beam->green) && (readbytes <= 0 || remain >= 0)) { @@ -1068,11 +1088,6 @@ int h2_beam_holds_proxies(h2_bucket_beam *beam) return has_proxies; } -int h2_beam_closed(h2_bucket_beam *beam) -{ - return beam->closed; -} - int h2_beam_was_received(h2_bucket_beam *beam) { int happend = 0; diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h index 655e03091b..4c779d1f21 100644 --- a/modules/http2/h2_bucket_beam.h +++ b/modules/http2/h2_bucket_beam.h @@ -88,7 +88,7 @@ apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax, * Care needs to be taken when terminating the beam. The beam registers at * the pool it was created with and will cleanup after itself. However, if * received buckets do still exist, already freed memory might be accessed. - * The beam does a AP_DEBUG_ASSERT on this condition. + * The beam does a assertion on this condition. * * The proper way of shutting down a beam is to first make sure there are no * more green buckets out there, then cleanup the beam to purge eventually @@ -179,6 +179,7 @@ struct h2_bucket_beam { apr_bucket_brigade *green; h2_bproxy_list proxies; apr_pool_t *red_pool; + apr_pool_t *green_pool; apr_size_t max_buf_size; apr_interval_time_t timeout; @@ -259,13 +260,6 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam, apr_read_type_e block, apr_off_t readbytes); -/** - * Determine if beam is closed. May still contain buffered data. - * - * Call from red or green side. - */ -int h2_beam_closed(h2_bucket_beam *beam); - /** * Determine if beam is empty. * @@ -305,8 +299,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam); * * Call from the red side only. */ -apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block, - int clear_buffers); +apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block); void h2_beam_mutex_set(h2_bucket_beam *beam, h2_beam_mutex_enter m_enter, diff --git a/modules/http2/h2_config.c b/modules/http2/h2_config.c index 251c3f05d2..5613e8a479 100644 --- a/modules/http2/h2_config.c +++ b/modules/http2/h2_config.c @@ -198,7 +198,7 @@ const h2_config *h2_config_sget(server_rec *s) { h2_config *cfg = (h2_config *)ap_get_module_config(s->module_config, &http2_module); - AP_DEBUG_ASSERT(cfg); + ap_assert(cfg); return cfg; } diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index d4c139472a..a0915c3eb4 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -248,7 +248,7 @@ conn_rec *h2_slave_create(conn_rec *master, int slave_id, conn_rec *c; void *cfg; - AP_DEBUG_ASSERT(master); + ap_assert(master); ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, master, "h2_conn(%ld): create slave", master->id); diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index 6ba24faa6f..303860eeb8 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -206,7 +206,7 @@ static apr_status_t read_to_scratch(h2_conn_io *io, apr_bucket *b) return APR_SUCCESS; } - AP_DEBUG_ASSERT(b->length <= (io->ssize - io->slen)); + ap_assert(b->length <= (io->ssize - io->slen)); if (APR_BUCKET_IS_FILE(b)) { apr_bucket_file *f = (apr_bucket_file *)b->data; apr_file_t *fd = f->fd; diff --git a/modules/http2/h2_ctx.c b/modules/http2/h2_ctx.c index 4b596a3d78..e79b5f805d 100644 --- a/modules/http2/h2_ctx.c +++ b/modules/http2/h2_ctx.c @@ -27,7 +27,7 @@ static h2_ctx *h2_ctx_create(const conn_rec *c) { h2_ctx *ctx = apr_pcalloc(c->pool, sizeof(h2_ctx)); - AP_DEBUG_ASSERT(ctx); + ap_assert(ctx); ap_set_module_config(c->conn_config, &http2_module, ctx); h2_ctx_server_set(ctx, c->base_server); return ctx; @@ -35,7 +35,7 @@ static h2_ctx *h2_ctx_create(const conn_rec *c) void h2_ctx_clear(const conn_rec *c) { - AP_DEBUG_ASSERT(c); + ap_assert(c); ap_set_module_config(c->conn_config, &http2_module, NULL); } diff --git a/modules/http2/h2_from_h1.c b/modules/http2/h2_from_h1.c index 7eb835fd90..2b4f79ac14 100644 --- a/modules/http2/h2_from_h1.c +++ b/modules/http2/h2_from_h1.c @@ -727,7 +727,7 @@ apr_status_t h2_filter_request_in(ap_filter_t* f, ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r, "h2_task(%s): request filter, exp=%d", task->id, r->expecting_100); - if (!task->input.chunked) { + if (!task->request->chunked) { status = ap_get_brigade(f->next, bb, mode, block, readbytes); /* pipe data through, just take care of trailers */ for (b = APR_BRIGADE_FIRST(bb); diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 4edd4a61c1..0cff7b60fa 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -90,14 +90,14 @@ static apr_status_t enter_mutex(h2_mplx *m, int *pacquired) * This allow recursive entering of the mutex from the saem thread, * which is what we need in certain situations involving callbacks */ - AP_DEBUG_ASSERT(m); + ap_assert(m); apr_threadkey_private_get(&mutex, thread_lock); if (mutex == m->lock) { *pacquired = 0; return APR_SUCCESS; } - AP_DEBUG_ASSERT(m->lock); + ap_assert(m->lock); status = apr_thread_mutex_lock(m->lock); *pacquired = (status == APR_SUCCESS); if (*pacquired) { @@ -221,13 +221,12 @@ static void purge_streams(h2_mplx *m) /* repeat until empty */ } h2_ihash_clear(m->spurge); - AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge)); } } static void h2_mplx_destroy(h2_mplx *m) { - AP_DEBUG_ASSERT(m); + ap_assert(m); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): destroy, tasks=%d", m->id, (int)h2_ihash_count(m->tasks)); @@ -256,7 +255,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, apr_status_t status = APR_SUCCESS; apr_allocator_t *allocator = NULL; h2_mplx *m; - AP_DEBUG_ASSERT(conf); + ap_assert(conf); status = apr_allocator_create(&allocator); if (status != APR_SUCCESS) { @@ -353,7 +352,6 @@ static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master) { conn_rec *slave = NULL; int reuse_slave = 0; - apr_status_t status; ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, "h2_task(%s): destroy", task->id); @@ -365,22 +363,14 @@ static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master) } } - /* The pool is cleared/destroyed which also closes all - * allocated file handles. Give this count back to our - * file handle pool. */ if (task->output.beam) { - m->tx_handles_reserved += - h2_beam_get_files_beamed(task->output.beam); h2_beam_on_produced(task->output.beam, NULL, NULL); - status = h2_beam_shutdown(task->output.beam, APR_NONBLOCK_READ, 1); - if (status != APR_SUCCESS){ - ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c, - APLOGNO(03385) "h2_task(%s): output shutdown " - "incomplete, beam empty=%d, holds proxies=%d", - task->id, - h2_beam_empty(task->output.beam), - h2_beam_holds_proxies(task->output.beam)); - } + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, + APLOGNO(03385) "h2_task(%s): destroy " + "output beam empty=%d, holds proxies=%d", + task->id, + h2_beam_empty(task->output.beam), + h2_beam_holds_proxies(task->output.beam)); } slave = task->c; @@ -452,6 +442,9 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) /* Remove mutex after, so that abort still finds cond to signal */ h2_beam_mutex_set(stream->input, NULL, NULL, NULL); } + if (stream->output) { + m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output); + } h2_stream_cleanup(stream); task = h2_ihash_get(m->tasks, stream->id); @@ -513,7 +506,7 @@ static int task_print(void *ctx, void *val) if (task) { h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */ "->03198: h2_stream(%s): %s %s %s" "[orph=%d/started=%d/done=%d/frozen=%d]", task->id, task->request->method, @@ -522,11 +515,11 @@ static int task_print(void *ctx, void *val) task->worker_done, task->frozen); } else if (task) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */ "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id); } else { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */ "->03198: h2_stream(%ld-NULL): NULL", m->id); } return 1; @@ -535,14 +528,16 @@ static int task_print(void *ctx, void *val) static int task_abort_connection(void *ctx, void *val) { h2_task *task = val; - if (task->c) { - task->c->aborted = 1; - } - if (task->input.beam) { - h2_beam_abort(task->input.beam); - } - if (task->output.beam) { - h2_beam_abort(task->output.beam); + if (!task->worker_done) { + if (task->c) { + task->c->aborted = 1; + } + if (task->input.beam) { + h2_beam_abort(task->input.beam); + } + if (task->output.beam) { + h2_beam_abort(task->output.beam); + } } return 1; } @@ -551,124 +546,97 @@ static int report_stream_iter(void *ctx, void *val) { h2_mplx *m = ctx; h2_stream *stream = val; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, " - "ready=%d", + "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, ready=%d", m->id, stream->id, stream->started, stream->scheduled, h2_stream_is_ready(stream)); return 1; } +static int task_done_iter(void *ctx, void *val); + apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) { apr_status_t status; int acquired; + /* How to shut down a h2 connection: + * 1. tell the workers that no more tasks will come from us */ h2_workers_unregister(m->workers, m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - int i, wait_secs = 5; + int i, wait_secs = 60; - if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): release_join with %d streams open, " - "%d streams ready, %d tasks", - m->id, (int)h2_ihash_count(m->streams), - (int)h2_ihash_count(m->sready), - (int)h2_ihash_count(m->tasks)); - h2_ihash_iter(m->streams, report_stream_iter, m); - } - - /* disable WINDOW_UPDATE callbacks */ + /* 2. disable WINDOW_UPDATEs and set the mplx to aborted, clear + * our TODO list and purge any streams we have collected */ h2_mplx_set_consumed_cb(m, NULL, NULL); - - if (!h2_ihash_empty(m->shold)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): start release_join with %d streams in hold", - m->id, (int)h2_ihash_count(m->shold)); - } - if (!h2_ihash_empty(m->spurge)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): start release_join with %d streams to purge", - m->id, (int)h2_ihash_count(m->spurge)); - } - + h2_mplx_abort(m); h2_iq_clear(m->q); + purge_streams(m); + + /* 3. mark all slave connections as aborted and wakeup all sleeping + * tasks. Mark all still active streams as 'done'. m->streams has to + * be empty afterwards with streams either in + * a) m->shold because a task is still active + * b) m->spurge because task is done, or was not started */ + h2_ihash_iter(m->tasks, task_abort_connection, m); apr_thread_cond_broadcast(m->task_thawed); while (!h2_ihash_iter(m->streams, stream_done_iter, m)) { /* iterate until all streams have been removed */ } - AP_DEBUG_ASSERT(h2_ihash_empty(m->streams)); - - if (!h2_ihash_empty(m->shold)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): 2. release_join with %d streams in " - "hold, %d workers busy, %d tasks", - m->id, (int)h2_ihash_count(m->shold), - m->workers_busy, - (int)h2_ihash_count(m->tasks)); - } - if (!h2_ihash_empty(m->spurge)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): 2. release_join with %d streams to purge", - m->id, (int)h2_ihash_count(m->spurge)); - } + ap_assert(h2_ihash_empty(m->streams)); + + /* 4. purge all streams we collected by marking them 'done' */ + purge_streams(m); - /* If we still have busy workers, we cannot release our memory - * pool yet, as tasks have references to us. - * Any operation on the task slave connection will from now on - * be errored ECONNRESET/ABORTED, so processing them should fail - * and workers *should* return in a timely fashion. - */ + /* 5. while workers are busy on this connection, meaning they + * are processing tasks from this connection, wait on them finishing + * to wake us and check again. Eventually, this has to succeed. */ + m->join_wait = wait; for (i = 0; m->workers_busy > 0; ++i) { - h2_ihash_iter(m->tasks, task_abort_connection, m); - - m->join_wait = wait; status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs)); if (APR_STATUS_IS_TIMEUP(status)) { - if (i > 0) { - /* Oh, oh. Still we wait for assigned workers to report that - * they are done. Unless we have a bug, a worker seems to be hanging. - * If we exit now, all will be deallocated and the worker, once - * it does return, will walk all over freed memory... - */ - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198) - "h2_mplx(%ld): release, waiting for %d seconds now for " - "%d h2_workers to return, have still %d tasks outstanding", - m->id, i*wait_secs, m->workers_busy, - (int)h2_ihash_count(m->tasks)); - if (i == 1) { - h2_ihash_iter(m->tasks, task_print, m); - } - } - h2_mplx_abort(m); - apr_thread_cond_broadcast(m->task_thawed); + /* This can happen if we have very long running requests + * that do not time out on IO. */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198) + "h2_mplx(%ld): release, waiting for %d seconds now for " + "%d h2_workers to return, have still %d tasks outstanding", + m->id, i*wait_secs, m->workers_busy, + (int)h2_ihash_count(m->tasks)); + h2_ihash_iter(m->shold, report_stream_iter, m); + h2_ihash_iter(m->tasks, task_print, m); } + purge_streams(m); } + m->join_wait = NULL; - if (!h2_ihash_empty(m->tasks) && APLOGctrace1(m->c)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + /* 6. All workers for this connection are done, we are in + * single-threaded processing now effectively. */ + leave_mutex(m, acquired); + + if (!h2_ihash_empty(m->tasks)) { + /* when we are here, we lost track of the tasks still present. + * this currently happens with mod_proxy_http2 when we shut + * down a h2_req_engine with tasks assigned... */ + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056) "h2_mplx(%ld): 3. release_join with %d tasks", m->id, (int)h2_ihash_count(m->tasks)); h2_ihash_iter(m->tasks, task_print, m); + + while (!h2_ihash_iter(m->tasks, task_done_iter, m)) { + /* iterate until all tasks have been removed */ + } } - AP_DEBUG_ASSERT(h2_ihash_empty(m->shold)); - if (!h2_ihash_empty(m->spurge)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): 3. release_join %d streams to purge", - m->id, (int)h2_ihash_count(m->spurge)); - purge_streams(m); - } + + /* 7. With all tasks done, the stream hold should be empty and all + * remaining streams are ready for purging */ + ap_assert(h2_ihash_empty(m->shold)); + purge_streams(m); - if (!h2_ihash_empty(m->tasks)) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) - "h2_mplx(%ld): release_join -> destroy, " - "%d tasks still present", - m->id, (int)h2_ihash_count(m->tasks)); - } - leave_mutex(m, acquired); + /* 8. close the h2_req_enginge shed and self destruct */ + h2_ngn_shed_destroy(m->ngn_shed); + m->ngn_shed = NULL; h2_mplx_destroy(m); - /* all gone */ } return status; } @@ -677,7 +645,6 @@ void h2_mplx_abort(h2_mplx *m) { int acquired; - AP_DEBUG_ASSERT(m); if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) { m->aborted = 1; h2_ngn_shed_abort(m->ngn_shed); @@ -690,7 +657,6 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream) apr_status_t status = APR_SUCCESS; int acquired; - AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, "h2_mplx(%ld-%d): marking stream as done.", @@ -707,7 +673,6 @@ h2_stream *h2_mplx_stream_get(h2_mplx *m, int id) h2_stream *s = NULL; int acquired; - AP_DEBUG_ASSERT(m); if ((enter_mutex(m, &acquired)) == APR_SUCCESS) { s = h2_ihash_get(m->streams, id); leave_mutex(m, acquired); @@ -728,7 +693,6 @@ static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) h2_stream *stream; int acquired; - AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { stream = h2_ihash_get(m->streams, beam->id); if (stream) { @@ -780,7 +744,6 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) apr_status_t status; int acquired; - AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { if (m->aborted) { status = APR_ECONNABORTED; @@ -825,7 +788,6 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, apr_status_t status; int acquired; - AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { if (m->aborted) { status = APR_ECONNABORTED; @@ -866,7 +828,6 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) apr_status_t status; int acquired; - AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { if (m->aborted) { status = APR_ECONNABORTED; @@ -888,7 +849,6 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, int do_registration = 0; int acquired; - AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { if (m->aborted) { status = APR_ECONNABORTED; @@ -981,7 +941,6 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) apr_status_t status; int acquired; - AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { if (m->aborted) { *has_more = 0; @@ -1088,8 +1047,8 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) stream = h2_ihash_get(m->shold, task->stream_id); if (stream) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%s): task_done, stream in hold", - task->id); + "h2_mplx(%s): task_done, stream %d in hold", + task->id, stream->id); /* We cannot destroy the stream here since this is * called from a worker thread and freeing memory pools * is only safe in the only thread using it (and its @@ -1103,14 +1062,16 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) task->id); task_destroy(m, task, 0); } - - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } } } } +static int task_done_iter(void *ctx, void *val) +{ + task_done((h2_mplx*)ctx, val, 0); + return 0; +} + void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) { int acquired; @@ -1118,6 +1079,9 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) if (enter_mutex(m, &acquired) == APR_SUCCESS) { task_done(m, task, NULL); --m->workers_busy; + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); + } if (ptask) { /* caller wants another task */ *ptask = next_stream_task(m); @@ -1411,7 +1375,6 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, h2_stream *stream; size_t i, n; - AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, "h2_mplx(%ld): dispatch events", m->id); @@ -1442,7 +1405,6 @@ apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id) apr_status_t status; int acquired; - AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_stream *s = h2_ihash_get(m->streams, stream_id); if (s) { @@ -1458,7 +1420,6 @@ int h2_mplx_awaits_data(h2_mplx *m) apr_status_t status; int acquired, waiting = 1; - AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { if (h2_ihash_empty(m->streams)) { waiting = 0; diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c index d65fa1b68d..2f5b729617 100644 --- a/modules/http2/h2_ngn_shed.c +++ b/modules/http2/h2_ngn_shed.c @@ -215,7 +215,7 @@ apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type, "h2_ngn_shed(%ld): create engine %s (%s)", shed->c->id, newngn->id, newngn->type); if (status == APR_SUCCESS) { - AP_DEBUG_ASSERT(task->engine == NULL); + ap_assert(task->engine == NULL); newngn->task = task; task->engine = newngn; task->assigned = newngn; @@ -252,7 +252,7 @@ apr_status_t h2_ngn_shed_pull_request(h2_ngn_shed *shed, { h2_ngn_entry *entry; - AP_DEBUG_ASSERT(ngn); + ap_assert(ngn); *pr = NULL; ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, shed->c, APLOGNO(03396) "h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d", @@ -352,6 +352,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn) "frozen=%d, aborting", shed->c->id, ngn->id, task->id, task->frozen); ngn_done_task(shed, ngn, task, 0, 1); + task->engine = task->assigned = NULL; } } if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) { @@ -371,3 +372,9 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn) apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, NULL); ngn->done = 1; } + +void h2_ngn_shed_destroy(h2_ngn_shed *shed) +{ + ap_assert(apr_hash_count(shed->ngns) == 0); +} + diff --git a/modules/http2/h2_ngn_shed.h b/modules/http2/h2_ngn_shed.h index bcafc509b1..c6acbae253 100644 --- a/modules/http2/h2_ngn_shed.h +++ b/modules/http2/h2_ngn_shed.h @@ -51,6 +51,8 @@ h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c, int default_capactiy, apr_size_t req_buffer_size); +void h2_ngn_shed_destroy(h2_ngn_shed *shed); + void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx); void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed); diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c index c5d00fe497..a79c5da479 100644 --- a/modules/http2/h2_proxy_session.c +++ b/modules/http2/h2_proxy_session.c @@ -926,7 +926,7 @@ static apr_status_t session_shutdown(h2_proxy_session *session, int reason, apr_status_t status = APR_SUCCESS; const char *err = msg; - AP_DEBUG_ASSERT(session); + ap_assert(session); if (!err && reason) { err = nghttp2_strerror(reason); } diff --git a/modules/http2/h2_proxy_util.c b/modules/http2/h2_proxy_util.c index 4c732788f3..8089dde5e5 100644 --- a/modules/http2/h2_proxy_util.c +++ b/modules/http2/h2_proxy_util.c @@ -425,11 +425,11 @@ h2_proxy_ngheader *h2_proxy_util_nghd_make_req(apr_pool_t *p, h2_proxy_ngheader *ngh; size_t n; - AP_DEBUG_ASSERT(req); - AP_DEBUG_ASSERT(req->scheme); - AP_DEBUG_ASSERT(req->authority); - AP_DEBUG_ASSERT(req->path); - AP_DEBUG_ASSERT(req->method); + ap_assert(req); + ap_assert(req->scheme); + ap_assert(req->authority); + ap_assert(req->path); + ap_assert(req->method); n = 4; apr_table_do(count_header, &n, req->headers, NULL); @@ -608,10 +608,10 @@ apr_status_t h2_proxy_req_make(h2_proxy_request *req, apr_pool_t *pool, req->authority = authority; req->path = path; - AP_DEBUG_ASSERT(req->scheme); - AP_DEBUG_ASSERT(req->authority); - AP_DEBUG_ASSERT(req->path); - AP_DEBUG_ASSERT(req->method); + ap_assert(req->scheme); + ap_assert(req->authority); + ap_assert(req->path); + ap_assert(req->method); x.pool = pool; x.headers = req->headers; diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 78bd7d72bd..27ed9197b5 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -711,7 +711,7 @@ static apr_status_t init_callbacks(conn_rec *c, nghttp2_session_callbacks **pcb) static void h2_session_destroy(h2_session *session) { - AP_DEBUG_ASSERT(session); + ap_assert(session); h2_ihash_clear(session->streams); if (session->mplx) { @@ -743,7 +743,7 @@ static apr_status_t h2_session_shutdown_notice(h2_session *session) { apr_status_t status; - AP_DEBUG_ASSERT(session); + ap_assert(session); if (!session->local.accepting) { return APR_SUCCESS; } @@ -764,7 +764,7 @@ static apr_status_t h2_session_shutdown(h2_session *session, int error, { apr_status_t status = APR_SUCCESS; - AP_DEBUG_ASSERT(session); + ap_assert(session); if (session->local.shutdown) { return APR_SUCCESS; } @@ -1034,7 +1034,7 @@ static apr_status_t h2_session_start(h2_session *session, int *rv) size_t slen; int win_size; - AP_DEBUG_ASSERT(session); + ap_assert(session); /* Start the conversation by submitting our SETTINGS frame */ *rv = 0; if (session->r) { @@ -1154,7 +1154,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, int eos = 0; apr_status_t status; h2_stream *stream; - AP_DEBUG_ASSERT(session); + ap_assert(session); /* The session wants to send more DATA for the stream. We need * to find out how much of the requested length we can send without diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 01c7b7c297..3f177cf54e 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -173,6 +173,7 @@ static apr_status_t stream_pool_cleanup(void *ctx) h2_stream *stream = ctx; apr_status_t status; + ap_assert(stream->can_be_cleaned); if (stream->files) { apr_file_t *file; int i; @@ -213,31 +214,35 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session, void h2_stream_cleanup(h2_stream *stream) { - AP_DEBUG_ASSERT(stream); + apr_status_t status; + + ap_assert(stream); if (stream->out_buffer) { + /* remove any left over output buckets that may still have + * references into request pools */ apr_brigade_cleanup(stream->out_buffer); } - if (stream->input) { - apr_status_t status; - status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ, 1); - if (status == APR_EAGAIN) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, - "h2_stream(%ld-%d): wait on input shutdown", - stream->session->id, stream->id); - status = h2_beam_shutdown(stream->input, APR_BLOCK_READ, 1); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, - "h2_stream(%ld-%d): input shutdown returned", - stream->session->id, stream->id); - } + h2_beam_abort(stream->input); + status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ); + if (status == APR_EAGAIN) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, + "h2_stream(%ld-%d): wait on input drain", + stream->session->id, stream->id); + status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, + "h2_stream(%ld-%d): input drain returned", + stream->session->id, stream->id); } } void h2_stream_destroy(h2_stream *stream) { - AP_DEBUG_ASSERT(stream); + ap_assert(stream); + ap_assert(!h2_mplx_stream_get(stream->session->mplx, stream->id)); ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c, "h2_stream(%ld-%d): destroy", stream->session->id, stream->id); + stream->can_be_cleaned = 1; if (stream->pool) { apr_pool_destroy(stream->pool); } @@ -327,7 +332,7 @@ apr_status_t h2_stream_add_header(h2_stream *stream, const char *name, size_t nlen, const char *value, size_t vlen) { - AP_DEBUG_ASSERT(stream); + ap_assert(stream); if (!stream->has_response) { if (name[0] == ':') { @@ -383,9 +388,9 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled, h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status = APR_EINVAL; - AP_DEBUG_ASSERT(stream); - AP_DEBUG_ASSERT(stream->session); - AP_DEBUG_ASSERT(stream->session->mplx); + ap_assert(stream); + ap_assert(stream->session); + ap_assert(stream->session->mplx); if (!stream->scheduled) { if (eos) { @@ -444,7 +449,9 @@ int h2_stream_is_scheduled(const h2_stream *stream) apr_status_t h2_stream_close_input(h2_stream *stream) { conn_rec *c = stream->session->c; - apr_status_t status = APR_SUCCESS, rv; + apr_status_t status; + apr_bucket_brigade *tmp; + apr_bucket *b; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, "h2_stream(%ld-%d): closing input", @@ -453,27 +460,20 @@ apr_status_t h2_stream_close_input(h2_stream *stream) return APR_ECONNRESET; } - if (!stream->input) { - h2_beam_create(&stream->input, stream->pool, stream->id, "input", 0); - } - + tmp = apr_brigade_create(stream->pool, c->bucket_alloc); if (stream->trailers && !apr_is_empty_table(stream->trailers)) { h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers, NULL, stream->pool); - apr_bucket *b = h2_bucket_headers_create(c->bucket_alloc, r); - apr_bucket_brigade *tmp; - - tmp = apr_brigade_create(stream->pool, c->bucket_alloc); + b = h2_bucket_headers_create(c->bucket_alloc, r); APR_BRIGADE_INSERT_TAIL(tmp, b); - status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); - apr_brigade_destroy(tmp); - stream->trailers = NULL; } - close_input(stream); - rv = h2_beam_close(stream->input); - return status ? status : rv; + b = apr_bucket_eos_create(c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(tmp, b); + status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); + apr_brigade_destroy(tmp); + return status; } apr_status_t h2_stream_write_data(h2_stream *stream, @@ -483,7 +483,7 @@ apr_status_t h2_stream_write_data(h2_stream *stream, apr_status_t status = APR_SUCCESS; apr_bucket_brigade *tmp; - AP_DEBUG_ASSERT(stream); + ap_assert(stream); if (!stream->input) { return APR_EOF; } diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index ecb53ba56d..57fdbba04c 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -65,6 +65,7 @@ struct h2_stream { unsigned int started : 1; /* stream has started processing */ unsigned int has_response : 1; /* response headers are known */ unsigned int push_policy; /* which push policy to use for this request */ + unsigned int can_be_cleaned : 1; /* stream pool can be cleaned */ apr_off_t out_data_frames; /* # of DATA frames sent */ apr_off_t out_data_octets; /* # of DATA octets (payload) sent */ @@ -98,7 +99,7 @@ void h2_stream_eos_destroy(h2_stream *stream); void h2_stream_destroy(h2_stream *stream); /** - * Removes stream from h2_session and destroys it. + * Cleanup references into requst processing. * * @param stream the stream to cleanup */ diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index ee4b7bc69d..3f70b3aa21 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -546,7 +546,7 @@ void h2_task_destroy(h2_task *task) apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id) { - AP_DEBUG_ASSERT(task); + ap_assert(task); if (task->c->master) { /* Each conn_rec->id is supposed to be unique at a point in time. Since @@ -580,7 +580,6 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id) task->c->id = (task->c->master->id << free_bits)^slave_id; } - task->input.chunked = task->request->chunked; task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc); if (task->request->serialize) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index fe3dbf66a6..ad8f056596 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -60,7 +60,6 @@ struct h2_task { struct { struct h2_bucket_beam *beam; - unsigned int chunked : 1; unsigned int eos : 1; apr_bucket_brigade *bb; apr_bucket_brigade *bbchunk; diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 095ba97328..81b94566c5 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -1130,11 +1130,11 @@ h2_ngheader *h2_util_ngheader_make_req(apr_pool_t *p, h2_ngheader *ngh; size_t n; - AP_DEBUG_ASSERT(req); - AP_DEBUG_ASSERT(req->scheme); - AP_DEBUG_ASSERT(req->authority); - AP_DEBUG_ASSERT(req->path); - AP_DEBUG_ASSERT(req->method); + ap_assert(req); + ap_assert(req->scheme); + ap_assert(req->authority); + ap_assert(req->path); + ap_assert(req->method); n = 4; apr_table_do(count_header, &n, req->headers, NULL); diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index 571cbc6801..58fcf8d97c 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.7.6" +#define MOD_HTTP2_VERSION "1.7.7" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010706 +#define MOD_HTTP2_VERSION_NUM 0x010707 #endif /* mod_h2_h2_version_h */ diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index 2a1599914c..1dcfb2fcd7 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -243,8 +243,8 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, h2_workers *workers; apr_pool_t *pool; - AP_DEBUG_ASSERT(s); - AP_DEBUG_ASSERT(server_pool); + ap_assert(s); + ap_assert(server_pool); /* let's have our own pool that will be parent to all h2_worker * instances we create. This happens in various threads, but always -- 2.40.0