]> granicus.if.org Git - apache/commitdiff
merge of 1752145,1753498,1753541,1754129,1754414,1754534,1755323,1756844,1757524...
authorStefan Eissing <icing@apache.org>
Wed, 24 Aug 2016 15:23:12 +0000 (15:23 +0000)
committerStefan Eissing <icing@apache.org>
Wed, 24 Aug 2016 15:23:12 +0000 (15:23 +0000)
mod_http2: backport of latest changes
 - intermediate responses
 - graceful shutdown of connections
 - ht debug draft update

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1757542 13f79535-47bb-0310-9956-ffa450edef68

17 files changed:
CHANGES
modules/http2/h2.h
modules/http2/h2_filter.c
modules/http2/h2_from_h1.c
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_ngn_shed.c
modules/http2/h2_request.c
modules/http2/h2_response.c
modules/http2/h2_response.h
modules/http2/h2_session.c
modules/http2/h2_stream.c
modules/http2/h2_stream.h
modules/http2/h2_task.c
modules/http2/h2_task.h
modules/http2/h2_util.c
modules/http2/h2_version.h

diff --git a/CHANGES b/CHANGES
index 4cfc8ec78c2a116c52d064467fa26854e41d6bd2..d23249bf9d67ecfb2e2a5ef9856c0ca2f6bdd413 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,13 @@
 
 Changes with Apache 2.4.24
 
+  *) mod_http2: h2 status resource follows latest draft, see
+     http://www.ietf.org/id/draft-benfield-http2-debug-state-01.txt
+     [Stefan Eissing]
+     
+  *) mod_http2: handling graceful shutdown gracefully, e.g. handling existing
+     streams to the end. [Stefan Eissing]
+  
   *) core: CVE-2016-5387: Mitigate [f]cgi "httpoxy" issues.
      [Dominic Scheirlinck <dominic vendhq.com>, Yann Ylavic]
 
index 9075b00a7935ca8a26a16f0581f16af6303d1589..0492fe35b6ecb2f65d3b46916cc8d60c734af4be 100644 (file)
@@ -95,8 +95,6 @@ typedef enum {
     H2_SESSION_ST_IDLE,             /* nothing to write, expecting data inc */
     H2_SESSION_ST_BUSY,             /* read/write without stop */
     H2_SESSION_ST_WAIT,             /* waiting for tasks reporting back */
-    H2_SESSION_ST_LOCAL_SHUTDOWN,   /* we announced GOAWAY */
-    H2_SESSION_ST_REMOTE_SHUTDOWN,  /* client announced GOAWAY */
 } h2_session_state;
 
 typedef struct h2_session_props {
@@ -106,6 +104,7 @@ typedef struct h2_session_props {
     apr_uint32_t emitted_max;       /* the highest local stream id sent */
     apr_uint32_t error;             /* the last session error encountered */
     unsigned int accepting : 1;     /* if the session is accepting new streams */
+    unsigned int shutdown : 1;      /* if the final GOAWAY has been sent */
 } h2_session_props;
 
 
@@ -146,6 +145,8 @@ struct h2_response {
     apr_off_t   content_length;
     apr_table_t *headers;
     apr_table_t *trailers;
+    struct h2_response *next;
+    
     const char  *sos_filter;
 };
 
index 33189de0164c4c4ee96fb0239cd8ef6404f0641d..ce94b52ed6e0ca229e0d932ca62b8ab863937793 100644 (file)
@@ -15,6 +15,7 @@
 
 #include <assert.h>
 
+#include <apr_strings.h>
 #include <httpd.h>
 #include <http_core.h>
 #include <http_log.h>
@@ -23,6 +24,7 @@
 
 #include "h2_private.h"
 #include "h2.h"
+#include "h2_config.h"
 #include "h2_conn_io.h"
 #include "h2_ctx.h"
 #include "h2_mplx.h"
@@ -31,6 +33,7 @@
 #include "h2_stream.h"
 #include "h2_request.h"
 #include "h2_response.h"
+#include "h2_stream.h"
 #include "h2_session.h"
 #include "h2_util.h"
 #include "h2_version.h"
@@ -209,15 +212,138 @@ static apr_status_t bbout(apr_bucket_brigade *bb, const char *fmt, ...)
     return rv;
 }
 
-static apr_status_t h2_status_stream_filter(h2_stream *stream)
+static void add_settings(apr_bucket_brigade *bb, h2_session *s, int last) 
 {
-    h2_session *session = stream->session;
-    h2_mplx *mplx = session->mplx;
-    conn_rec *c = session->c;
-    h2_push_diary *diary;
+    h2_mplx *m = s->mplx;
+    
+    bbout(bb, "  \"settings\": {\n");
+    bbout(bb, "    \"SETTINGS_MAX_CONCURRENT_STREAMS\": %d,\n", m->max_streams); 
+    bbout(bb, "    \"SETTINGS_MAX_FRAME_SIZE\": %d,\n", 16*1024); 
+    bbout(bb, "    \"SETTINGS_INITIAL_WINDOW_SIZE\": %d,\n",
+          h2_config_geti(s->config, H2_CONF_WIN_SIZE));
+    bbout(bb, "    \"SETTINGS_ENABLE_PUSH\": %d\n", h2_session_push_enabled(s)); 
+    bbout(bb, "  }%s\n", last? "" : ",");
+}
+
+static void add_peer_settings(apr_bucket_brigade *bb, h2_session *s, int last) 
+{
+    bbout(bb, "  \"peerSettings\": {\n");
+    bbout(bb, "    \"SETTINGS_MAX_CONCURRENT_STREAMS\": %d,\n", 
+        nghttp2_session_get_remote_settings(s->ngh2, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS)); 
+    bbout(bb, "    \"SETTINGS_MAX_FRAME_SIZE\": %d,\n", 
+        nghttp2_session_get_remote_settings(s->ngh2, NGHTTP2_SETTINGS_MAX_FRAME_SIZE)); 
+    bbout(bb, "    \"SETTINGS_INITIAL_WINDOW_SIZE\": %d,\n", 
+        nghttp2_session_get_remote_settings(s->ngh2, NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE)); 
+    bbout(bb, "    \"SETTINGS_ENABLE_PUSH\": %d,\n", 
+        nghttp2_session_get_remote_settings(s->ngh2, NGHTTP2_SETTINGS_ENABLE_PUSH)); 
+    bbout(bb, "    \"SETTINGS_HEADER_TABLE_SIZE\": %d,\n", 
+        nghttp2_session_get_remote_settings(s->ngh2, NGHTTP2_SETTINGS_HEADER_TABLE_SIZE)); 
+    bbout(bb, "    \"SETTINGS_MAX_HEADER_LIST_SIZE\": %d\n", 
+        nghttp2_session_get_remote_settings(s->ngh2, NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE)); 
+    bbout(bb, "  }%s\n", last? "" : ",");
+}
+
+typedef struct {
     apr_bucket_brigade *bb;
+    h2_session *s;
+    int idx;
+} stream_ctx_t;
+
+static int add_stream(h2_stream *stream, void *ctx)
+{
+    stream_ctx_t *x = ctx;
+    int32_t flowIn, flowOut;
+    
+    flowIn = nghttp2_session_get_stream_effective_local_window_size(x->s->ngh2, stream->id); 
+    flowOut = nghttp2_session_get_stream_remote_window_size(x->s->ngh2, stream->id);
+    bbout(x->bb, "%s\n    \"%d\": {\n", (x->idx? "," : ""), stream->id);
+    bbout(x->bb, "    \"state\": \"%s\",\n", h2_stream_state_str(stream));
+    bbout(x->bb, "    \"created\": %f,\n", ((double)stream->created)/APR_USEC_PER_SEC);
+    bbout(x->bb, "    \"flowIn\": %d,\n", flowIn);
+    bbout(x->bb, "    \"flowOut\": %d,\n", flowOut);
+    bbout(x->bb, "    \"dataIn\": %"APR_UINT64_T_FMT",\n", stream->in_data_octets);  
+    bbout(x->bb, "    \"dataOut\": %"APR_UINT64_T_FMT"\n", stream->out_data_octets);  
+    bbout(x->bb, "    }");
+    
+    ++x->idx;
+    return 1;
+} 
+
+static void add_streams(apr_bucket_brigade *bb, h2_session *s, int last) 
+{
+    stream_ctx_t x;
+    
+    x.bb = bb;
+    x.s = s;
+    x.idx = 0;
+    bbout(bb, "  \"streams\": {");
+    h2_mplx_stream_do(s->mplx, add_stream, &x);
+    bbout(bb, "\n  }%s\n", last? "" : ",");
+}
+
+static void add_push(apr_bucket_brigade *bb, h2_session *s, 
+                     h2_stream *stream, int last) 
+{
+    h2_push_diary *diary;
     apr_status_t status;
     
+    bbout(bb, "    \"push\": {\n");
+    diary = s->push_diary;
+    if (diary) {
+        const char *data;
+        const char *base64_digest;
+        apr_size_t len;
+        
+        status = h2_push_diary_digest_get(diary, bb->p, 256, 
+                                          stream->request->authority, 
+                                          &data, &len);
+        if (status == APR_SUCCESS) {
+            base64_digest = h2_util_base64url_encode(data, len, bb->p);
+            bbout(bb, "      \"cacheDigest\": \"%s\",\n", base64_digest);
+        }
+    }
+    bbout(bb, "      \"promises\": %d,\n", s->pushes_promised);
+    bbout(bb, "      \"submits\": %d,\n", s->pushes_submitted);
+    bbout(bb, "      \"resets\": %d\n", s->pushes_reset);
+    bbout(bb, "    }%s\n", last? "" : ",");
+}
+
+static void add_in(apr_bucket_brigade *bb, h2_session *s, int last) 
+{
+    bbout(bb, "    \"in\": {\n");
+    bbout(bb, "      \"requests\": %d,\n", s->remote.emitted_count);
+    bbout(bb, "      \"resets\": %d, \n", s->streams_reset);
+    bbout(bb, "      \"frames\": %ld,\n", (long)s->frames_received);
+    bbout(bb, "      \"octets\": %"APR_UINT64_T_FMT"\n", s->io.bytes_read);
+    bbout(bb, "    }%s\n", last? "" : ",");
+}
+
+static void add_out(apr_bucket_brigade *bb, h2_session *s, int last) 
+{
+    bbout(bb, "    \"out\": {\n");
+    bbout(bb, "      \"responses\": %d,\n", s->responses_submitted);
+    bbout(bb, "      \"frames\": %ld,\n", (long)s->frames_sent);
+    bbout(bb, "      \"octets\": %"APR_UINT64_T_FMT"\n", s->io.bytes_written);
+    bbout(bb, "    }%s\n", last? "" : ",");
+}
+
+static void add_stats(apr_bucket_brigade *bb, h2_session *s, 
+                     h2_stream *stream, int last) 
+{
+    bbout(bb, "  \"stats\": {\n");
+    add_in(bb, s, 0);
+    add_out(bb, s, 0);
+    add_push(bb, s, stream, 1);
+    bbout(bb, "  }%s\n", last? "" : ",");
+}
+
+static apr_status_t h2_status_stream_filter(h2_stream *stream)
+{
+    h2_session *s = stream->session;
+    conn_rec *c = s->c;
+    apr_bucket_brigade *bb;
+    int32_t connFlowIn, connFlowOut;
+    
     if (!stream->response) {
         return APR_EINVAL;
     }
@@ -230,50 +356,24 @@ static apr_status_t h2_status_stream_filter(h2_stream *stream)
     apr_table_unset(stream->response->headers, "Content-Length");
     stream->response->content_length = -1;
     
+    connFlowIn = nghttp2_session_get_effective_local_window_size(s->ngh2); 
+    connFlowOut = nghttp2_session_get_remote_window_size(s->ngh2);
+    apr_table_setn(stream->response->headers, "conn-flow-in", 
+                   apr_itoa(stream->pool, connFlowIn));
+    apr_table_setn(stream->response->headers, "conn-flow-out", 
+                   apr_itoa(stream->pool, connFlowOut));
+     
     bbout(bb, "{\n");
-    bbout(bb, "  \"HTTP2\": \"on\",\n");
-    bbout(bb, "  \"H2PUSH\": \"%s\",\n", h2_session_push_enabled(session)? "on" : "off");
-    bbout(bb, "  \"mod_http2_version\": \"%s\",\n", MOD_HTTP2_VERSION);
-    bbout(bb, "  \"session_id\": %ld,\n", (long)session->id);
-    bbout(bb, "  \"streams_max\": %d,\n", (int)session->max_stream_count);
-    bbout(bb, "  \"this_stream\": %d,\n", stream->id);
-    bbout(bb, "  \"streams_open\": %d,\n", (int)h2_ihash_count(session->streams));
-    bbout(bb, "  \"max_stream_started\": %d,\n", mplx->max_stream_started);
-    bbout(bb, "  \"requests_received\": %d,\n", session->remote.emitted_count);
-    bbout(bb, "  \"responses_submitted\": %d,\n", session->responses_submitted);
-    bbout(bb, "  \"streams_reset\": %d, \n", session->streams_reset);
-    bbout(bb, "  \"pushes_promised\": %d,\n", session->pushes_promised);
-    bbout(bb, "  \"pushes_submitted\": %d,\n", session->pushes_submitted);
-    bbout(bb, "  \"pushes_reset\": %d,\n", session->pushes_reset);
+    bbout(bb, "  \"version\": \"draft-01\",\n");
+    add_settings(bb, s, 0);
+    add_peer_settings(bb, s, 0);
+    bbout(bb, "  \"connFlowIn\": %d,\n", connFlowIn);
+    bbout(bb, "  \"connFlowOut\": %d,\n", connFlowOut);
+    bbout(bb, "  \"sentGoAway\": %d,\n", s->local.shutdown);
+
+    add_streams(bb, s, 0);
     
-    diary = session->push_diary;
-    if (diary) {
-        const char *data;
-        const char *base64_digest;
-        apr_size_t len;
-        
-        status = h2_push_diary_digest_get(diary, stream->pool, 256, 
-                                          stream->request->authority, &data, &len);
-        if (status == APR_SUCCESS) {
-            base64_digest = h2_util_base64url_encode(data, len, stream->pool);
-            bbout(bb, "  \"cache_digest\": \"%s\",\n", base64_digest);
-        }
-        
-        /* try the reverse for testing purposes */
-        status = h2_push_diary_digest_set(diary, stream->request->authority, data, len);
-        if (status == APR_SUCCESS) {
-            status = h2_push_diary_digest_get(diary, stream->pool, 256, 
-                                              stream->request->authority, &data, &len);
-            if (status == APR_SUCCESS) {
-                base64_digest = h2_util_base64url_encode(data, len, stream->pool);
-                bbout(bb, "  \"cache_digest^2\": \"%s\",\n", base64_digest);
-            }
-        }
-    }
-    bbout(bb, "  \"frames_received\": %ld,\n", (long)session->frames_received);
-    bbout(bb, "  \"frames_sent\": %ld,\n", (long)session->frames_sent);
-    bbout(bb, "  \"bytes_received\": %"APR_UINT64_T_FMT",\n", session->io.bytes_read);
-    bbout(bb, "  \"bytes_sent\": %"APR_UINT64_T_FMT"\n", session->io.bytes_written);
+    add_stats(bb, s, stream, 1);
     bbout(bb, "}\n");
     
     return APR_SUCCESS;
index 0f893ec139da574c9030dad4111f59d42f13592d..876ec58bfb5172209836f24137e8b3dab95b01ed 100644 (file)
@@ -471,7 +471,8 @@ static h2_response *create_response(h2_from_h1 *from_h1, request_rec *r)
                      (void *) headers, r->headers_out, NULL);
     }
     
-    return h2_response_rcreate(from_h1->stream_id, r, headers, r->pool);
+    return h2_response_rcreate(from_h1->stream_id, r, r->status, 
+                               headers, r->pool);
 }
 
 apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
index fd6bf6ba29d386750e79fa900147fd2e257912f1..c1de2699b32023cba24564b2360837db63e48d58 100644 (file)
@@ -196,12 +196,17 @@ static int purge_stream(void *ctx, void *val)
 {
     h2_mplx *m = ctx;
     h2_stream *stream = val;
-    h2_task *task = h2_ihash_get(m->tasks, stream->id);
-    h2_ihash_remove(m->spurge, stream->id);
+    int stream_id = stream->id;
+    h2_task *task = h2_ihash_get(m->tasks, stream_id);
+    
+    h2_ihash_remove(m->spurge, stream_id);
     h2_stream_destroy(stream);
     if (task) {
         task_destroy(m, task, 1);
     }
+    /* FIXME: task_destroy() might in some twisted way place the
+     * stream in the spurge hash again. Remove it last. */
+    h2_ihash_remove(m->spurge, stream_id);
     return 0;
 }
 
@@ -212,6 +217,7 @@ static void purge_streams(h2_mplx *m)
             /* repeat until empty */
         }
         h2_ihash_clear(m->spurge);
+        AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
     }
 }
 
@@ -467,6 +473,33 @@ static int stream_done_iter(void *ctx, void *val)
     return 0;
 }
 
+typedef struct {
+    h2_mplx_stream_cb *cb;
+    void *ctx;
+} stream_iter_ctx_t;
+
+static int stream_iter_wrap(void *ctx, void *stream)
+{
+    stream_iter_ctx_t *x = ctx;
+    return x->cb(stream, x->ctx);
+}
+
+apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
+{
+    apr_status_t status;
+    int acquired;
+    
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        stream_iter_ctx_t x;
+        x.cb = cb;
+        x.ctx = ctx;
+        h2_ihash_iter(m->streams, stream_iter_wrap, &x);
+        
+        leave_mutex(m, acquired);
+    }
+    return status;
+}
+
 static int task_print(void *ctx, void *val)
 {
     h2_mplx *m = ctx;
@@ -477,13 +510,13 @@ static int task_print(void *ctx, void *val)
 
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
                       "->03198: h2_stream(%s): %s %s %s -> %s %d"
-                      "[orph=%d/started=%d/done=%d]", 
+                      "[orph=%d/started=%d/done=%d/frozen=%d]", 
                       task->id, task->request->method, 
                       task->request->authority, task->request->path,
                       task->response? "http" : (task->rst_error? "reset" : "?"),
                       task->response? task->response->http_status : task->rst_error,
                       (stream? 0 : 1), task->worker_started, 
-                      task->worker_done);
+                      task->worker_done, task->frozen);
     }
     else if (task) {
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
@@ -566,8 +599,11 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
     
         if (!h2_ihash_empty(m->shold)) {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): 2. release_join with %d streams in hold", 
-                          m->id, (int)h2_ihash_count(m->shold));
+                          "h2_mplx(%ld): 2. release_join with %d streams in "
+                          "hold, %d workers busy, %d tasks", 
+                          m->id, (int)h2_ihash_count(m->shold),
+                          m->workers_busy,  
+                          (int)h2_ihash_count(m->tasks));
         }
         if (!h2_ihash_empty(m->spurge)) {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
@@ -608,6 +644,12 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
             }
         }
         
+        if (!h2_ihash_empty(m->tasks) && APLOGctrace1(m->c)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): 3. release_join with %d tasks",
+                          m->id, (int)h2_ihash_count(m->tasks));
+            h2_ihash_iter(m->tasks, task_print, m);
+        }
         AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
         if (!h2_ihash_empty(m->spurge)) {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
@@ -615,7 +657,6 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
                           m->id, (int)h2_ihash_count(m->spurge));
             purge_streams(m);
         }
-        AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
         
         if (!h2_ihash_empty(m->tasks)) {
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
@@ -675,13 +716,15 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
         return APR_ECONNABORTED;
     }
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                  "h2_mplx(%s): open response: %d, rst=%d",
+    status = h2_task_add_response(task, response);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                  "h2_mplx(%s): add response: %d, rst=%d",
                   task->id, response->http_status, response->rst_error);
+    if (status != APR_SUCCESS) {
+        return status;
+    }
     
-    h2_task_set_response(task, response);
-    
-    if (task->output.beam) {
+    if (task->output.beam && !task->output.opened) {
         h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
         h2_beam_timeout_set(task->output.beam, m->stream_timeout);
         h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
@@ -690,6 +733,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
             h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
         }
         h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m);
+        task->output.opened = 1;
     }
     
     h2_ihash_add(m->sready, stream);
@@ -1375,13 +1419,21 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
                               m->id, stream->id);
                 task = h2_ihash_get(m->tasks, stream->id);
                 if (task) {
-                    task->submitted = 1;
+                    task->response_sent = 1;
                     if (task->rst_error) {
                         h2_stream_rst(stream, task->rst_error);
                     }
                     else {
                         AP_DEBUG_ASSERT(task->response);
-                        h2_stream_set_response(stream, task->response, task->output.beam);
+                        status = h2_stream_add_response(stream, task->response, 
+                                                        task->output.beam);
+                        if (status != APR_SUCCESS) {
+                            h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
+                        }
+                        if (!h2_response_get_final(task->response)) {
+                            /* the final response needs still to arrive */
+                            task->response = NULL;
+                        }
                     }
                 }
                 else {
@@ -1449,7 +1501,7 @@ apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
             if (stream->started && (!task || task->worker_done)) {
                 h2_ihash_add(m->sresume, stream);
             }
-            else {
+            else if (task->output.beam) {
                 /* register callback so that we can resume on new output */
                 h2_beam_on_produced(task->output.beam, output_produced, m);
             }
index 821e6d65dfcb9773d1dc8ea21770b15c1dbd0ecb..4af0ba3c1385ed7065f391c36b80d57213e6cdf6 100644 (file)
@@ -230,6 +230,11 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
 
 apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id);
 
+
+typedef int h2_mplx_stream_cb(struct h2_stream *s, void *ctx);
+
+apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx);
+
 /*******************************************************************************
  * Output handling of streams.
  ******************************************************************************/
index f0676421e7d2a362056a5168601e8f580db8b728..14e57a7aa3ec694bc92e4a7020fa2143fd52cd88 100644 (file)
@@ -168,6 +168,12 @@ apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type,
         return APR_EOF;
     }
     
+    if (task->assigned) {
+        --task->assigned->no_assigned;
+        --task->assigned->no_live;
+        task->assigned = NULL;
+    }
+    
     ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING);
     if (ngn && !ngn->shutdown) {
         /* this task will be processed in another thread,
@@ -178,7 +184,6 @@ apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type,
         if (!h2_task_is_detached(task)) {
             h2_task_freeze(task);
         }
-        /* FIXME: sometimes ngn is garbage, probly alread freed */
         ngn_add_task(ngn, task);
         ngn->no_assigned++;
         return APR_SUCCESS;
index d213e16790f29e664e366ae1fe9fe2035041c166..14ddb5466598f7dc76ae45e41b0a58121ea0ccb4 100644 (file)
@@ -249,7 +249,8 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
     request_rec *r;
     apr_pool_t *p;
     int access_status = HTTP_OK;    
-    
+    const char *expect;
+
     apr_pool_create(&p, conn->pool);
     apr_pool_tag(p, "request");
     r = apr_pcalloc(p, sizeof(request_rec));
@@ -327,6 +328,18 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
     /* we may have switched to another server */
     r->per_dir_config = r->server->lookup_defaults;
     
+    if (r && ((expect = apr_table_get(r->headers_in, "Expect")) != NULL)
+        && (expect[0] != '\0')) {
+        if (ap_cstr_casecmp(expect, "100-continue") == 0) {
+            r->expecting_100 = 1;
+            ap_add_input_filter("H2_CONTINUE", NULL, r, conn);
+        }
+        else {
+            r->status = HTTP_EXPECTATION_FAILED;
+            ap_send_error_response(r, 0);
+        }
+    }
+    
     /*
      * Add the HTTP_IN filter here to ensure that ap_discard_request_body
      * called by ap_die and by ap_send_error_response works correctly on
@@ -350,7 +363,7 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
         r = NULL;
         goto traceout;
     }
-    
+
     AP_READ_REQUEST_SUCCESS((uintptr_t)r, (char *)r->method, 
                             (char *)r->uri, (char *)r->server->defn_name, 
                             r->status);
index 4cafd3550e00bcb8aa946ba06df975833d5fc1f7..792e7e57e0bf192db0e90f7cc8c56ce03a089dc7 100644 (file)
@@ -133,7 +133,7 @@ h2_response *h2_response_create(int stream_id,
                                   parse_headers(hlines, pool), notes, pool);
 }
 
-h2_response *h2_response_rcreate(int stream_id, request_rec *r,
+h2_response *h2_response_rcreate(int stream_id, request_rec *r, int status,
                                  apr_table_t *header, apr_pool_t *pool)
 {
     h2_response *response = apr_pcalloc(pool, sizeof(h2_response));
@@ -142,9 +142,9 @@ h2_response *h2_response_rcreate(int stream_id, request_rec *r,
     }
     
     response->stream_id      = stream_id;
-    response->http_status    = r->status;
+    response->http_status    = status;
     response->content_length = -1;
-    response->headers        = header;
+    response->headers        = header? header : apr_table_make(pool, 5);
     response->sos_filter     = get_sos_filter(r->notes);
 
     check_clen(response, r, pool);
@@ -203,3 +203,17 @@ void h2_response_set_trailers(h2_response *response, apr_table_t *trailers)
     response->trailers = trailers;
 }
 
+int h2_response_is_final(h2_response *response)
+{
+    return response->http_status >= 200;
+}
+
+h2_response *h2_response_get_final(h2_response *response)
+{
+    for (/**/; response; response = response->next) {
+        if (h2_response_is_final(response)) {
+            return response;
+        }
+    }
+    return NULL;
+}
index ca57c532e631cbc8719763258ec75ef2b6c02429..5d1bf3767eaebf43b91787e20361cdfecbc126b9 100644 (file)
@@ -40,7 +40,7 @@ h2_response *h2_response_create(int stream_id,
  * @param header the headers of the response
  * @param pool the memory pool to use
  */
-h2_response *h2_response_rcreate(int stream_id, request_rec *r,
+h2_response *h2_response_rcreate(int stream_id, request_rec *r, int status, 
                                  apr_table_t *header, apr_pool_t *pool);
 
 /**
@@ -70,4 +70,7 @@ h2_response *h2_response_clone(apr_pool_t *pool, h2_response *from);
  */
 void h2_response_set_trailers(h2_response *response, apr_table_t *trailers);
 
+int h2_response_is_final(h2_response *response);
+h2_response *h2_response_get_final(h2_response *response);
+
 #endif /* defined(__mod_h2__h2_response__) */
index f32c79c98412cffb6c37b55ab7547e32fb23ace8..77369cb74cab1c3863710ae3e7d5be3220695bee 100644 (file)
@@ -75,7 +75,6 @@ static apr_status_t h2_session_receive(void *ctx,
                                        const char *data, apr_size_t len,
                                        apr_size_t *readlen);
 
-static int is_accepting_streams(h2_session *session); 
 static void dispatch_event(h2_session *session, h2_session_event_t ev, 
                              int err, const char *msg);
 
@@ -285,11 +284,6 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
     int rv;
     
     (void)flags;
-    if (!is_accepting_streams(session)) {
-        /* ignore */
-        return 0;
-    }
-    
     stream = get_stream(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064)
@@ -398,11 +392,6 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
     apr_status_t status;
     
     (void)flags;
-    if (!is_accepting_streams(session)) {
-        /* just ignore */
-        return 0;
-    }
-    
     stream = get_stream(session, frame->hd.stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
@@ -519,9 +508,16 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
             }
             break;
         case NGHTTP2_GOAWAY:
-            session->remote.accepted_max = frame->goaway.last_stream_id;
-            session->remote.error = frame->goaway.error_code;
-            dispatch_event(session, H2_SESSION_EV_REMOTE_GOAWAY, 0, NULL);
+            if (frame->goaway.error_code == 0 
+                && frame->goaway.last_stream_id == ((1u << 31) - 1)) {
+                /* shutdown notice. Should not come from a client... */
+                session->remote.accepting = 0;
+            }
+            else {
+                session->remote.accepted_max = frame->goaway.last_stream_id;
+                dispatch_event(session, H2_SESSION_EV_REMOTE_GOAWAY, 
+                               frame->goaway.error_code, NULL);
+            }
             break;
         default:
             if (APLOGctrace2(session->c)) {
@@ -628,7 +624,8 @@ static int on_send_data_cb(nghttp2_session *ngh2,
         
     apr_brigade_cleanup(session->bbtmp);
     if (status == APR_SUCCESS) {
-        stream->data_frames_sent++;
+        stream->out_data_frames++;
+        stream->out_data_octets += length;
         return 0;
     }
     else {
@@ -714,12 +711,35 @@ static void h2_session_destroy(h2_session *session)
     }
 }
 
+static apr_status_t h2_session_shutdown_notice(h2_session *session)
+{
+    apr_status_t status;
+    
+    AP_DEBUG_ASSERT(session);
+    if (!session->local.accepting) {
+        return APR_SUCCESS;
+    }
+    
+    nghttp2_submit_shutdown_notice(session->ngh2);
+    session->local.accepting = 0;
+    status = nghttp2_session_send(session->ngh2);
+    if (status == APR_SUCCESS) {
+        status = h2_conn_io_flush(&session->io);
+    }
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
+                  "session(%ld): sent shutdown notice", session->id);
+    return status;
+}
+
 static apr_status_t h2_session_shutdown(h2_session *session, int error, 
                                         const char *msg, int force_close)
 {
     apr_status_t status = APR_SUCCESS;
     
     AP_DEBUG_ASSERT(session);
+    if (session->local.shutdown) {
+        return APR_SUCCESS;
+    }
     if (!msg && error) {
         msg = nghttp2_strerror(error);
     }
@@ -743,6 +763,8 @@ static apr_status_t h2_session_shutdown(h2_session *session, int error,
     nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 
                           session->local.accepted_max, 
                           error, (uint8_t*)msg, msg? strlen(msg):0);
+    session->local.accepting = 0;
+    session->local.shutdown = 1;
     status = nghttp2_session_send(session->ngh2);
     if (status == APR_SUCCESS) {
         status = h2_conn_io_flush(&session->io);
@@ -772,8 +794,7 @@ static apr_status_t session_pool_cleanup(void *data)
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                   "session(%ld): pool_cleanup", session->id);
     
-    if (session->state != H2_SESSION_ST_DONE 
-        && session->state != H2_SESSION_ST_LOCAL_SHUTDOWN) {
+    if (session->state != H2_SESSION_ST_DONE) {
         /* Not good. The connection is being torn down and we have
          * not sent a goaway. This is considered a protocol error and
          * the client has to assume that any streams "in flight" may have
@@ -1179,12 +1200,6 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
     return (ssize_t)nread;
 }
 
-typedef struct {
-    nghttp2_nv *nv;
-    size_t nvlen;
-    size_t offset;
-} nvctx_t;
-
 struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
                                   h2_push *push)
 {
@@ -1418,18 +1433,28 @@ static apr_status_t on_stream_response(void *ctx, int stream_id)
     if (!stream) {
         return APR_NOTFOUND;
     }
-    
-    response = h2_stream_get_response(stream);
-    AP_DEBUG_ASSERT(response || stream->rst_error);
-    
-    if (stream->submitted) {
-        rv = NGHTTP2_PROTOCOL_ERROR;
+    else if (!stream->response) {
+        int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
+        
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
+                      "h2_stream(%ld-%d): RST_STREAM, err=%d",
+                      session->id, stream->id, err);
+
+        rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
+                                       stream->id, err);
+        goto leave;
     }
-    else if (response && response->headers) {
+    
+    while ((response = h2_stream_get_unsent_response(stream)) != NULL) {
         nghttp2_data_provider provider, *pprovider = NULL;
         h2_ngheader *ngh;
         const h2_priority *prio;
         
+        if (stream->submitted) {
+            rv = NGHTTP2_PROTOCOL_ERROR;
+            goto leave;
+        }
+        
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
                       "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
                       session->id, stream->id, response->http_status,
@@ -1457,7 +1482,9 @@ static apr_status_t on_stream_response(void *ctx, int stream_id)
          *    as the client, having this resource in its cache, might
          *    also have the pushed ones as well.
          */
-        if (stream->request && !stream->request->initiated_on
+        if (stream->request 
+            && !stream->request->initiated_on
+            && h2_response_is_final(response)
             && H2_HTTP_2XX(response->http_status)
             && h2_session_push_enabled(session)) {
             
@@ -1467,35 +1494,24 @@ static apr_status_t on_stream_response(void *ctx, int stream_id)
         prio = h2_stream_get_priority(stream);
         if (prio) {
             h2_session_set_prio(session, stream, prio);
-            /* no showstopper if that fails for some reason */
         }
         
         ngh = h2_util_ngheader_make_res(stream->pool, response->http_status, 
                                         response->headers);
         rv = nghttp2_submit_response(session->ngh2, response->stream_id,
                                      ngh->nv, ngh->nvlen, pprovider);
-    }
-    else {
-        int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
+        stream->submitted = h2_response_is_final(response);
+        session->have_written = 1;
         
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
-                      "h2_stream(%ld-%d): RST_STREAM, err=%d",
-                      session->id, stream->id, err);
-
-        rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
-                                       stream->id, err);
-    }
-    
-    stream->submitted = 1;
-    session->have_written = 1;
-    
-    if (stream->request && stream->request->initiated_on) {
-        ++session->pushes_submitted;
-    }
-    else {
-        ++session->responses_submitted;
+        if (stream->request && stream->request->initiated_on) {
+            ++session->pushes_submitted;
+        }
+        else {
+            ++session->responses_submitted;
+        }
     }
     
+leave:
     if (nghttp2_is_fatal(rv)) {
         status = APR_EGENERAL;
         dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
@@ -1600,9 +1616,6 @@ static apr_status_t h2_session_read(h2_session *session, int block)
                  * status. */
                 return rstatus;
         }
-        if (!is_accepting_streams(session)) {
-            break;
-        }
         if ((session->io.bytes_read - read_start) > (64*1024)) {
             /* read enough in one go, give write a chance */
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
@@ -1653,8 +1666,6 @@ static const char *StateNames[] = {
     "IDLE",      /* H2_SESSION_ST_IDLE */
     "BUSY",      /* H2_SESSION_ST_BUSY */
     "WAIT",      /* H2_SESSION_ST_WAIT */
-    "LSHUTDOWN", /* H2_SESSION_ST_LOCAL_SHUTDOWN */
-    "RSHUTDOWN", /* H2_SESSION_ST_REMOTE_SHUTDOWN */
 };
 
 static const char *state_name(h2_session_state state)
@@ -1665,18 +1676,6 @@ static const char *state_name(h2_session_state state)
     return StateNames[state];
 }
 
-static int is_accepting_streams(h2_session *session)
-{
-    switch (session->state) {
-        case H2_SESSION_ST_IDLE:
-        case H2_SESSION_ST_BUSY:
-        case H2_SESSION_ST_WAIT:
-            return 1;
-        default:
-            return 0;
-    }
-}
-
 static void update_child_status(h2_session *session, int status, const char *msg)
 {
     /* Assume that we also change code/msg when something really happened and
@@ -1698,7 +1697,12 @@ static void update_child_status(h2_session *session, int status, const char *msg
 static void transit(h2_session *session, const char *action, h2_session_state nstate)
 {
     if (session->state != nstate) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03078)
+        int loglvl = APLOG_DEBUG;
+        if ((session->state == H2_SESSION_ST_BUSY && nstate == H2_SESSION_ST_WAIT)
+            || (session->state == H2_SESSION_ST_WAIT && nstate == H2_SESSION_ST_BUSY)){
+            loglvl = APLOG_TRACE1;
+        }
+        ap_log_cerror(APLOG_MARK, loglvl, 0, session->c, APLOGNO(03078)
                       "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id,
                       state_name(session->state), action, state_name(nstate));
         session->state = nstate;
@@ -1708,12 +1712,6 @@ static void transit(h2_session *session, const char *action, h2_session_state ns
                                               SERVER_BUSY_KEEPALIVE
                                               : SERVER_BUSY_READ), "idle");
                 break;
-            case H2_SESSION_ST_REMOTE_SHUTDOWN:
-                update_child_status(session, SERVER_CLOSING, "remote goaway");
-                break;
-            case H2_SESSION_ST_LOCAL_SHUTDOWN:
-                update_child_status(session, SERVER_CLOSING, "local goaway");
-                break;
             case H2_SESSION_ST_DONE:
                 update_child_status(session, SERVER_CLOSING, "done");
                 break;
@@ -1738,39 +1736,22 @@ static void h2_session_ev_init(h2_session *session, int arg, const char *msg)
 
 static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg)
 {
-    session->local.accepting = 0;
     cleanup_streams(session);
-    switch (session->state) {
-        case H2_SESSION_ST_LOCAL_SHUTDOWN:
-            /* already did that? */
-            break;
-        case H2_SESSION_ST_IDLE:
-        case H2_SESSION_ST_REMOTE_SHUTDOWN:
-            /* all done */
-            transit(session, "local goaway", H2_SESSION_ST_DONE);
-            break;
-        default:
-            transit(session, "local goaway", H2_SESSION_ST_LOCAL_SHUTDOWN);
-            break;
+    if (!session->remote.shutdown) {
+        update_child_status(session, SERVER_CLOSING, "local goaway");
     }
+    transit(session, "local goaway", H2_SESSION_ST_DONE);
 }
 
 static void h2_session_ev_remote_goaway(h2_session *session, int arg, const char *msg)
 {
-    session->remote.accepting = 0;
-    cleanup_streams(session);
-    switch (session->state) {
-        case H2_SESSION_ST_REMOTE_SHUTDOWN:
-            /* already received that? */
-            break;
-        case H2_SESSION_ST_IDLE:
-        case H2_SESSION_ST_LOCAL_SHUTDOWN:
-            /* all done */
-            transit(session, "remote goaway", H2_SESSION_ST_DONE);
-            break;
-        default:
-            transit(session, "remote goaway", H2_SESSION_ST_REMOTE_SHUTDOWN);
-            break;
+    if (!session->remote.shutdown) {
+        session->remote.error = arg;
+        session->remote.accepting = 0;
+        session->remote.shutdown = 1;
+        cleanup_streams(session);
+        update_child_status(session, SERVER_CLOSING, "remote goaway");
+        transit(session, "remote goaway", H2_SESSION_ST_DONE);
     }
 }
 
@@ -1779,7 +1760,6 @@ static void h2_session_ev_conn_error(h2_session *session, int arg, const char *m
     switch (session->state) {
         case H2_SESSION_ST_INIT:
         case H2_SESSION_ST_DONE:
-        case H2_SESSION_ST_LOCAL_SHUTDOWN:
             /* just leave */
             transit(session, "conn error", H2_SESSION_ST_DONE);
             break;
@@ -1794,31 +1774,18 @@ static void h2_session_ev_conn_error(h2_session *session, int arg, const char *m
 
 static void h2_session_ev_proto_error(h2_session *session, int arg, const char *msg)
 {
-    switch (session->state) {
-        case H2_SESSION_ST_DONE:
-        case H2_SESSION_ST_LOCAL_SHUTDOWN:
-            /* just leave */
-            transit(session, "proto error", H2_SESSION_ST_DONE);
-            break;
-        
-        default:
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03402)
-                          "h2_session(%ld): proto error -> shutdown", session->id);
-            h2_session_shutdown(session, arg, msg, 0);
-            break;
+    if (!session->local.shutdown) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03402)
+                      "h2_session(%ld): proto error -> shutdown", session->id);
+        h2_session_shutdown(session, arg, msg, 0);
     }
 }
 
 static void h2_session_ev_conn_timeout(h2_session *session, int arg, const char *msg)
 {
-    switch (session->state) {
-        case H2_SESSION_ST_LOCAL_SHUTDOWN:
-            transit(session, "conn timeout", H2_SESSION_ST_DONE);
-            break;
-        default:
-            h2_session_shutdown(session, arg, msg, 1);
-            transit(session, "conn timeout", H2_SESSION_ST_DONE);
-            break;
+    transit(session, msg, H2_SESSION_ST_DONE);
+    if (!session->local.shutdown) {
+        h2_session_shutdown(session, arg, msg, 1);
     }
 }
 
@@ -1826,8 +1793,6 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
 {
     switch (session->state) {
         case H2_SESSION_ST_BUSY:
-        case H2_SESSION_ST_LOCAL_SHUTDOWN:
-        case H2_SESSION_ST_REMOTE_SHUTDOWN:
             /* Nothing to READ, nothing to WRITE on the master connection.
              * Possible causes:
              * - we wait for the client to send us sth
@@ -1837,6 +1802,7 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                           "h2_session(%ld): NO_IO event, %d streams open", 
                           session->id, session->open_streams);
+            h2_conn_io_flush(&session->io);
             if (session->open_streams > 0) {
                 if (has_unsubmitted_streams(session) 
                     || has_suspended_streams(session)) {
@@ -1860,7 +1826,7 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
                     }
                 }
             }
-            else if (is_accepting_streams(session)) {
+            else if (session->local.accepting) {
                 /* When we have no streams, but accept new, switch to idle */
                 apr_time_t now = apr_time_now();
                 transit(session, "no io (keepalive)", H2_SESSION_ST_IDLE);
@@ -1923,26 +1889,17 @@ static void h2_session_ev_mpm_stopping(h2_session *session, int arg, const char
 {
     switch (session->state) {
         case H2_SESSION_ST_DONE:
-        case H2_SESSION_ST_LOCAL_SHUTDOWN:
             /* nop */
             break;
         default:
-            h2_session_shutdown(session, arg, msg, 0);
+            h2_session_shutdown_notice(session);
             break;
     }
 }
 
 static void h2_session_ev_pre_close(h2_session *session, int arg, const char *msg)
 {
-    switch (session->state) {
-        case H2_SESSION_ST_DONE:
-        case H2_SESSION_ST_LOCAL_SHUTDOWN:
-            /* nop */
-            break;
-        default:
-            h2_session_shutdown(session, arg, msg, 1);
-            break;
-    }
+    h2_session_shutdown(session, arg, msg, 1);
 }
 
 static void h2_session_ev_stream_open(h2_session *session, int arg, const char *msg)
@@ -2052,14 +2009,14 @@ apr_status_t h2_session_process(h2_session *session, int async)
         c->cs->state = CONN_STATE_WRITE_COMPLETION;
     }
     
-    while (1) {
+    while (session->state != H2_SESSION_ST_DONE) {
         trace = APLOGctrace3(c);
         session->have_read = session->have_written = 0;
 
-        if (!ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state)) {
+        if (session->local.accepting 
+            && !ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state)) {
             if (mpm_state == AP_MPMQ_STOPPING) {
                 dispatch_event(session, H2_SESSION_EV_MPM_STOPPING, 0, NULL);
-                break;
             }
         }
         
@@ -2188,8 +2145,6 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 break;
                 
             case H2_SESSION_ST_BUSY:
-            case H2_SESSION_ST_LOCAL_SHUTDOWN:
-            case H2_SESSION_ST_REMOTE_SHUTDOWN:
                 if (nghttp2_session_want_read(session->ngh2)) {
                     ap_update_child_status(session->c->sbh, SERVER_BUSY_READ, NULL);
                     h2_filter_cin_timeout_set(session->cin, session->s->timeout);
@@ -2272,7 +2227,7 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 else if (APR_STATUS_IS_TIMEUP(status)) {
                     /* go back to checking all inputs again */
                     transit(session, "wait cycle", session->local.accepting? 
-                            H2_SESSION_ST_BUSY : H2_SESSION_ST_LOCAL_SHUTDOWN);
+                            H2_SESSION_ST_BUSY : H2_SESSION_ST_DONE);
                 }
                 else if (APR_STATUS_IS_ECONNRESET(status) 
                          || APR_STATUS_IS_ECONNABORTED(status)) {
@@ -2288,10 +2243,6 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 }
                 break;
                 
-            case H2_SESSION_ST_DONE:
-                status = APR_EOF;
-                goto out;
-                
             default:
                 ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c,
                               APLOGNO(03080)
@@ -2321,11 +2272,12 @@ out:
         && (APR_STATUS_IS_EOF(status)
             || APR_STATUS_IS_ECONNRESET(status) 
             || APR_STATUS_IS_ECONNABORTED(status))) {
-            dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
-        }
+        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+    }
 
-    status = (session->state == H2_SESSION_ST_DONE)? APR_EOF : APR_SUCCESS;
+    status = APR_SUCCESS;
     if (session->state == H2_SESSION_ST_DONE) {
+        status = APR_EOF;
         if (!session->eoc_written) {
             session->eoc_written = 1;
             h2_conn_io_write_eoc(&session->io, session);
@@ -2339,6 +2291,7 @@ apr_status_t h2_session_pre_close(h2_session *session, int async)
 {
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
                   "h2_session(%ld): pre_close", session->id);
-    dispatch_event(session, H2_SESSION_EV_PRE_CLOSE, 0, "timeout");
+    dispatch_event(session, H2_SESSION_EV_PRE_CLOSE, 0, 
+        (session->state == H2_SESSION_ST_IDLE)? "timeout" : NULL);
     return APR_SUCCESS;
 }
index 3a7f1a94a1895f7829cdab0e9e3ba43b88f4484d..9b4c017567726d79d5a69af7c467ee5f7d5eb93c 100644 (file)
@@ -181,6 +181,7 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
     h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
     
     stream->id        = id;
+    stream->created   = apr_time_now();
     stream->state     = H2_STREAM_ST_IDLE;
     stream->pool      = pool;
     stream->session   = session;
@@ -265,6 +266,16 @@ struct h2_response *h2_stream_get_response(h2_stream *stream)
     return stream->response;
 }
 
+struct h2_response *h2_stream_get_unsent_response(h2_stream *stream)
+{
+    h2_response *unsent = (stream->last_sent? 
+                           stream->last_sent->next : stream->response);
+    if (unsent) {
+        stream->last_sent = unsent;
+    }
+    return unsent;
+}
+
 apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
 {
     apr_status_t status;
@@ -462,6 +473,9 @@ apr_status_t h2_stream_write_data(h2_stream *stream,
     
     status = h2_beam_send(stream->input, stream->tmp, APR_BLOCK_READ);
     apr_brigade_cleanup(stream->tmp);
+    stream->in_data_frames++;
+    stream->in_data_octets += len;
+    
     return status;
 }
 
@@ -521,11 +535,12 @@ static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount)
     return status;
 }
 
-apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
+apr_status_t h2_stream_add_response(h2_stream *stream, h2_response *response,
                                     h2_bucket_beam *output)
 {
     apr_status_t status = APR_SUCCESS;
     conn_rec *c = stream->session->c;
+    h2_response **pr = &stream->response;
     
     if (!output_open(stream)) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
@@ -533,15 +548,29 @@ apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
                       stream->session->id, stream->id);
         return APR_ECONNRESET;
     }
+    if (stream->submitted) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+                      "h2_stream(%ld-%d): already submitted final response", 
+                      stream->session->id, stream->id);
+        return APR_ECONNRESET;
+    }
     
-    stream->response = response;
-    stream->output = output;
-    stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
+    /* append */
+    while (*pr) {
+        pr = &((*pr)->next);
+    }
+    *pr = response;
     
-    h2_stream_filter(stream);
-    if (stream->output) {
-        status = fill_buffer(stream, 0);
+    if (h2_response_is_final(response)) {
+        stream->output = output;
+        stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
+        
+        h2_stream_filter(stream);
+        if (stream->output) {
+            status = fill_buffer(stream, 0);
+        }
     }
+    
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
                   "h2_stream(%ld-%d): set_response(%d)", 
                   stream->session->id, stream->id, 
@@ -558,7 +587,7 @@ apr_status_t h2_stream_set_error(h2_stream *stream, int http_status)
     }
     response = h2_response_die(stream->id, http_status, stream->request, 
                                stream->pool);
-    return h2_stream_set_response(stream, response, NULL);
+    return h2_stream_add_response(stream, response, NULL);
 }
 
 static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9); 
@@ -576,6 +605,10 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream,
         return APR_ECONNRESET;
     }
 
+    if (!stream->buffer) {
+        return APR_EAGAIN;
+    }
+    
     if (*plen > 0) {
         requested = H2MIN(*plen, DATA_CHUNK_SIZE);
     }
@@ -659,7 +692,7 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream)
     int i;
     
     pushes = h2_push_collect_update(stream, stream->request, 
-                                    h2_stream_get_response(stream));
+                                    stream->response);
     if (pushes && !apr_is_empty_array(pushes)) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
                       "h2_stream(%ld-%d): found %d push candidates",
@@ -683,10 +716,8 @@ apr_table_t *h2_stream_get_trailers(h2_stream *stream)
 
 const h2_priority *h2_stream_get_priority(h2_stream *stream)
 {
-    h2_response *response = h2_stream_get_response(stream);
-    
-    if (response && stream->request && stream->request->initiated_on) {
-        const char *ctype = apr_table_get(response->headers, "content-type");
+    if (stream->response && stream->request && stream->request->initiated_on) {
+        const char *ctype = apr_table_get(stream->response->headers, "content-type");
         if (ctype) {
             /* FIXME: Not good enough, config needs to come from request->server */
             return h2_config_get_priority(stream->session->config, ctype);
@@ -695,3 +726,26 @@ const h2_priority *h2_stream_get_priority(h2_stream *stream)
     return NULL;
 }
 
+const char *h2_stream_state_str(h2_stream *stream)
+{
+    switch (stream->state) {
+        case H2_STREAM_ST_IDLE:
+            return "IDLE";
+        case H2_STREAM_ST_OPEN:
+            return "OPEN";
+        case H2_STREAM_ST_RESV_LOCAL:
+            return "RESERVED_LOCAL";
+        case H2_STREAM_ST_RESV_REMOTE:
+            return "RESERVED_REMOTE";
+        case H2_STREAM_ST_CLOSED_INPUT:
+            return "HALF_CLOSED_REMOTE";
+        case H2_STREAM_ST_CLOSED_OUTPUT:
+            return "HALF_CLOSED_LOCAL";
+        case H2_STREAM_ST_CLOSED:
+            return "CLOSED";
+        default:
+            return "UNKNOWN";
+            
+    }
+}
+
index f80f8115e71186d34f07d8d3f32181e4a0624589..b7eb5ed4a494e8cc6d3bba56870aea675aa1ed83 100644 (file)
@@ -43,6 +43,7 @@ typedef struct h2_stream h2_stream;
 
 struct h2_stream {
     int id;                     /* http2 stream id */
+    apr_time_t created;         /* when stream was created */
     h2_stream_state_t state;    /* http/2 state of this stream */
     struct h2_session *session; /* the session this stream belongs to */
     
@@ -52,6 +53,7 @@ struct h2_stream {
     int request_headers_added;  /* number of request headers added */
     
     struct h2_response *response;
+    struct h2_response *last_sent;
     struct h2_bucket_beam *output;
     apr_bucket_brigade *buffer;
     apr_bucket_brigade *tmp;
@@ -65,7 +67,10 @@ struct h2_stream {
     unsigned int submitted : 1; /* response HEADER has been sent */
     
     apr_off_t input_remaining;  /* remaining bytes on input as advertised via content-length */
-    apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */
+    apr_off_t out_data_frames;  /* # of DATA frames sent */
+    apr_off_t out_data_octets;  /* # of DATA octets (payload) sent */
+    apr_off_t in_data_frames;   /* # of DATA frames received */
+    apr_off_t in_data_octets;   /* # of DATA octets (payload) received */
 };
 
 
@@ -176,6 +181,7 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
 int h2_stream_is_scheduled(const h2_stream *stream);
 
 struct h2_response *h2_stream_get_response(h2_stream *stream);
+struct h2_response *h2_stream_get_unsent_response(h2_stream *stream);
 
 /**
  * Set the response for this stream. Invoked when all meta data for
@@ -186,7 +192,7 @@ struct h2_response *h2_stream_get_response(h2_stream *stream);
  * @param bb bucket brigade with output data for the stream. Optional,
  *        may be incomplete.
  */
-apr_status_t h2_stream_set_response(h2_stream *stream, 
+apr_status_t h2_stream_add_response(h2_stream *stream, 
                                     struct h2_response *response,
                                     struct h2_bucket_beam *output);
 
@@ -277,4 +283,10 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream);
  */
 const struct h2_priority *h2_stream_get_priority(h2_stream *stream);
 
+/**
+ * Return a textual representation of the stream state as in RFC 7540
+ * nomenclator, all caps, underscores.
+ */
+const char *h2_stream_state_str(h2_stream *stream);
+
 #endif /* defined(__mod_h2__h2_stream__) */
index 1893b12fad465474899c8c5ae18b7c1f41fb5fa1..75f376cf0ff4c6e549f98548a0cdef21d2ea80e0 100644 (file)
@@ -42,6 +42,7 @@
 #include "h2_h2.h"
 #include "h2_mplx.h"
 #include "h2_request.h"
+#include "h2_response.h"
 #include "h2_session.h"
 #include "h2_stream.h"
 #include "h2_task.h"
@@ -163,6 +164,21 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f,
         return APR_EOF;
     }
     
+    /*
+    if (f->r && f->r->expecting_100) {
+        ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, f->c,
+                      "h2_task(%s): need to send 100 Continue here", 
+                      task->id);
+        f->r->expecting_100 = 0;
+    }
+    if (task->r && task->r->expecting_100) {
+        ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, f->c,
+                      "h2_task2(%s): need to send 100 Continue here", 
+                      task->id);
+        task->r->expecting_100 = 0;
+    }
+    */
+
     /* Cleanup brigades from those nasty 0 length non-meta buckets
      * that apr_brigade_split_line() sometimes produces. */
     for (b = APR_BRIGADE_FIRST(task->input.bb);
@@ -314,10 +330,8 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f,
  * task output handling
  ******************************************************************************/
 
-static apr_status_t open_response(h2_task *task)
+static apr_status_t open_response(h2_task *task, h2_response *response)
 {
-    h2_response *response;
-    response = h2_from_h1_get_response(task->output.from_h1);
     if (!response) {
         /* This happens currently when ap_die(status, r) is invoked
          * by a read request filter. */
@@ -461,7 +475,8 @@ static apr_status_t output_write(h2_task *task, ap_filter_t* f,
         && (flush || h2_beam_get_mem_used(task->output.beam) > (32*1024))) {
         /* if we have enough buffered or we got a flush bucket, open
         * the response now. */
-        status = open_response(task);
+        status = open_response(task, 
+            h2_from_h1_get_response(task->output.from_h1));
         task->output.response_open = 1;
     }
     
@@ -473,7 +488,8 @@ static apr_status_t output_finish(h2_task *task)
     apr_status_t status = APR_SUCCESS;
     
     if (!task->output.response_open) {
-        status = open_response(task);
+        status = open_response(task,
+            h2_from_h1_get_response(task->output.from_h1));
         task->output.response_open = 1;
     }
     return status;
@@ -494,6 +510,33 @@ static apr_status_t h2_filter_stream_input(ap_filter_t* filter,
     return input_read(task, filter, brigade, mode, block, readbytes);
 }
 
+static apr_status_t h2_filter_continue(ap_filter_t* f,
+                                       apr_bucket_brigade* brigade,
+                                       ap_input_mode_t mode,
+                                       apr_read_type_e block,
+                                       apr_off_t readbytes)
+{
+    h2_task *task = h2_ctx_cget_task(f->c);
+    apr_status_t status;
+    
+    AP_DEBUG_ASSERT(task);
+    if (f->r->expecting_100 && ap_is_HTTP_SUCCESS(f->r->status)) {
+        h2_response *response;
+
+        response = h2_response_rcreate(task->stream_id, f->r, HTTP_CONTINUE, 
+                                       NULL, f->r->pool);
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, f->r,
+                      "h2_task(%s): send 100 Continue", task->id);
+        status = open_response(task, response);
+        if (status != APR_SUCCESS) {
+            return status;
+        }
+        f->r->expecting_100 = 0;
+        apr_table_clear(f->r->headers_out);
+    }
+    return ap_get_brigade(f->next, brigade, mode, block, readbytes);
+}
+
 static apr_status_t h2_filter_stream_output(ap_filter_t* filter,
                                             apr_bucket_brigade* brigade)
 {
@@ -517,22 +560,23 @@ static apr_status_t h2_filter_read_response(ap_filter_t* filter,
  * task things
  ******************************************************************************/
  
-void h2_task_set_response(h2_task *task, h2_response *response) 
+apr_status_t h2_task_add_response(h2_task *task, h2_response *response) 
 {
     AP_DEBUG_ASSERT(response);
-    AP_DEBUG_ASSERT(!task->response);
     /* we used to clone the response into out own pool. But
      * we have much tighter control over the EOR bucket nowadays,
      * so just use the instance given */
+    response->next = task->response;
     task->response = response;
     if (response->rst_error) {
         h2_task_rst(task, response->rst_error);
     }
+    return APR_SUCCESS;
 }
 
 
 int h2_task_can_redo(h2_task *task) {
-    if (task->submitted
+    if (task->response_sent
         || (task->input.beam && h2_beam_was_received(task->input.beam)) 
         || !task->request) {
         /* cannot repeat that. */
@@ -591,6 +635,8 @@ void h2_task_register_hooks(void)
                               NULL, AP_FTYPE_PROTOCOL);
     ap_register_input_filter("H2_TO_H1", h2_filter_stream_input,
                              NULL, AP_FTYPE_NETWORK);
+    ap_register_input_filter("H2_CONTINUE", h2_filter_continue,
+                             NULL, AP_FTYPE_PROTOCOL);
     ap_register_output_filter("H1_TO_H2", h2_filter_stream_output,
                               NULL, AP_FTYPE_NETWORK);
     ap_register_output_filter("H1_TO_H2_RESP", h2_filter_read_response,
@@ -749,11 +795,16 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)
         }
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       "h2_task(%s): start process_request", task->id);
+        task->r = r;
+    
         ap_process_request(r);
         if (task->frozen) {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                           "h2_task(%s): process_request frozen", task->id);
         }
+        else {
+            task->r = NULL;
+        }
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       "h2_task(%s): process_request done", task->id);
         
index 1086e053e99dc0f05abc3453b62a2c29642898cc..76e8780d2ac06d20203450f367953b2d406f6338 100644 (file)
@@ -70,6 +70,7 @@ struct h2_task {
     struct {
         struct h2_bucket_beam *beam;
         struct h2_from_h1 *from_h1;
+        unsigned int opened : 1;
         unsigned int response_open : 1;
         unsigned int copy_files : 1;
         apr_off_t written;
@@ -85,7 +86,7 @@ struct h2_task {
     unsigned int frozen         : 1;
     unsigned int blocking       : 1;
     unsigned int detached       : 1;
-    unsigned int submitted      : 1; /* response has been submitted to client */
+    unsigned int response_sent  : 1; /* a response has been sent to client */
     unsigned int worker_started : 1; /* h2_worker started processing for this io */
     unsigned int worker_done    : 1; /* h2_worker finished for this io */
     
@@ -105,7 +106,7 @@ void h2_task_destroy(h2_task *task);
 
 apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread);
 
-void h2_task_set_response(h2_task *task, struct h2_response *response);
+apr_status_t h2_task_add_response(h2_task *task, struct h2_response *response);
 
 void h2_task_redo(h2_task *task);
 int h2_task_can_redo(h2_task *task);
index 8d1060e579a16b2585b665fae23c1975bd74cac1..84849551f5c8c60d84a390d150c1e1d1240d16cc 100644 (file)
@@ -1160,7 +1160,7 @@ typedef struct {
 #define H2_LIT_ARGS(a)      (a),H2_ALEN(a)
 
 static literal IgnoredRequestHeaders[] = {
-    H2_DEF_LITERAL("expect"),
+/*H2_DEF_LITERAL("expect"),*/
     H2_DEF_LITERAL("upgrade"),
     H2_DEF_LITERAL("connection"),
     H2_DEF_LITERAL("keep-alive"),
index 3ab3802395e82c01d215f0e2f4a6375f132a772f..b62b6c7a77f746acbc3d5f3cff0529b75d23e389 100644 (file)
@@ -26,7 +26,7 @@
  * @macro
  * Version number of the http2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.5.13"
+#define MOD_HTTP2_VERSION "1.6.0"
 
 /**
  * @macro
@@ -34,7 +34,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x01050d
+#define MOD_HTTP2_VERSION_NUM 0x010600
 
 
 #endif /* mod_h2_h2_version_h */