]> granicus.if.org Git - apache/commitdiff
reworked synching of session shutdown with worker threads, workers now stick to a...
authorStefan Eissing <icing@apache.org>
Mon, 4 Jan 2016 15:30:36 +0000 (15:30 +0000)
committerStefan Eissing <icing@apache.org>
Mon, 4 Jan 2016 15:30:36 +0000 (15:30 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1722899 13f79535-47bb-0310-9956-ffa450edef68

15 files changed:
CHANGES
modules/http2/h2_config.c
modules/http2/h2_conn.c
modules/http2/h2_filter.c
modules/http2/h2_from_h1.c
modules/http2/h2_io.h
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_session.c
modules/http2/h2_session.h
modules/http2/h2_task_queue.c
modules/http2/h2_task_queue.h
modules/http2/h2_version.h
modules/http2/h2_worker.c
modules/http2/h2_workers.c

diff --git a/CHANGES b/CHANGES
index af8a8bd16409a6b118b816f2901e97a96584a985..f81104bb4af30a2527f6cc636c77aea24e787e0f 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,11 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: reworked synching of session shutdown with worker threads. h2 
+     workers now stick to a session until no more reuqquest are tbd, keepalive 
+     handling revisited, users report problems with connection close without
+     GOAWAY frames. [Stefan Eissing]
+  
   *) mod_http2: Fixed several errors when connections are closed in the middle
      of requests, changed H2KeepAliveTimeout defaults to be the same as H2Timeout
      for synchronous MPMs and leave keepalive timeout to async MPMs default.
index 1ddb26570e2b2abdc8ebe542fc87430295222c15..1458f5a4759fc40cc9f59ad4aa28ee3a27ec71b5 100644 (file)
@@ -59,7 +59,7 @@ static h2_config defconf = {
     1,                      /* TLS cooldown secs */
     1,                      /* HTTP/2 server push enabled */
     NULL,                   /* map of content-type to priorities */
-    5,                      /* normal connection timeout */
+    -1,                     /* connection timeout */
     -1,                     /* keepalive timeout */
     0,                      /* stream timeout */
 };
index 29baa6d90e6b158455fe596872b352302030ff1e..d0b5ada215fbda666f08010a06cedd9386f88e27 100644 (file)
@@ -210,19 +210,17 @@ static apr_status_t h2_conn_process(h2_ctx *ctx)
 apr_status_t h2_conn_run(struct h2_ctx *ctx, conn_rec *c)
 {
     int mpm_state = 0;
-    apr_status_t status;
     do {
-        status = h2_conn_process(ctx);
+        h2_conn_process(ctx);
         
         if (ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state)) {
             break;
         }
     } while (!async_mpm
-             && status == APR_SUCCESS
              && c->keepalive == AP_CONN_KEEPALIVE 
              && mpm_state != AP_MPMQ_STOPPING);
     
-    return status;
+    return DONE;
 }
 
 
index 1155ac4dad42fd3e4bd5cfef40bb55ff24c4a229..5ad3aed5d70a55d0bc42577ddcb86604f172a82e 100644 (file)
@@ -130,7 +130,7 @@ apr_status_t h2_filter_core_input(ap_filter_t* f,
             if (cin->timeout_secs > 0) {
                 apr_time_t t = apr_time_from_sec(cin->timeout_secs);
                 apr_socket_timeout_get(cin->socket, &saved_timeout);
-                apr_socket_timeout_set(cin->socket, H2MIN(t, saved_timeout));
+                apr_socket_timeout_set(cin->socket, t);
             }
         }
         ap_update_child_status_from_conn(f->c->sbh, SERVER_BUSY_READ, f->c);
index d6170c243ba717335f2756aa05836b3d775796b7..3e372c2518ce104751bb587b03d198b4c02ab34f 100644 (file)
@@ -258,7 +258,7 @@ static int uniq_field_values(void *d, const char *key, const char *val)
          */
         for (i = 0, strpp = (char **) values->elts; i < values->nelts;
              ++i, ++strpp) {
-            if (*strpp && ap_casecmpstr(*strpp, start) == 0) {
+            if (*strpp && apr_strnatcasecmp(*strpp, start) == 0) {
                 break;
             }
         }
@@ -408,7 +408,7 @@ static h2_response *create_response(h2_from_h1 *from_h1, request_rec *r)
         
         while (field && (token = ap_get_list_item(r->pool, &field)) != NULL) {
             for (i = 0; i < r->content_languages->nelts; ++i) {
-                if (!ap_casecmpstr(token, languages[i]))
+                if (!apr_strnatcasecmp(token, languages[i]))
                     break;
             }
             if (i == r->content_languages->nelts) {
index 22c71c21b1ab49d8fe545a6c419d4469b03710e5..647d30431e44b424e7e6e800188f5a7bea4cd948 100644 (file)
@@ -49,7 +49,8 @@ struct h2_io {
     apr_bucket_brigade *tmp;         /* temporary data for chunking */
 
     unsigned int orphaned       : 1; /* h2_stream is gone for this io */    
-    unsigned int task_done      : 1; /* h2_task has finished for this io */
+    unsigned int worker_started : 1; /* h2_worker started processing for this io */
+    unsigned int worker_done    : 1; /* h2_worker finished for this io */
     unsigned int request_body   : 1; /* iff request has body */
     unsigned int eos_in         : 1; /* input eos has been seen */
     unsigned int eos_in_written : 1; /* input eos has been forwarded */
index eba7cc900a8f35e66cb314d192d0a5b166e1670f..014f3ae37d9ea09d820c91cd20c99ea66d32862d 100644 (file)
@@ -15,6 +15,7 @@
 
 #include <assert.h>
 #include <stddef.h>
+#include <stdlib.h>
 
 #include <apr_atomic.h>
 #include <apr_thread_mutex.h>
@@ -75,8 +76,8 @@ static void h2_mplx_destroy(h2_mplx *m)
 {
     AP_DEBUG_ASSERT(m);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                  "h2_mplx(%ld): destroy, refs=%d", 
-                  m->id, m->refs);
+                  "h2_mplx(%ld): destroy, ios=%d", 
+                  m->id, (int)h2_io_set_size(m->stream_ios));
     m->aborted = 1;
     if (m->ready_ios) {
         h2_io_set_destroy(m->ready_ios);
@@ -121,7 +122,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
     if (m) {
         m->id = c->id;
         APR_RING_ELEM_INIT(m, link);
-        m->refs = 1;
         m->c = c;
         apr_pool_create_ex(&m->pool, parent, NULL, allocator);
         if (!m->pool) {
@@ -148,33 +148,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
     return m;
 }
 
-static void release(h2_mplx *m, int lock)
-{
-    if (lock) {
-        apr_thread_mutex_lock(m->lock);
-        --m->refs;
-        if (m->join_wait) {
-            apr_thread_cond_signal(m->join_wait);
-        }
-        apr_thread_mutex_unlock(m->lock);
-    }
-    else {
-        --m->refs;
-    }
-}
-
-void h2_mplx_reference(h2_mplx *m)
-{
-    apr_thread_mutex_lock(m->lock);
-    ++m->refs;
-    apr_thread_mutex_unlock(m->lock);
-}
-
-void h2_mplx_release(h2_mplx *m)
-{
-    release(m, 1);
-}
-
 static void workers_register(h2_mplx *m)
 {
     /* Initially, there was ref count increase for this as well, but
@@ -240,8 +213,9 @@ static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error)
 {
     /* Remove io from ready set, we will never submit it */
     h2_io_set_remove(m->ready_ios, io);
-    if (io->task_done || h2_tq_remove(m->q, io->id)) {
+    if (!io->worker_started || io->worker_done) {
         /* already finished or not even started yet */
+        h2_tq_remove(m->q, io->id);
         io_destroy(m, io, 1);
         return 0;
     }
@@ -264,32 +238,42 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
     workers_unregister(m);
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        int i;
+        int i, wait_secs = 5;
         
         /* disable WINDOW_UPDATE callbacks */
         h2_mplx_set_consumed_cb(m, NULL, NULL);
+        
         while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
-            /* iterator until all h2_io have been orphaned or destroyed */
+            /* iterate until all ios have been orphaned or destroyed */
         }
     
-        release(m, 0);
-        for (i = 0; m->refs > 0; ++i) {
-            
+        /* Any remaining ios have handed out requests to workers that are
+         * not done yet. Any operation they do on their assigned stream ios will
+         * be errored ECONNRESET/ABORTED, so that should find out pretty soon.
+         */
+        for (i = 0; h2_io_set_size(m->stream_ios) > 0; ++i) {
             m->join_wait = wait;
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): release_join, refs=%d, waiting...", 
-                          m->id, m->refs);
+                          "h2_mplx(%ld): release_join, waiting on %d worker to report back", 
+                          m->id, (int)h2_io_set_size(m->stream_ios));
                           
-            status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(2));
+            status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
             if (APR_STATUS_IS_TIMEUP(status)) {
-                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c,
-                              "h2_mplx(%ld): release timeup %d, refs=%d, waiting...", 
-                              m->id, i, m->refs);
+                if (i > 0) {
+                    /* Oh, oh. Still we wait for assigned  workers to report that 
+                     * they are done. Unless we have a bug, a worker seems to be hanging. 
+                     * If we exit now, all will be deallocated and the worker, once 
+                     * it does return, will walk all over freed memory...
+                     */
+                    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                                  "h2_mplx(%ld): release, waiting for %d seconds now for "
+                                  "all h2_workers to return, have still %d requests outstanding", 
+                                  m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios));
+                }
             }
         }
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
-                      "h2_mplx(%ld): release_join -> destroy, (#ios=%ld)", 
-                      m->id, (long)h2_io_set_size(m->stream_ios));
+                      "h2_mplx(%ld): release_join -> destroy", m->id);
         apr_thread_mutex_unlock(m->lock);
         h2_mplx_destroy(m);
         /* all gone */
@@ -329,22 +313,51 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
     return status;
 }
 
-void h2_mplx_task_done(h2_mplx *m, int stream_id)
+static const h2_request *pop_request(h2_mplx *m)
 {
+    const h2_request *req = NULL;
+    int sid;
+    while (!req && (sid = h2_tq_shift(m->q)) > 0) {
+        h2_io *io = h2_io_set_get(m->stream_ios, sid);
+        if (io) {
+            req = io->request;
+            io->worker_started = 1;
+        }
+    }
+    return req;
+}
+
+void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq)
+{
+    h2_mplx *m = *pm;
+    
     apr_status_t status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                      "h2_mplx(%ld): task(%d) done", m->id, stream_id);
+                      "h2_mplx(%ld): request(%d) done", m->id, stream_id);
         if (io) {
-            io->task_done = 1;
+            io->worker_done = 1;
             if (io->orphaned) {
                 io_destroy(m, io, 0);
+                if (m->join_wait) {
+                    apr_thread_cond_signal(m->join_wait);
+                }
             }
             else {
                 /* hang around until the stream deregisteres */
             }
         }
+        
+        if (preq) {
+            /* someone wants another request, if we have */
+            *preq = pop_request(m);
+        }
+        if (!preq || !*preq) {
+            /* No request to hand back to the worker, NULLify reference
+             * and decrement count */
+            *pm = NULL;
+        }
         apr_thread_mutex_unlock(m->lock);
     }
 }
@@ -356,9 +369,6 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
 {
     apr_status_t status; 
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
@@ -394,9 +404,6 @@ apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
@@ -408,7 +415,7 @@ apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
             io_process_events(m, io);
         }
         else {
-            status = APR_EOF;
+            status = APR_ECONNABORTED;
         }
         apr_thread_mutex_unlock(m->lock);
     }
@@ -419,9 +426,6 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
@@ -491,9 +495,6 @@ apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id,
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
@@ -523,9 +524,6 @@ apr_status_t h2_mplx_out_read_to(h2_mplx *m, int stream_id,
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
@@ -552,14 +550,12 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
 {
     apr_status_t status;
     h2_stream *stream = NULL;
+
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return NULL;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_pop_highest_prio(m->ready_ios);
-        if (io) {
+        if (io && !m->aborted) {
             stream = h2_stream_set_get(streams, io->id);
             if (stream) {
                 if (io->rst_error) {
@@ -571,7 +567,6 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
                     h2_stream_set_response(stream, io->response, io->bbout);
                     H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_post");
                 }
-                
             }
             else {
                 /* We have the io ready, but the stream has gone away, maybe
@@ -583,7 +578,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
                               "resetting io to close request processing",
                               m->id, io->id);
                 h2_io_make_orphaned(io, H2_ERR_STREAM_CLOSED);
-                if (io->task_done) {
+                if (!io->worker_started || io->worker_done) {
                     io_destroy(m, io, 1);
                 }
                 else {
@@ -676,17 +671,16 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        status = out_open(m, stream_id, response, f, bb, iowait);
-        if (APLOGctrace1(m->c)) {
-            h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
-        }
         if (m->aborted) {
-            return APR_ECONNABORTED;
+            status = APR_ECONNABORTED;
+        }
+        else {
+            status = out_open(m, stream_id, response, f, bb, iowait);
+            if (APLOGctrace1(m->c)) {
+                h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
+            }
         }
         apr_thread_mutex_unlock(m->lock);
     }
@@ -700,33 +694,25 @@ apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id,
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        if (!m->aborted) {
-            h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-            if (io && !io->orphaned) {
-                status = out_write(m, io, f, bb, trailers, iowait);
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                              "h2_mplx(%ld-%d): write with trailers=%s", 
-                              m->id, io->id, trailers? "yes" : "no");
-                H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
-                
-                have_out_data_for(m, stream_id);
-                if (m->aborted) {
-                    return APR_ECONNABORTED;
-                }
-            }
-            else {
-                status = APR_ECONNABORTED;
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (io && !io->orphaned) {
+            status = out_write(m, io, f, bb, trailers, iowait);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                          "h2_mplx(%ld-%d): write with trailers=%s", 
+                          m->id, io->id, trailers? "yes" : "no");
+            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
+            
+            have_out_data_for(m, stream_id);
+            if (m->aborted) {
+                return APR_ECONNABORTED;
             }
         }
-        
-        if (m->lock) {
-            apr_thread_mutex_unlock(m->lock);
+        else {
+            status = APR_ECONNABORTED;
         }
+        apr_thread_mutex_unlock(m->lock);
     }
     return status;
 }
@@ -735,44 +721,39 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id, apr_table_t *trailers)
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        if (!m->aborted) {
-            h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-            if (io && !io->orphaned) {
-                if (!io->response && !io->rst_error) {
-                    /* In case a close comes before a response was created,
-                     * insert an error one so that our streams can properly
-                     * reset.
-                     */
-                    h2_response *r = h2_response_die(stream_id, APR_EGENERAL, 
-                                                     io->request, m->pool);
-                    status = out_open(m, stream_id, r, NULL, NULL, NULL);
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                                  "h2_mplx(%ld-%d): close, no response, no rst", 
-                                  m->id, io->id);
-                }
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (io && !io->orphaned) {
+            if (!io->response && !io->rst_error) {
+                /* In case a close comes before a response was created,
+                 * insert an error one so that our streams can properly
+                 * reset.
+                 */
+                h2_response *r = h2_response_die(stream_id, APR_EGENERAL, 
+                                                 io->request, m->pool);
+                status = out_open(m, stream_id, r, NULL, NULL, NULL);
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                              "h2_mplx(%ld-%d): close with trailers=%s", 
-                              m->id, io->id, trailers? "yes" : "no");
-                status = h2_io_out_close(io, trailers);
-                H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
-                
-                have_out_data_for(m, stream_id);
-                if (m->aborted) {
-                    /* if we were the last output, the whole session might
-                     * have gone down in the meantime.
-                     */
-                    return APR_SUCCESS;
-                }
+                              "h2_mplx(%ld-%d): close, no response, no rst", 
+                              m->id, io->id);
             }
-            else {
-                status = APR_ECONNABORTED;
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                          "h2_mplx(%ld-%d): close with trailers=%s", 
+                          m->id, io->id, trailers? "yes" : "no");
+            status = h2_io_out_close(io, trailers);
+            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
+            
+            have_out_data_for(m, stream_id);
+            if (m->aborted) {
+                /* if we were the last output, the whole session might
+                 * have gone down in the meantime.
+                 */
+                return APR_SUCCESS;
             }
         }
+        else {
+            status = APR_ECONNABORTED;
+        }
         apr_thread_mutex_unlock(m->lock);
     }
     return status;
@@ -782,26 +763,21 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        if (!m->aborted) {
-            h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-            if (io && !io->rst_error && !io->orphaned) {
-                h2_io_rst(io, error);
-                if (!io->response) {
-                        h2_io_set_add(m->ready_ios, io);
-                }
-                H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
-                
-                have_out_data_for(m, stream_id);
-                h2_io_signal(io, H2_IO_WRITE);
-            }
-            else {
-                status = APR_ECONNABORTED;
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (io && !io->rst_error && !io->orphaned) {
+            h2_io_rst(io, error);
+            if (!io->response) {
+                h2_io_set_add(m->ready_ios, io);
             }
+            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
+            
+            have_out_data_for(m, stream_id);
+            h2_io_signal(io, H2_IO_WRITE);
+        }
+        else {
+            status = APR_ECONNABORTED;
         }
         apr_thread_mutex_unlock(m->lock);
     }
@@ -813,14 +789,14 @@ int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
     int has_eos = 0;
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return 0;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io) {
-            has_eos = io->orphaned || h2_io_in_has_eos_for(io);
+        if (io && !io->orphaned) {
+            has_eos = h2_io_in_has_eos_for(io);
+        }
+        else {
+            has_eos = 1;
         }
         apr_thread_mutex_unlock(m->lock);
     }
@@ -832,15 +808,15 @@ int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
     apr_status_t status;
     int has_data = 0;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return 0;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io) {
+        if (io && !io->orphaned) {
             has_data = h2_io_out_has_data(io);
         }
+        else {
+            has_data = 0;
+        }
         apr_thread_mutex_unlock(m->lock);
     }
     return has_data;
@@ -851,19 +827,21 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        m->added_output = iowait;
-        status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
-        if (APLOGctrace2(m->c)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): trywait on data for %f ms)",
-                          m->id, timeout/1000.0);
-        }
-        m->added_output = NULL;
+        if (m->aborted) {
+            status = APR_ECONNABORTED;
+        }
+        else {
+            m->added_output = iowait;
+            status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
+            if (APLOGctrace2(m->c)) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                              "h2_mplx(%ld): trywait on data for %f ms)",
+                              m->id, timeout/1000.0);
+            }
+            m->added_output = NULL;
+        }
         apr_thread_mutex_unlock(m->lock);
     }
     return status;
@@ -883,15 +861,17 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
     apr_status_t status;
     
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        h2_tq_sort(m->q, cmp, ctx);
-        
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                      "h2_mplx(%ld): reprioritize tasks", m->id);
+        if (m->aborted) {
+            status = APR_ECONNABORTED;
+        }
+        else {
+            h2_tq_sort(m->q, cmp, ctx);
+            
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): reprioritize tasks", m->id);
+        }
         apr_thread_mutex_unlock(m->lock);
     }
     workers_register(m);
@@ -923,23 +903,25 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const h2_request *req,
     apr_status_t status;
     
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        h2_io *io = open_io(m, stream_id);
-        io->request = req;
-
-        if (!io->request->body) {
-            status = h2_io_in_close(io);
+        if (m->aborted) {
+            status = APR_ECONNABORTED;
+        }
+        else {
+            h2_io *io = open_io(m, stream_id);
+            io->request = req;
+            
+            if (!io->request->body) {
+                status = h2_io_in_close(io);
+            }
+            
+            h2_tq_add(m->q, io->id, cmp, ctx);
+            
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                          "h2_mplx(%ld-%d): process", m->c->id, stream_id);
+            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process");
         }
-        
-        h2_tq_add(m->q, io->id, cmp, ctx);
-
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                      "h2_mplx(%ld-%d): process", m->c->id, stream_id);
-        H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process");
         apr_thread_mutex_unlock(m->lock);
     }
     
@@ -955,20 +937,16 @@ const h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more)
     apr_status_t status;
     
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        *has_more = 0;
-        return NULL;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        int sid;
-        while (!req && (sid = h2_tq_shift(m->q)) > 0) {
-            h2_io *io = h2_io_set_get(m->stream_ios, sid);
-            if (io) {
-                req = io->request;
-            }
+        if (m->aborted) {
+            req = NULL;
+            *has_more = 0;
+        }
+        else {
+            req = pop_request(m);
+            *has_more = !h2_tq_empty(m->q);
         }
-        *has_more = !h2_tq_empty(m->q);
         apr_thread_mutex_unlock(m->lock);
     }
     return req;
index cc791764ffbe19417149ae595f02e09cd721ee21..419f3f0ef9b096299a5194f0fdbae6278a86fc13 100644 (file)
@@ -18,7 +18,7 @@
 
 /**
  * The stream multiplexer. It pushes buckets from the connection
- * thread to the stream task threads and vice versa. It's thread-safe
+ * thread to the stream threads and vice versa. It's thread-safe
  * to use.
  *
  * There is one h2_mplx instance for each h2_session, which sits on top
@@ -100,16 +100,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *master,
                         const struct h2_config *conf, 
                         struct h2_workers *workers);
 
-/**
- * Increase the reference counter of this mplx.
- */
-void h2_mplx_reference(h2_mplx *m);
-
-/**
- * Decreases the reference counter of this mplx.
- */
-void h2_mplx_release(h2_mplx *m);
-
 /**
  * Decreases the reference counter of this mplx and waits for it
  * to reached 0, destroy the mplx afterwards.
@@ -122,11 +112,11 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait
 
 /**
  * Aborts the multiplexer. It will answer all future invocation with
- * APR_ECONNABORTED, leading to early termination of ongoing tasks.
+ * APR_ECONNABORTED, leading to early termination of ongoing streams.
  */
 void h2_mplx_abort(h2_mplx *mplx);
 
-void h2_mplx_task_done(h2_mplx *m, int stream_id);
+void h2_mplx_request_done(h2_mplx **pm, int stream_id, const struct h2_request **preq);
 
 /*******************************************************************************
  * IO lifetime of streams.
@@ -170,7 +160,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const struct h2_request
                              h2_stream_pri_cmp *cmp, void *ctx);
 
 /**
- * Stream priorities have changed, reschedule pending tasks.
+ * Stream priorities have changed, reschedule pending requests.
  * 
  * @param m the multiplexer
  * @param cmp the stream priority compare function
index 51b2743c31bfbcc104bee68e9faf0ff4d0dc5fa0..1dea34b93437fc570e515fe0a4a063c5dd4c6941 100644 (file)
@@ -762,7 +762,7 @@ static h2_session *h2_session_create_int(conn_rec *c,
     nghttp2_option *options = NULL;
 
     apr_pool_t *pool = NULL;
-    apr_status_t status = apr_pool_create(&pool, r? r->pool : c->pool);
+    apr_status_t status = apr_pool_create(&pool, c->pool);
     h2_session *session;
     if (status != APR_SUCCESS) {
         return NULL;
@@ -787,9 +787,12 @@ static h2_session *h2_session_create_int(conn_rec *c,
         session->max_stream_count = h2_config_geti(session->config, H2_CONF_MAX_STREAMS);
         session->max_stream_mem = h2_config_geti(session->config, H2_CONF_STREAM_MAX_MEM);
         session->timeout_secs = h2_config_geti(session->config, H2_CONF_TIMEOUT_SECS);
+        if (session->timeout_secs <= 0) {
+            session->timeout_secs = apr_time_sec(session->s->timeout);
+        }
         session->keepalive_secs = h2_config_geti(session->config, H2_CONF_KEEPALIVE_SECS);
         if (session->keepalive_secs <= 0) {
-            session->keepalive_secs = session->timeout_secs;
+            session->keepalive_secs = apr_time_sec(session->s->keep_alive_timeout);
         }
         
         status = apr_thread_cond_create(&session->iowait, session->pool);
@@ -858,7 +861,14 @@ static h2_session *h2_session_create_int(conn_rec *c,
             h2_session_destroy(session);
             return NULL;
         }
-        
+            
+        if (APLOGcdebug(c)) {
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c,
+                          "session(%ld) created, timeout=%d, keepalive_timeout=%d, "
+                          "max_streams=%d, stream_mem=%d",
+                          session->id, session->timeout_secs, session->keepalive_secs,
+                          (int)session->max_stream_count, (int)session->max_stream_mem);
+        }
     }
     return session;
 }
@@ -898,6 +908,7 @@ static apr_status_t h2_session_shutdown(h2_session *session, int reason)
                                   session->max_stream_received, 
                                   reason, NULL, 0);
             nghttp2_session_send(session->ngh2);
+            session->server_goaway = 1;
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
                           "session(%ld): shutdown, no err", session->id);
         }
@@ -908,6 +919,7 @@ static apr_status_t h2_session_shutdown(h2_session *session, int reason)
                                   reason, (const uint8_t *)err, 
                                   strlen(err));
             nghttp2_session_send(session->ngh2);
+            session->server_goaway = 1;
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
                           "session(%ld): shutdown, err=%d '%s'",
                           session->id, reason, err);
@@ -1653,10 +1665,6 @@ static apr_status_t h2_session_read(h2_session *session, int block, int loops)
                         /* common status for a client that has left */
                         ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
                                       "h2_session(%ld): input gone", session->id);
-                        /* Stolen from mod_reqtimeout to speed up lingering when
-                         * a read timeout happened.
-                         */
-                        apr_table_setn(session->c->notes, "short-lingering-close", "1");
                     }
                     else {
                         /* uncommon status, log on INFO so that we see this */
@@ -1731,6 +1739,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 if (!h2_is_acceptable_connection(c, 1)) {
                     nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0,
                                           NGHTTP2_INADEQUATE_SECURITY, NULL, 0);
+                    nghttp2_session_send(session->ngh2);
+                    session->server_goaway = 1;
                 } 
                 
                 status = h2_session_start(session, &rv);
@@ -1770,6 +1780,7 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 
             case H2_SESSION_ST_BUSY:
                 if (nghttp2_session_want_read(session->ngh2)) {
+                    h2_filter_cin_timeout_set(session->cin, session->timeout_secs);
                     status = h2_session_read(session, 0, 10);
                     if (status == APR_SUCCESS) {
                         /* got something, continue processing */
@@ -1898,28 +1909,31 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 
             case H2_SESSION_ST_KEEPALIVE:
                 /* Our normal H2Timeout has passed and we are considering to
-                 * extend that with the H2KeepAliveTimeout. This works different
-                 * for async MPMs. */
+                 * extend that with the H2KeepAliveTimeout. */
                 remain_secs = session->keepalive_secs - session->timeout_secs;
-                if (!async && remain_secs <= 0) {
-                    /* not async, keepalive is smaller than normal timeout, close the session */
+                if (remain_secs <= 0) {
+                    /* keepalive is <= normal timeout, close the session */
                     reason = "keepalive expired";
                     h2_session_shutdown(session, 0);
                     goto out;
                 }
+                session->c->keepalive = AP_CONN_KEEPALIVE;
                 ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_KEEPALIVE, c);
-                if (async && session->c->cs) {
+                
+                if ((apr_time_sec(session->s->keep_alive_timeout) >= remain_secs)
+                    && async && session->c->cs
+                    && !session->r) {
                     /* Async MPMs are able to handle keep-alive connections without
                      * blocking a thread. For this to happen, we need to return from
                      * processing, indicating the IO event we are waiting for, and
                      * may be called again if the event happens.
-                     * For now, we let the MPM handle any timing on this, so we
-                     * cannot really enforce the remain_secs here.
+                     * TODO: this does not properly GOAWAY connections...
+                     * TODO: This currently does not work on upgraded requests...
                      */
                     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): async KEEPALIVE -> BUSY", session->id);
-                    session->state = H2_SESSION_ST_BUSY;
-                    session->c->cs->sense = CONN_SENSE_WANT_READ;
+                                  "h2_session(%ld): async KEEPALIVE -> IDLE_READ", session->id);
+                    session->state = H2_SESSION_ST_IDLE_READ;
+                    session->c->cs->state = CONN_STATE_WRITE_COMPLETION;
                     reason = "async keepalive";
                     status = APR_SUCCESS;
                     goto out;
index 7bf316c1d44da8d96c758f53669111ef898f4f4a..b0652fa76824a0d7bd492dcfe4c7f38d06934b85 100644 (file)
@@ -76,6 +76,7 @@ typedef struct h2_session {
     unsigned int aborted       : 1; /* aborted processing, emergency exit */
     unsigned int reprioritize  : 1; /* scheduled streams priority changed */
     unsigned int client_goaway : 1; /* client sent us a GOAWAY */
+    unsigned int server_goaway : 1; /* we sent a GOAWAY */
     apr_interval_time_t  wait_us;   /* timout during BUSY_WAIT state, micro secs */
     
     int unsent_submits;             /* number of submitted, but not yet sent
@@ -94,6 +95,7 @@ typedef struct h2_session {
     
     apr_size_t max_stream_count;    /* max number of open streams */
     apr_size_t max_stream_mem;      /* max buffer memory for a single stream */
+    
     int timeout_secs;               /* connection timeout (seconds) */
     int keepalive_secs;             /* connection idle timeout (seconds) */
     
index 249307d5bf0144d870c2fb3148bfcc98153d36ee..2871cabcfe92aa2179f79ee52d1ddf8700d71089 100644 (file)
@@ -19,7 +19,6 @@
 #include <httpd.h>
 #include <http_core.h>
 
-#include "h2_task.h"
 #include "h2_task_queue.h"
 
 
@@ -129,7 +128,7 @@ static void tq_grow(h2_task_queue *q, int nlen)
 {
     AP_DEBUG_ASSERT(q->nalloc <= nlen);
     if (nlen > q->nalloc) {
-        int *nq = apr_pcalloc(q->pool, sizeof(h2_task *) * nlen);
+        int *nq = apr_pcalloc(q->pool, sizeof(int) * nlen);
         if (q->nelts > 0) {
             int l = ((q->head + q->nelts) % q->nalloc) - q->head;
             
index dcc46d037af7ab4b168de0b670bd4f793c1edf07..3ff1d3967d0a3838b631c018fb872c9765f60632 100644 (file)
@@ -16,8 +16,6 @@
 #ifndef __mod_h2__h2_task_queue__
 #define __mod_h2__h2_task_queue__
 
-struct h2_task;
-
 /**
  * h2_task_queue keeps a list of sorted h2_task* in ascending order.
  */
index 9c674b94808e60678af044c1032c36204f9e731e..ebdb24430c72cd26f787450340019fe16b9050b9 100644 (file)
 #ifndef mod_h2_h2_version_h
 #define mod_h2_h2_version_h
 
+#undef PACKAGE_VERSION
+#undef PACKAGE_TARNAME
+#undef PACKAGE_STRING
+#undef PACKAGE_NAME
+#undef PACKAGE_BUGREPORT
+
 /**
  * @macro
- * Version number of the h2 module as c string
+ * Version number of the http2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.0.15-DEVa"
+#define MOD_HTTP2_VERSION "1.0.17"
 
 /**
  * @macro
- * Numerical representation of the version number of the h2 module
+ * Numerical representation of the version number of the http2 module
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x01000f
+#define MOD_HTTP2_VERSION_NUM 0x010011
 
 
 #endif /* mod_h2_h2_version_h */
index 8f988192a569a2d3c1a82c4e8550c906390ba603..9af2cd11dc1d61ddeda623899274462a135ca3b0 100644 (file)
 static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
 {
     h2_worker *worker = (h2_worker *)wctx;
-    apr_status_t status = APR_SUCCESS;
-    h2_mplx *m;
-    const h2_request *req;
-    h2_task *task;
-    conn_rec *c, *master;
-    int stream_id;
+    apr_status_t status;
     
     (void)thread;
-    
-    /* Furthermore, other code might want to see the socket for
-     * this connection. Allocate one without further function...
+    /* Other code might want to see a socket for this connection this
+     * worker processes. Allocate one without further function...
      */
     status = apr_socket_create(&worker->socket,
                                APR_INET, SOCK_STREAM,
@@ -57,41 +51,47 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
         return NULL;
     }
     
-    m = NULL;
     while (!worker->aborted) {
+        h2_mplx *m;
+        const h2_request *req;
+        
+        /* Get a h2_mplx + h2_request from the main workers queue. */
         status = worker->get_next(worker, &m, &req, worker->ctx);
         
-        if (req) {
-            stream_id = req->id;
-            master = m->c;
+        while (req) {
+            conn_rec *c, *master = m->c;
+            int stream_id = req->id;
+            
             c = h2_slave_create(master, worker->task_pool, 
                                 worker->thread, worker->socket);
             if (!c) {
                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
-                              APLOGNO(02957) "h2_task(%s): error setting up slave connection", 
-                              task->id);
-                h2_mplx_out_rst(m, task->stream_id, H2_ERR_INTERNAL_ERROR);
+                              APLOGNO(02957) "h2_request(%ld-%d): error setting up slave connection", 
+                              m->id, stream_id);
+                h2_mplx_out_rst(m, stream_id, H2_ERR_INTERNAL_ERROR);
             }
             else {
+                h2_task *task;
+                
                 task = h2_task_create(m->id, req, worker->task_pool, m);
                 h2_ctx_create_for(c, task);
                 h2_task_do(task, c, worker->io, worker->socket);
+                task = NULL;
                 
                 apr_thread_cond_signal(worker->io);
             }
+            
+            /* clean our references and report request as done. Signal
+             * that we want another unless we have been aborted */
+            /* TODO: this will keep a worker attached to this h2_mplx as
+             * long as it has requests to handle. Might no be fair to
+             * other mplx's. Perhaps leave after n requests? */
+            req = NULL;
             apr_pool_clear(worker->task_pool);
-            /* task is gone */
-            task = NULL;
-            h2_mplx_task_done(m, stream_id);
+            h2_mplx_request_done(&m, stream_id, worker->aborted? NULL : &req);
         }
     }
 
-    if (m) {
-        /* Hand "m" back to other workers */
-        status = worker->get_next(worker, &m, NULL, worker->ctx);
-        m = NULL;
-    }
-    
     if (worker->socket) {
         apr_socket_close(worker->socket);
         worker->socket = NULL;
index 8c1626994936e0b917412037ff361041e678a1ca..cc0a8dc10c89e64cc977e1669d8693ade6ccda22 100644 (file)
@@ -71,40 +71,18 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
                                   const h2_request **preq, void *ctx)
 {
     apr_status_t status;
-    h2_mplx *m = NULL;
-    const h2_request *req = NULL;
     apr_time_t max_wait, start_wait;
-    int has_more = 0;
     h2_workers *workers = (h2_workers *)ctx;
     
-    if (*pm && preq != NULL) {
-        /* We have a h2_mplx instance and the worker wants the next task. 
-         * Try to get one from the given mplx. */
-        *preq = h2_mplx_pop_request(*pm, &has_more);
-        if (*preq) {
-            return APR_SUCCESS;
-        }
-    }
-    
-    if (*pm) {
-        /* Got a mplx handed in, but did not get or want a task from it. 
-         * Release it, as the workers reference will be wiped.
-         */
-        h2_mplx_release(*pm);
-        *pm = NULL;
-    }
-    
-    if (!preq) {
-        /* the worker does not want a next task, we're done.
-         */
-        return APR_SUCCESS;
-    }
-    
     max_wait = apr_time_from_sec(apr_atomic_read32(&workers->max_idle_secs));
     start_wait = apr_time_now();
     
     status = apr_thread_mutex_lock(workers->lock);
     if (status == APR_SUCCESS) {
+        const h2_request *req = NULL;
+        h2_mplx *m = NULL;
+        int has_more = 0;
+
         ++workers->idle_worker_count;
         ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
                      "h2_worker(%d): looking for work", h2_worker_get_id(worker));
@@ -138,7 +116,7 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
             }
             
             if (!req) {
-                /* Need to wait for either a new mplx to arrive.
+                /* Need to wait for a new mplx to arrive.
                  */
                 cleanup_zombies(workers, 0);
                 
@@ -178,10 +156,6 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
             ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
                          "h2_worker(%d): start request(%ld-%d)",
                          h2_worker_get_id(worker), m->id, req->id);
-            /* Since we hand out a reference to the worker, we increase
-             * its ref count.
-             */
-            h2_mplx_reference(m);
             *pm = m;
             *preq = req;