From: Stefan Eissing Date: Thu, 27 Oct 2016 16:12:20 +0000 (+0000) Subject: mod_http2: connection shutdown revisited X-Git-Tag: 2.5.0-alpha~1063 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=6283a78c2aa083a3093f0f46d59e7b6c25452b38;p=apache mod_http2: connection shutdown revisited git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1766851 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/CHANGES b/CHANGES index 63fe4d4c8c..3323a7e8f6 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,10 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.0 + *) mod_http2: connection shutdown revisited: corrected edge cases on + shutting down ongoing streams, changed log warnings to be less noisy + when waiting on long running tasks. [Stefan Eissing] + *) mod_http2: changed all AP_DEBUG_ASSERT to ap_assert to have them available also in normal deployments. [Stefan Eissing] diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 705857b3a5..0cff7b60fa 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -221,7 +221,6 @@ static void purge_streams(h2_mplx *m) /* repeat until empty */ } h2_ihash_clear(m->spurge); - ap_assert(h2_ihash_empty(m->spurge)); } } @@ -507,7 +506,7 @@ static int task_print(void *ctx, void *val) if (task) { h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */ "->03198: h2_stream(%s): %s %s %s" "[orph=%d/started=%d/done=%d/frozen=%d]", task->id, task->request->method, @@ -516,11 +515,11 @@ static int task_print(void *ctx, void *val) task->worker_done, task->frozen); } else if (task) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */ "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id); } else { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ + 
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */ "->03198: h2_stream(%ld-NULL): NULL", m->id); } return 1; @@ -529,14 +528,16 @@ static int task_print(void *ctx, void *val) static int task_abort_connection(void *ctx, void *val) { h2_task *task = val; - if (task->c) { - task->c->aborted = 1; - } - if (task->input.beam) { - h2_beam_abort(task->input.beam); - } - if (task->output.beam) { - h2_beam_abort(task->output.beam); + if (!task->worker_done) { + if (task->c) { + task->c->aborted = 1; + } + if (task->input.beam) { + h2_beam_abort(task->input.beam); + } + if (task->output.beam) { + h2_beam_abort(task->output.beam); + } } return 1; } @@ -545,124 +546,97 @@ static int report_stream_iter(void *ctx, void *val) { h2_mplx *m = ctx; h2_stream *stream = val; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, " - "ready=%d", + "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, ready=%d", m->id, stream->id, stream->started, stream->scheduled, h2_stream_is_ready(stream)); return 1; } +static int task_done_iter(void *ctx, void *val); + apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) { apr_status_t status; int acquired; + /* How to shut down a h2 connection: + * 1. tell the workers that no more tasks will come from us */ h2_workers_unregister(m->workers, m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - int i, wait_secs = 5; + int i, wait_secs = 60; - if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): release_join with %d streams open, " - "%d streams ready, %d tasks", - m->id, (int)h2_ihash_count(m->streams), - (int)h2_ihash_count(m->sready), - (int)h2_ihash_count(m->tasks)); - h2_ihash_iter(m->streams, report_stream_iter, m); - } - - /* disable WINDOW_UPDATE callbacks */ + /* 2. 
disable WINDOW_UPDATEs and set the mplx to aborted, clear + * our TODO list and purge any streams we have collected */ h2_mplx_set_consumed_cb(m, NULL, NULL); - - if (!h2_ihash_empty(m->shold)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): start release_join with %d streams in hold", - m->id, (int)h2_ihash_count(m->shold)); - } - if (!h2_ihash_empty(m->spurge)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): start release_join with %d streams to purge", - m->id, (int)h2_ihash_count(m->spurge)); - } - + h2_mplx_abort(m); h2_iq_clear(m->q); + purge_streams(m); + + /* 3. mark all slave connections as aborted and wakeup all sleeping + * tasks. Mark all still active streams as 'done'. m->streams has to + * be empty afterwards with streams either in + * a) m->shold because a task is still active + * b) m->spurge because task is done, or was not started */ + h2_ihash_iter(m->tasks, task_abort_connection, m); apr_thread_cond_broadcast(m->task_thawed); while (!h2_ihash_iter(m->streams, stream_done_iter, m)) { /* iterate until all streams have been removed */ } ap_assert(h2_ihash_empty(m->streams)); - - if (!h2_ihash_empty(m->shold)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): 2. release_join with %d streams in " - "hold, %d workers busy, %d tasks", - m->id, (int)h2_ihash_count(m->shold), - m->workers_busy, - (int)h2_ihash_count(m->tasks)); - } - if (!h2_ihash_empty(m->spurge)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): 2. release_join with %d streams to purge", - m->id, (int)h2_ihash_count(m->spurge)); - } + + /* 4. purge all streams we collected by marking them 'done' */ + purge_streams(m); - /* If we still have busy workers, we cannot release our memory - * pool yet, as tasks have references to us. 
- * Any operation on the task slave connection will from now on - * be errored ECONNRESET/ABORTED, so processing them should fail - * and workers *should* return in a timely fashion. - */ + /* 5. while workers are busy on this connection, meaning they + * are processing tasks from this connection, wait on them finishing + * to wake us and check again. Eventually, this has to succeed. */ + m->join_wait = wait; for (i = 0; m->workers_busy > 0; ++i) { - h2_ihash_iter(m->tasks, task_abort_connection, m); - - m->join_wait = wait; status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs)); if (APR_STATUS_IS_TIMEUP(status)) { - if (i > 0) { - /* Oh, oh. Still we wait for assigned workers to report that - * they are done. Unless we have a bug, a worker seems to be hanging. - * If we exit now, all will be deallocated and the worker, once - * it does return, will walk all over freed memory... - */ - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198) - "h2_mplx(%ld): release, waiting for %d seconds now for " - "%d h2_workers to return, have still %d tasks outstanding", - m->id, i*wait_secs, m->workers_busy, - (int)h2_ihash_count(m->tasks)); - if (i == 1) { - h2_ihash_iter(m->tasks, task_print, m); - } - } - h2_mplx_abort(m); - apr_thread_cond_broadcast(m->task_thawed); + /* This can happen if we have very long running requests + * that do not time out on IO. */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198) + "h2_mplx(%ld): release, waiting for %d seconds now for " + "%d h2_workers to return, have still %d tasks outstanding", + m->id, i*wait_secs, m->workers_busy, + (int)h2_ihash_count(m->tasks)); + h2_ihash_iter(m->shold, report_stream_iter, m); + h2_ihash_iter(m->tasks, task_print, m); } + purge_streams(m); } + m->join_wait = NULL; - if (!h2_ihash_empty(m->tasks) && APLOGctrace1(m->c)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + /* 6. 
All workers for this connection are done, we are in + * single-threaded processing now effectively. */ + leave_mutex(m, acquired); + + if (!h2_ihash_empty(m->tasks)) { + /* when we are here, we lost track of the tasks still present. + * this currently happens with mod_proxy_http2 when we shut + * down a h2_req_engine with tasks assigned... */ + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056) "h2_mplx(%ld): 3. release_join with %d tasks", m->id, (int)h2_ihash_count(m->tasks)); h2_ihash_iter(m->tasks, task_print, m); + + while (!h2_ihash_iter(m->tasks, task_done_iter, m)) { + /* iterate until all tasks have been removed */ + } } + + /* 7. With all tasks done, the stream hold should be empty and all + * remaining streams are ready for purging */ ap_assert(h2_ihash_empty(m->shold)); - if (!h2_ihash_empty(m->spurge)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): 3. release_join %d streams to purge", - m->id, (int)h2_ihash_count(m->spurge)); - purge_streams(m); - } + purge_streams(m); - if (!h2_ihash_empty(m->tasks)) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) - "h2_mplx(%ld): release_join -> destroy, " - "%d tasks still present", - m->id, (int)h2_ihash_count(m->tasks)); - } - leave_mutex(m, acquired); + /* 8. 
close the h2_req_engine shed and self destruct */ + h2_ngn_shed_destroy(m->ngn_shed); + m->ngn_shed = NULL; h2_mplx_destroy(m); - /* all gone */ } return status; } @@ -1073,8 +1047,8 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) stream = h2_ihash_get(m->shold, task->stream_id); if (stream) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%s): task_done, stream in hold", - task->id); + "h2_mplx(%s): task_done, stream %d in hold", + task->id, stream->id); /* We cannot destroy the stream here since this is * called from a worker thread and freeing memory pools * is only safe in the only thread using it (and its @@ -1088,14 +1062,16 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) task->id); task_destroy(m, task, 0); } - - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } } } } +static int task_done_iter(void *ctx, void *val) +{ + task_done((h2_mplx*)ctx, val, 0); + return 0; +} + void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) { int acquired; @@ -1103,6 +1079,9 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) if (enter_mutex(m, &acquired) == APR_SUCCESS) { task_done(m, task, NULL); --m->workers_busy; + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); + } if (ptask) { /* caller wants another task */ *ptask = next_stream_task(m); diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c index 2b132f0a13..2f5b729617 100644 --- a/modules/http2/h2_ngn_shed.c +++ b/modules/http2/h2_ngn_shed.c @@ -352,6 +352,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn) "frozen=%d, aborting", shed->c->id, ngn->id, task->id, task->frozen); ngn_done_task(shed, ngn, task, 0, 1); + task->engine = task->assigned = NULL; } } if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) { @@ -371,3 +372,9 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn) apr_hash_set(shed->ngns, ngn->type, 
APR_HASH_KEY_STRING, NULL); ngn->done = 1; } + +void h2_ngn_shed_destroy(h2_ngn_shed *shed) +{ + ap_assert(apr_hash_count(shed->ngns) == 0); +} + diff --git a/modules/http2/h2_ngn_shed.h b/modules/http2/h2_ngn_shed.h index bcafc509b1..c6acbae253 100644 --- a/modules/http2/h2_ngn_shed.h +++ b/modules/http2/h2_ngn_shed.h @@ -51,6 +51,8 @@ h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c, int default_capactiy, apr_size_t req_buffer_size); +void h2_ngn_shed_destroy(h2_ngn_shed *shed); + void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx); void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed);