]> granicus.if.org Git - apache/commitdiff
mod_http2: connection shutdown revisited
authorStefan Eissing <icing@apache.org>
Thu, 27 Oct 2016 16:12:20 +0000 (16:12 +0000)
committerStefan Eissing <icing@apache.org>
Thu, 27 Oct 2016 16:12:20 +0000 (16:12 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1766851 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
modules/http2/h2_mplx.c
modules/http2/h2_ngn_shed.c
modules/http2/h2_ngn_shed.h

diff --git a/CHANGES b/CHANGES
index 63fe4d4c8ca05268af0e011f3c2a1ce1e64a9d80..3323a7e8f691d8c63879f9f969ccab172d3a94ed 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,10 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: connection shutdown revisited: corrected edge cases on
+     shutting down ongoing streams, changed log warnings to be less noisy
+     when waiting on long running tasks. [Stefan Eissing]
+     
   *) mod_http2: changed all AP_DEBUG_ASSERT to ap_assert to have them 
      available also in normal deployments. [Stefan Eissing]
      
index 705857b3a53936563d6c51c78a483389a3afb90c..0cff7b60fa1613153be833273d20c92bd82271de 100644 (file)
@@ -221,7 +221,6 @@ static void purge_streams(h2_mplx *m)
             /* repeat until empty */
         }
         h2_ihash_clear(m->spurge);
-        ap_assert(h2_ihash_empty(m->spurge));
     }
 }
 
@@ -507,7 +506,7 @@ static int task_print(void *ctx, void *val)
     if (task) {
         h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
 
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
                       "->03198: h2_stream(%s): %s %s %s"
                       "[orph=%d/started=%d/done=%d/frozen=%d]", 
                       task->id, task->request->method, 
@@ -516,11 +515,11 @@ static int task_print(void *ctx, void *val)
                       task->worker_done, task->frozen);
     }
     else if (task) {
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
                       "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id);
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
                       "->03198: h2_stream(%ld-NULL): NULL", m->id);
     }
     return 1;
@@ -529,14 +528,16 @@ static int task_print(void *ctx, void *val)
 static int task_abort_connection(void *ctx, void *val)
 {
     h2_task *task = val;
-    if (task->c) {
-        task->c->aborted = 1;
-    }
-    if (task->input.beam) {
-        h2_beam_abort(task->input.beam);
-    }
-    if (task->output.beam) {
-        h2_beam_abort(task->output.beam);
+    if (!task->worker_done) { 
+        if (task->c) {
+            task->c->aborted = 1;
+        }
+        if (task->input.beam) {
+            h2_beam_abort(task->input.beam);
+        }
+        if (task->output.beam) {
+            h2_beam_abort(task->output.beam);
+        }
     }
     return 1;
 }
@@ -545,124 +546,97 @@ static int report_stream_iter(void *ctx, void *val) {
     h2_mplx *m = ctx;
     h2_stream *stream = val;
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                  "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
-                  "ready=%d", 
+                  "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, ready=%d", 
                   m->id, stream->id, stream->started, stream->scheduled,
                   h2_stream_is_ready(stream));
     return 1;
 }
 
+static int task_done_iter(void *ctx, void *val);
+
 apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 {
     apr_status_t status;
     int acquired;
 
+    /* How to shut down a h2 connection:
+     * 1. tell the workers that no more tasks will come from us */
     h2_workers_unregister(m->workers, m);
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        int i, wait_secs = 5;
+        int i, wait_secs = 60;
 
-        if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): release_join with %d streams open, "
-                          "%d streams ready, %d tasks", 
-                          m->id, (int)h2_ihash_count(m->streams),
-                          (int)h2_ihash_count(m->sready), 
-                          (int)h2_ihash_count(m->tasks));
-            h2_ihash_iter(m->streams, report_stream_iter, m);
-        }
-        
-        /* disable WINDOW_UPDATE callbacks */
+        /* 2. disable WINDOW_UPDATEs and set the mplx to aborted, clear
+         *    our TODO list and purge any streams we have collected */
         h2_mplx_set_consumed_cb(m, NULL, NULL);
-        
-        if (!h2_ihash_empty(m->shold)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): start release_join with %d streams in hold", 
-                          m->id, (int)h2_ihash_count(m->shold));
-        }
-        if (!h2_ihash_empty(m->spurge)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): start release_join with %d streams to purge", 
-                          m->id, (int)h2_ihash_count(m->spurge));
-        }
-        
+        h2_mplx_abort(m);
         h2_iq_clear(m->q);
+        purge_streams(m);
+
+        /* 3. mark all slave connections as aborted and wakeup all sleeping 
+         *    tasks. Mark all still active streams as 'done'. m->streams has to
+         *    be empty afterwards with streams either in
+         *    a) m->shold because a task is still active
+         *    b) m->spurge because task is done, or was not started */
+        h2_ihash_iter(m->tasks, task_abort_connection, m);
         apr_thread_cond_broadcast(m->task_thawed);
         while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
             /* iterate until all streams have been removed */
         }
         ap_assert(h2_ihash_empty(m->streams));
-    
-        if (!h2_ihash_empty(m->shold)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): 2. release_join with %d streams in "
-                          "hold, %d workers busy, %d tasks", 
-                          m->id, (int)h2_ihash_count(m->shold),
-                          m->workers_busy,  
-                          (int)h2_ihash_count(m->tasks));
-        }
-        if (!h2_ihash_empty(m->spurge)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): 2. release_join with %d streams to purge", 
-                          m->id, (int)h2_ihash_count(m->spurge));
-        }
+
+        /* 4. purge all streams we collected by marking them 'done' */
+        purge_streams(m);
         
-        /* If we still have busy workers, we cannot release our memory
-         * pool yet, as tasks have references to us.
-         * Any operation on the task slave connection will from now on
-         * be errored ECONNRESET/ABORTED, so processing them should fail 
-         * and workers *should* return in a timely fashion.
-         */
+        /* 5. while workers are busy on this connection, meaning they
+         *    are processing tasks from this connection, wait on them finishing
+         *    to wake us and check again. Eventually, this has to succeed. */    
+        m->join_wait = wait;
         for (i = 0; m->workers_busy > 0; ++i) {
-            h2_ihash_iter(m->tasks, task_abort_connection, m);
-            
-            m->join_wait = wait;
             status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
             
             if (APR_STATUS_IS_TIMEUP(status)) {
-                if (i > 0) {
-                    /* Oh, oh. Still we wait for assigned  workers to report that 
-                     * they are done. Unless we have a bug, a worker seems to be hanging. 
-                     * If we exit now, all will be deallocated and the worker, once 
-                     * it does return, will walk all over freed memory...
-                     */
-                    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
-                                  "h2_mplx(%ld): release, waiting for %d seconds now for "
-                                  "%d h2_workers to return, have still %d tasks outstanding", 
-                                  m->id, i*wait_secs, m->workers_busy,
-                                  (int)h2_ihash_count(m->tasks));
-                    if (i == 1) {
-                        h2_ihash_iter(m->tasks, task_print, m);
-                    }
-                }
-                h2_mplx_abort(m);
-                apr_thread_cond_broadcast(m->task_thawed);
+                /* This can happen if we have very long running requests
+                 * that do not time out on IO. */
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198)
+                              "h2_mplx(%ld): release, waiting for %d seconds now for "
+                              "%d h2_workers to return, have still %d tasks outstanding", 
+                              m->id, i*wait_secs, m->workers_busy,
+                              (int)h2_ihash_count(m->tasks));
+                h2_ihash_iter(m->shold, report_stream_iter, m);
+                h2_ihash_iter(m->tasks, task_print, m);
             }
+            purge_streams(m);
         }
+        m->join_wait = NULL;
         
-        if (!h2_ihash_empty(m->tasks) && APLOGctrace1(m->c)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+        /* 6. All workers for this connection are done, we are in 
+         * single-threaded processing now effectively. */
+        leave_mutex(m, acquired);
+
+        if (!h2_ihash_empty(m->tasks)) {
+            /* when we are here, we lost track of the tasks still present.
+             * this currently happens with mod_proxy_http2 when we shut
+             * down a h2_req_engine with tasks assigned... */ 
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,  APLOGNO(03056)
                           "h2_mplx(%ld): 3. release_join with %d tasks",
                           m->id, (int)h2_ihash_count(m->tasks));
             h2_ihash_iter(m->tasks, task_print, m);
+            
+            while (!h2_ihash_iter(m->tasks, task_done_iter, m)) {
+                /* iterate until all tasks have been removed */
+            }
         }
+
+        /* 7. With all tasks done, the stream hold should be empty and all
+         *    remaining streams are ready for purging */
         ap_assert(h2_ihash_empty(m->shold));
-        if (!h2_ihash_empty(m->spurge)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): 3. release_join %d streams to purge", 
-                          m->id, (int)h2_ihash_count(m->spurge));
-            purge_streams(m);
-        }
+        purge_streams(m);
         
-        if (!h2_ihash_empty(m->tasks)) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
-                          "h2_mplx(%ld): release_join -> destroy, "
-                          "%d tasks still present", 
-                          m->id, (int)h2_ihash_count(m->tasks));
-        }
-        leave_mutex(m, acquired);
+        /* 8. close the h2_req_enginge shed and self destruct */
+        h2_ngn_shed_destroy(m->ngn_shed);
+        m->ngn_shed = NULL;
         h2_mplx_destroy(m);
-        /* all gone */
     }
     return status;
 }
@@ -1073,8 +1047,8 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
             stream = h2_ihash_get(m->shold, task->stream_id);
             if (stream) {
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                              "h2_mplx(%s): task_done, stream in hold", 
-                              task->id);
+                              "h2_mplx(%s): task_done, stream %d in hold", 
+                              task->id, stream->id);
                 /* We cannot destroy the stream here since this is 
                  * called from a worker thread and freeing memory pools
                  * is only safe in the only thread using it (and its
@@ -1088,14 +1062,16 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
                               task->id);
                 task_destroy(m, task, 0);
             }
-            
-            if (m->join_wait) {
-                apr_thread_cond_signal(m->join_wait);
-            }
         }
     }
 }
 
+static int task_done_iter(void *ctx, void *val)
+{
+    task_done((h2_mplx*)ctx, val, 0);
+    return 0;
+}
+
 void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
 {
     int acquired;
@@ -1103,6 +1079,9 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
         task_done(m, task, NULL);
         --m->workers_busy;
+        if (m->join_wait) {
+            apr_thread_cond_signal(m->join_wait);
+        }
         if (ptask) {
             /* caller wants another task */
             *ptask = next_stream_task(m);
index 2b132f0a13f6a43aadf05f6ebe67477e81fd7040..2f5b7296177214f3129ec7441a457b6331144e34 100644 (file)
@@ -352,6 +352,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
                           "frozen=%d, aborting",
                           shed->c->id, ngn->id, task->id, task->frozen);
             ngn_done_task(shed, ngn, task, 0, 1);
+            task->engine = task->assigned = NULL;
         }
     }
     if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
@@ -371,3 +372,9 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
     apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, NULL);
     ngn->done = 1;
 }
+
+void h2_ngn_shed_destroy(h2_ngn_shed *shed)
+{
+    ap_assert(apr_hash_count(shed->ngns) == 0);
+}
+
index bcafc509b18a9ee65c1fb8b0edb6776be74d68aa..c6acbae253cb712ec335ecd230434be0d96b3e75 100644 (file)
@@ -51,6 +51,8 @@ h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
                                 int default_capactiy, 
                                 apr_size_t req_buffer_size); 
 
+void h2_ngn_shed_destroy(h2_ngn_shed *shed);
+
 void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx);
 void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed);