]> granicus.if.org Git - apache/blobdiff - modules/http2/h2_session.c
Fix spelling in comments and text files.
[apache] / modules / http2 / h2_session.c
index 1dea34b93437fc570e515fe0a4a063c5dd4c6941..8a7b49cd900a34fd5de8fa939f65cec8b65939e3 100644 (file)
  */
 
 #include <assert.h>
+#include <stddef.h>
 #include <apr_thread_cond.h>
 #include <apr_base64.h>
 #include <apr_strings.h>
 
+#include <ap_mpm.h>
+
 #include <httpd.h>
 #include <http_core.h>
 #include <http_config.h>
@@ -25,6 +28,7 @@
 #include <scoreboard.h>
 
 #include "h2_private.h"
+#include "h2.h"
 #include "h2_bucket_eoc.h"
 #include "h2_bucket_eos.h"
 #include "h2_config.h"
@@ -36,7 +40,6 @@
 #include "h2_request.h"
 #include "h2_response.h"
 #include "h2_stream.h"
-#include "h2_stream_set.h"
 #include "h2_from_h1.h"
 #include "h2_task.h"
 #include "h2_session.h"
 #include "h2_version.h"
 #include "h2_workers.h"
 
-#define H2MAX(x,y) ((x) > (y) ? (x) : (y))
-#define H2MIN(x,y) ((x) < (y) ? (x) : (y))
-
-static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen);
 
 static int h2_session_status_from_apr_status(apr_status_t rv)
 {
@@ -58,7 +57,7 @@ static int h2_session_status_from_apr_status(apr_status_t rv)
         return NGHTTP2_ERR_WOULDBLOCK;
     }
     else if (APR_STATUS_IS_EOF(rv)) {
-            return NGHTTP2_ERR_EOF;
+        return NGHTTP2_ERR_EOF;
     }
     return NGHTTP2_ERR_PROTO;
 }
@@ -76,35 +75,97 @@ static apr_status_t h2_session_receive(void *ctx,
                                        const char *data, apr_size_t len,
                                        apr_size_t *readlen);
 
-h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
+static int is_accepting_streams(h2_session *session); 
+static void dispatch_event(h2_session *session, h2_session_event_t ev, 
+                             int err, const char *msg);
+
+apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
 {
-    h2_stream * stream;
-    apr_pool_t *stream_pool;
-    if (session->aborted) {
-        return NULL;
-    }
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                  "h2_stream(%ld-%d): EOS bucket cleanup -> done", 
+                  session->id, stream->id);
+    h2_ihash_remove(session->streams, stream->id);
+    h2_mplx_stream_done(session->mplx, stream);
     
-    if (session->spare) {
-        stream_pool = session->spare;
-        session->spare = NULL;
+    dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
+    return APR_SUCCESS;
+}
+
+typedef struct stream_sel_ctx {
+    h2_session *session;
+    h2_stream *candidate;
+} stream_sel_ctx;
+
+static int find_cleanup_stream(void *udata, void *sdata)
+{
+    stream_sel_ctx *ctx = udata;
+    h2_stream *stream = sdata;
+    if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
+        if (!ctx->session->local.accepting
+            && stream->id > ctx->session->local.accepted_max) {
+            ctx->candidate = stream;
+            return 0;
+        }
     }
     else {
-        apr_pool_create(&stream_pool, session->pool);
+        if (!ctx->session->remote.accepting
+            && stream->id > ctx->session->remote.accepted_max) {
+            ctx->candidate = stream;
+            return 0;
+        }
     }
+    return 1;
+}
+
+static void cleanup_streams(h2_session *session)
+{
+    stream_sel_ctx ctx;
+    ctx.session = session;
+    ctx.candidate = NULL;
+    while (1) {
+        h2_ihash_iter(session->streams, find_cleanup_stream, &ctx);
+        if (ctx.candidate) {
+            h2_session_stream_done(session, ctx.candidate);
+            ctx.candidate = NULL;
+        }
+        else {
+            break;
+        }
+    }
+}
+
+h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
+                                  int initiated_on, const h2_request *req)
+{
+    h2_stream * stream;
+    apr_pool_t *stream_pool;
+    
+    apr_pool_create(&stream_pool, session->pool);
+    apr_pool_tag(stream_pool, "h2_stream");
     
-    stream = h2_stream_open(stream_id, stream_pool, session);
+    stream = h2_stream_open(stream_id, stream_pool, session, 
+                            initiated_on, req);
+    nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
+    h2_ihash_add(session->streams, stream);
     
-    h2_stream_set_add(session->streams, stream);
-    if (H2_STREAM_CLIENT_INITIATED(stream_id)
-        && stream_id > session->max_stream_received) {
-        session->max_stream_received = stream->id;
+    if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
+        if (stream_id > session->remote.emitted_max) {
+            ++session->remote.emitted_count;
+            session->remote.emitted_max = stream->id;
+            session->local.accepted_max = stream->id;
+        }
+    }
+    else {
+        if (stream_id > session->local.emitted_max) {
+            ++session->local.emitted_count;
+            session->remote.emitted_max = stream->id;
+        }
     }
+    dispatch_event(session, H2_SESSION_EV_STREAM_OPEN, 0, NULL);
     
     return stream;
 }
 
-#ifdef H2_NG2_STREAM_API
-
 /**
  * Determine the importance of streams when scheduling tasks.
  * - if both stream depend on the same one, compare weights
@@ -158,25 +219,10 @@ static int stream_pri_cmp(int sid1, int sid2, void *ctx)
     return spri_cmp(sid1, s1, sid2, s2, session);
 }
 
-#else /* ifdef H2_NG2_STREAM_API */
-
-/* In absence of nghttp2_stream API, which gives information about
- * priorities since nghttp2 1.3.x, we just sort the streams by
- * their identifier, aka. order of arrival.
- */
-static int stream_pri_cmp(int sid1, int sid2, void *ctx)
-{
-    (void)ctx;
-    return sid1 - sid2;
-}
-
-#endif /* (ifdef else) H2_NG2_STREAM_API */
-
 static apr_status_t stream_schedule(h2_session *session,
                                     h2_stream *stream, int eos)
 {
     (void)session;
-    ++session->requests_received;
     return h2_stream_schedule(stream, eos, h2_session_push_enabled(session), 
                               stream_pri_cmp, session);
 }
@@ -200,7 +246,7 @@ static ssize_t send_cb(nghttp2_session *ngh2,
     if (APR_STATUS_IS_EAGAIN(status)) {
         return NGHTTP2_ERR_WOULDBLOCK;
     }
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03062)
                   "h2_session: send error");
     return h2_session_status_from_apr_status(status);
 }
@@ -212,20 +258,23 @@ static int on_invalid_frame_recv_cb(nghttp2_session *ngh2,
     h2_session *session = (h2_session *)userp;
     (void)ngh2;
     
-    if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
-    }
-    if (APLOGctrace2(session->c)) {
+    if (APLOGcdebug(session->c)) {
         char buffer[256];
         
-        frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                      "h2_session: callback on_invalid_frame_recv error=%d %s",
-                      error, buffer);
+        h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03063)
+                      "h2_session(%ld): recv invalid FRAME[%s], frames=%ld/%ld (r/s)",
+                      session->id, buffer, (long)session->frames_received,
+                     (long)session->frames_sent);
     }
     return 0;
 }
 
+static h2_stream *get_stream(h2_session *session, int stream_id)
+{
+    return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+}
+
 static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
                                  int32_t stream_id,
                                  const uint8_t *data, size_t len, void *userp)
@@ -236,13 +285,14 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
     int rv;
     
     (void)flags;
-    if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
+    if (!is_accepting_streams(session)) {
+        /* ignore */
+        return 0;
     }
     
-    stream = h2_session_get_stream(session, stream_id);
+    stream = get_stream(session, stream_id);
     if (!stream) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064)
                       "h2_stream(%ld-%d): on_data_chunk for unknown stream",
                       session->id, (int)stream_id);
         rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
@@ -252,8 +302,11 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
         }
         return 0;
     }
-    
-    status = h2_stream_write_data(stream, (const char *)data, len);
+
+    /* FIXME: enabling setting EOS this way seems to break input handling
+     * in mod_proxy_http2. why? */
+    status = h2_stream_write_data(stream, (const char *)data, len,
+                                  0 /*flags & NGHTTP2_FLAG_END_STREAM*/);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
                   "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
                   session->id, stream_id, (long)len);
@@ -272,25 +325,33 @@ static apr_status_t stream_release(h2_session *session,
                                    h2_stream *stream,
                                    uint32_t error_code) 
 {
+    conn_rec *c = session->c;
+    apr_bucket *b;
+    apr_status_t status;
+    
     if (!error_code) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       "h2_stream(%ld-%d): handled, closing", 
                       session->id, (int)stream->id);
-        if (stream->id > session->max_stream_handled) {
-            session->max_stream_handled = stream->id;
+        if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
+            if (stream->id > session->local.completed_max) {
+                session->local.completed_max = stream->id;
+            }
         }
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03065)
                       "h2_stream(%ld-%d): closing with err=%d %s", 
                       session->id, (int)stream->id, (int)error_code,
                       h2_h2_err_description(error_code));
         h2_stream_rst(stream, error_code);
     }
     
-    return h2_conn_io_writeb(&session->io,
-                             h2_bucket_eos_create(session->c->bucket_alloc, 
-                                                  stream));
+    b = h2_bucket_eos_create(c->bucket_alloc, stream);
+    APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+    status = h2_conn_io_pass(&session->io, session->bbtmp);
+    apr_brigade_cleanup(session->bbtmp);
+    return status;
 }
 
 static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -300,10 +361,7 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
     h2_stream *stream;
     
     (void)ngh2;
-    if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
-    }
-    stream = h2_session_get_stream(session, stream_id);
+    stream = get_stream(session, stream_id);
     if (stream) {
         stream_release(session, stream, error_code);
     }
@@ -319,14 +377,14 @@ static int on_begin_headers_cb(nghttp2_session *ngh2,
     /* We may see HEADERs at the start of a stream or after all DATA
      * streams to carry trailers. */
     (void)ngh2;
-    s = h2_session_get_stream(session, frame->hd.stream_id);
+    s = get_stream(session, frame->hd.stream_id);
     if (s) {
         /* nop */
     }
     else {
-        s = h2_session_open_stream((h2_session *)userp, frame->hd.stream_id);
+        s = h2_session_open_stream(userp, frame->hd.stream_id, 0, NULL);
     }
-    return s? 0 : NGHTTP2_ERR_CALLBACK_FAILURE;
+    return s? 0 : NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
 }
 
 static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
@@ -339,25 +397,24 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
     h2_stream * stream;
     apr_status_t status;
     
-    (void)ngh2;
     (void)flags;
-    if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
+    if (!is_accepting_streams(session)) {
+        /* just ignore */
+        return 0;
     }
     
-    stream = h2_session_get_stream(session, frame->hd.stream_id);
+    stream = get_stream(session, frame->hd.stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
                       APLOGNO(02920) 
-                      "h2_session:  stream(%ld-%d): on_header for unknown stream",
+                      "h2_session:  stream(%ld-%d): on_header unknown stream",
                       session->id, (int)frame->hd.stream_id);
         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
     }
     
     status = h2_stream_add_header(stream, (const char *)name, namelen,
                                   (const char *)value, valuelen);
-                                    
-    if (status != APR_SUCCESS) {
+    if (status != APR_SUCCESS && !stream->response) {
         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
     }
     return 0;
@@ -376,15 +433,11 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
     apr_status_t status = APR_SUCCESS;
     h2_stream *stream;
     
-    if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
-    }
-    
     if (APLOGcdebug(session->c)) {
         char buffer[256];
         
-        frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+        h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03066)
                       "h2_session(%ld): recv FRAME[%s], frames=%ld/%ld (r/s)",
                       session->id, buffer, (long)session->frames_received,
                      (long)session->frames_sent);
@@ -396,7 +449,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
             /* This can be HEADERS for a new stream, defining the request,
              * or HEADER may come after DATA at the end of a stream as in
              * trailers */
-            stream = h2_session_get_stream(session, frame->hd.stream_id);
+            stream = get_stream(session, frame->hd.stream_id);
             if (stream) {
                 int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
                 
@@ -420,7 +473,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
             }
             break;
         case NGHTTP2_DATA:
-            stream = h2_session_get_stream(session, frame->hd.stream_id);
+            stream = get_stream(session, frame->hd.stream_id);
             if (stream) {
                 int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
@@ -453,24 +506,29 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
                           frame->window_update.window_size_increment);
             break;
         case NGHTTP2_RST_STREAM:
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03067)
                           "h2_session(%ld-%d): RST_STREAM by client, errror=%d",
                           session->id, (int)frame->hd.stream_id,
                           (int)frame->rst_stream.error_code);
-            ++session->streams_reset;
+            stream = get_stream(session, frame->hd.stream_id);
+            if (stream && stream->request && stream->request->initiated_on) {
+                ++session->pushes_reset;
+            }
+            else {
+                ++session->streams_reset;
+            }
             break;
         case NGHTTP2_GOAWAY:
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                          "h2_session(%ld): GOAWAY errror=%d",
-                          session->id, (int)frame->goaway.error_code);
-            session->client_goaway = 1;
+            session->remote.accepted_max = frame->goaway.last_stream_id;
+            session->remote.error = frame->goaway.error_code;
+            dispatch_event(session, H2_SESSION_EV_REMOTE_GOAWAY, 0, NULL);
             break;
         default:
             if (APLOGctrace2(session->c)) {
                 char buffer[256];
                 
-                frame_print(frame, buffer,
-                            sizeof(buffer)/sizeof(buffer[0]));
+                h2_util_frame_print(frame, buffer,
+                                    sizeof(buffer)/sizeof(buffer[0]));
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                               "h2_session: on_frame_rcv %s", buffer);
             }
@@ -495,13 +553,6 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
     return 0;
 }
 
-static apr_status_t pass_data(void *ctx, 
-                              const char *data, apr_off_t length)
-{
-    return h2_conn_io_write(&((h2_session*)ctx)->io, data, length);
-}
-
-
 static char immortal_zeros[H2_MAX_PADLEN];
 
 static int on_send_data_cb(nghttp2_session *ngh2, 
@@ -517,83 +568,68 @@ static int on_send_data_cb(nghttp2_session *ngh2,
     unsigned char padlen;
     int eos;
     h2_stream *stream;
+    apr_bucket *b;
+    apr_off_t len = length;
     
     (void)ngh2;
     (void)source;
-    if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
-    }
-    
     if (frame->data.padlen > H2_MAX_PADLEN) {
         return NGHTTP2_ERR_PROTO;
     }
     padlen = (unsigned char)frame->data.padlen;
     
-    stream = h2_session_get_stream(session, stream_id);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                  "h2_stream(%ld-%d): send_data_cb for %ld bytes",
+                  session->id, (int)stream_id, (long)length);
+                  
+    stream = get_stream(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
                       APLOGNO(02924) 
-                      "h2_stream(%ld-%d): send_data",
+                      "h2_stream(%ld-%d): send_data, lookup stream",
                       session->id, (int)stream_id);
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                  "h2_stream(%ld-%d): send_data_cb for %ld bytes",
-                  session->id, (int)stream_id, (long)length);
-                  
-    if (h2_conn_io_is_buffered(&session->io)) {
-        status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
-        if (status == APR_SUCCESS) {
-            if (padlen) {
-                status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
-            }
-            
-            if (status == APR_SUCCESS) {
-                apr_off_t len = length;
-                status = h2_stream_readx(stream, pass_data, session, &len, &eos);
-                if (status == APR_SUCCESS && len != length) {
-                    status = APR_EINVAL;
-                }
-            }
-            
-            if (status == APR_SUCCESS && padlen) {
-                if (padlen) {
-                    status = h2_conn_io_write(&session->io, immortal_zeros, padlen);
-                }
-            }
-        }
+    status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
+    if (padlen && status == APR_SUCCESS) {
+        status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
     }
-    else {
-        apr_bucket *b;
-        char *header = apr_pcalloc(stream->pool, 10);
-        memcpy(header, (const char *)framehd, 9);
-        if (padlen) {
-            header[9] = (char)padlen;
-        }
-        b = apr_bucket_pool_create(header, padlen? 10 : 9, 
-                                   stream->pool, session->c->bucket_alloc);
-        status = h2_conn_io_writeb(&session->io, b);
-        
-        if (status == APR_SUCCESS) {
-            apr_off_t len = length;
-            status = h2_stream_read_to(stream, session->io.output, &len, &eos);
-            if (status == APR_SUCCESS && len != length) {
-                status = APR_EINVAL;
-            }
-        }
-            
-        if (status == APR_SUCCESS && padlen) {
-            b = apr_bucket_immortal_create(immortal_zeros, padlen, 
-                                           session->c->bucket_alloc);
-            status = h2_conn_io_writeb(&session->io, b);
-        }
+    
+    if (status != APR_SUCCESS) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
+                      "h2_stream(%ld-%d): writing frame header",
+                      session->id, (int)stream_id);
+        return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
+    status = h2_stream_read_to(stream, session->bbtmp, &len, &eos);
+    if (status != APR_SUCCESS) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
+                      "h2_stream(%ld-%d): send_data_cb, reading stream",
+                      session->id, (int)stream_id);
+        return NGHTTP2_ERR_CALLBACK_FAILURE;
+    }
+    else if (len != length) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
+                      "h2_stream(%ld-%d): send_data_cb, wanted %ld bytes, "
+                      "got %ld from stream",
+                      session->id, (int)stream_id, (long)length, (long)len);
+        return NGHTTP2_ERR_CALLBACK_FAILURE;
+    }
+    
+    if (padlen) {
+        b = apr_bucket_immortal_create(immortal_zeros, padlen, 
+                                       session->c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+    }
     
+    status = h2_conn_io_pass(&session->io, session->bbtmp);
+        
+    apr_brigade_cleanup(session->bbtmp);
     if (status == APR_SUCCESS) {
-        stream->data_frames_sent++;
-        h2_conn_io_consider_flush(&session->io);
+        stream->out_data_frames++;
+        stream->out_data_octets += length;
         return 0;
     }
     else {
@@ -601,9 +637,8 @@ static int on_send_data_cb(nghttp2_session *ngh2,
                       APLOGNO(02925) 
                       "h2_stream(%ld-%d): failed send_data_cb",
                       session->id, (int)stream_id);
+        return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
-    
-    return h2_session_status_from_apr_status(status);
 }
 
 static int on_frame_send_cb(nghttp2_session *ngh2, 
@@ -614,8 +649,8 @@ static int on_frame_send_cb(nghttp2_session *ngh2,
     if (APLOGcdebug(session->c)) {
         char buffer[256];
         
-        frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+        h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03068)
                       "h2_session(%ld): sent FRAME[%s], frames=%ld/%ld (r/s)",
                       session->id, buffer, (long)session->frames_received,
                      (long)session->frames_sent);
@@ -650,51 +685,79 @@ static apr_status_t init_callbacks(conn_rec *c, nghttp2_session_callbacks **pcb)
     return APR_SUCCESS;
 }
 
-static void h2_session_cleanup(h2_session *session)
+static void h2_session_destroy(h2_session *session)
 {
-    AP_DEBUG_ASSERT(session);
-    /* This is an early cleanup of the session that may
-     * discard what is no longer necessary for *new* streams
-     * and general HTTP/2 processing.
-     * At this point, all frames are in transit or somehwere in
-     * our buffers or passed down output filters.
-     * h2 streams might still being written out.
-     */
-    if (session->c) {
-        h2_ctx_clear(session->c);
+    AP_DEBUG_ASSERT(session);    
+
+    h2_ihash_clear(session->streams);
+    if (session->mplx) {
+        h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
+        h2_mplx_release_and_join(session->mplx, session->iowait);
+        session->mplx = NULL;
     }
+
+    ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
+                                     session->c->input_filters), "H2_IN");
     if (session->ngh2) {
         nghttp2_session_del(session->ngh2);
         session->ngh2 = NULL;
     }
-    if (session->spare) {
-        apr_pool_destroy(session->spare);
-        session->spare = NULL;
+    if (session->c) {
+        h2_ctx_clear(session->c);
+    }
+
+    if (APLOGctrace1(session->c)) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                      "h2_session(%ld): destroy", session->id);
+    }
+    if (session->pool) {
+        apr_pool_destroy(session->pool);
     }
 }
 
-static void h2_session_destroy(h2_session *session)
+static apr_status_t h2_session_shutdown(h2_session *session, int error, 
+                                        const char *msg, int force_close)
 {
+    apr_status_t status = APR_SUCCESS;
+    
     AP_DEBUG_ASSERT(session);
-    h2_session_cleanup(session);
-
-    if (APLOGctrace1(session->c)) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                      "h2_session(%ld): destroy, %d streams open",
-                      session->id, (int)h2_stream_set_size(session->streams));
+    if (!msg && error) {
+        msg = nghttp2_strerror(error);
     }
-    if (session->mplx) {
-        h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
-        h2_mplx_release_and_join(session->mplx, session->iowait);
-        session->mplx = NULL;
+    
+    if (error || force_close) {
+        /* not a graceful shutdown, we want to leave... 
+         * Do not start further streams that are waiting to be scheduled. 
+         * Find out the max stream id that we habe been processed or
+         * are still actively working on.
+         * Remove all streams greater than this number without submitting
+         * a RST_STREAM frame, since that should be clear from the GOAWAY
+         * we send. */
+        session->local.accepted_max = h2_mplx_shutdown(session->mplx);
+        session->local.error = error;
     }
-    if (session->streams) {
-        h2_stream_set_destroy(session->streams);
-        session->streams = NULL;
+    else {
+        /* graceful shutdown. we will continue processing all streams
+         * we have, but no longer accept new ones. Report the max stream
+         * we have received and discard all new ones. */
+    }
+    nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 
+                          session->local.accepted_max, 
+                          error, (uint8_t*)msg, msg? strlen(msg):0);
+    status = nghttp2_session_send(session->ngh2);
+    if (status == APR_SUCCESS) {
+        status = h2_conn_io_flush(&session->io);
     }
-    if (session->pool) {
-        apr_pool_destroy(session->pool);
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03069)
+                  "session(%ld): sent GOAWAY, err=%d, msg=%s", 
+                  session->id, error, msg? msg : "");
+    dispatch_event(session, H2_SESSION_EV_LOCAL_GOAWAY, error, msg);
+    
+    if (force_close) {
+        h2_mplx_abort(session->mplx);
     }
+    
+    return status;
 }
 
 static apr_status_t session_pool_cleanup(void *data)
@@ -709,6 +772,22 @@ static apr_status_t session_pool_cleanup(void *data)
      */
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                   "session(%ld): pool_cleanup", session->id);
+    
+    if (session->state != H2_SESSION_ST_DONE 
+        && session->state != H2_SESSION_ST_LOCAL_SHUTDOWN) {
+        /* Not good. The connection is being torn down and we have
+         * not sent a goaway. This is considered a protocol error and
+         * the client has to assume that any streams "in flight" may have
+         * been processed and are not safe to retry.
+         * As clients with idle connection may only learn about a closed
+         * connection when sending the next request, this has the effect
+         * that at least this one request will fail.
+         */
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, APLOGNO(03199)
+                      "session(%ld): connection disappeared without proper "
+                      "goodbye, clients will be confused, should not happen", 
+                      session->id);
+    }
     /* keep us from destroying the pool, since that is already ongoing. */
     session->pool = NULL;
     h2_session_destroy(session);
@@ -760,6 +839,7 @@ static h2_session *h2_session_create_int(conn_rec *c,
 {
     nghttp2_session_callbacks *callbacks = NULL;
     nghttp2_option *options = NULL;
+    uint32_t n;
 
     apr_pool_t *pool = NULL;
     apr_status_t status = apr_pool_create(&pool, c->pool);
@@ -767,6 +847,7 @@ static h2_session *h2_session_create_int(conn_rec *c,
     if (status != APR_SUCCESS) {
         return NULL;
     }
+    apr_pool_tag(pool, "h2_session");
 
     session = apr_pcalloc(pool, sizeof(h2_session));
     if (session) {
@@ -777,41 +858,39 @@ static h2_session *h2_session_create_int(conn_rec *c,
         session->c = c;
         session->r = r;
         session->s = h2_ctx_server_get(ctx);
+        session->pool = pool;
         session->config = h2_config_sget(session->s);
+        session->workers = workers;
         
         session->state = H2_SESSION_ST_INIT;
+        session->local.accepting = 1;
+        session->remote.accepting = 1;
         
-        session->pool = pool;
         apr_pool_pre_cleanup_register(pool, session, session_pool_cleanup);
         
-        session->max_stream_count = h2_config_geti(session->config, H2_CONF_MAX_STREAMS);
-        session->max_stream_mem = h2_config_geti(session->config, H2_CONF_STREAM_MAX_MEM);
-        session->timeout_secs = h2_config_geti(session->config, H2_CONF_TIMEOUT_SECS);
-        if (session->timeout_secs <= 0) {
-            session->timeout_secs = apr_time_sec(session->s->timeout);
-        }
-        session->keepalive_secs = h2_config_geti(session->config, H2_CONF_KEEPALIVE_SECS);
-        if (session->keepalive_secs <= 0) {
-            session->keepalive_secs = apr_time_sec(session->s->keep_alive_timeout);
-        }
-        
+        session->max_stream_count = h2_config_geti(session->config, 
+                                                   H2_CONF_MAX_STREAMS);
+        session->max_stream_mem = h2_config_geti(session->config, 
+                                                 H2_CONF_STREAM_MAX_MEM);
+
         status = apr_thread_cond_create(&session->iowait, session->pool);
         if (status != APR_SUCCESS) {
             return NULL;
         }
         
-        session->streams = h2_stream_set_create(session->pool, session->max_stream_count);
-        
-        session->workers = workers;
-        session->mplx = h2_mplx_create(c, session->pool, session->config, workers);
+        session->streams = h2_ihash_create(session->pool,
+                                           offsetof(h2_stream, id));
+        session->mplx = h2_mplx_create(c, session->pool, session->config, 
+                                       session->s->timeout, workers);
         
         h2_mplx_set_consumed_cb(session->mplx, update_window, session);
         
         /* Install the connection input filter that feeds the session */
-        session->cin = h2_filter_cin_create(session->pool, h2_session_receive, session);
+        session->cin = h2_filter_cin_create(session->pool, 
+                                            h2_session_receive, session);
         ap_add_input_filter("H2_IN", session->cin, r, c);
 
-        h2_conn_io_init(&session->io, c, session->config, session->pool);
+        h2_conn_io_init(&session->io, c, session->config);
         session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc);
         
         status = init_callbacks(c, &callbacks);
@@ -830,8 +909,8 @@ static h2_session *h2_session_create_int(conn_rec *c,
             h2_session_destroy(session);
             return NULL;
         }
-        nghttp2_option_set_peer_max_concurrent_streams(options, 
-                                                       (uint32_t)session->max_stream_count);
+        nghttp2_option_set_peer_max_concurrent_streams(
+            options, (uint32_t)session->max_stream_count);
         /* We need to handle window updates ourself, otherwise we
          * get flooded by nghttp2. */
         nghttp2_option_set_no_auto_window_update(options, 1);
@@ -861,13 +940,21 @@ static h2_session *h2_session_create_int(conn_rec *c,
             h2_session_destroy(session);
             return NULL;
         }
-            
+         
+        n = h2_config_geti(session->config, H2_CONF_PUSH_DIARY_SIZE);
+        session->push_diary = h2_push_diary_create(session->pool, n);
+        
         if (APLOGcdebug(c)) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c,
-                          "session(%ld) created, timeout=%d, keepalive_timeout=%d, "
-                          "max_streams=%d, stream_mem=%d",
-                          session->id, session->timeout_secs, session->keepalive_secs,
-                          (int)session->max_stream_count, (int)session->max_stream_mem);
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03200)
+                          "h2_session(%ld) created, max_streams=%d, "
+                          "stream_mem=%d, workers_limit=%d, workers_max=%d, "
+                          "push_diary(type=%d,N=%d)",
+                          session->id, (int)session->max_stream_count, 
+                          (int)session->max_stream_mem,
+                          session->mplx->workers_limit, 
+                          session->mplx->workers_max, 
+                          session->push_diary->dtype, 
+                          (int)session->push_diary->N);
         }
     }
     return session;
@@ -891,54 +978,6 @@ void h2_session_eoc_callback(h2_session *session)
     h2_session_destroy(session);
 }
 
-static apr_status_t h2_session_shutdown(h2_session *session, int reason)
-{
-    AP_DEBUG_ASSERT(session);
-    session->aborted = 1;
-    if (session->state != H2_SESSION_ST_CLOSING
-        && session->state != H2_SESSION_ST_ABORTED) {
-        if (session->client_goaway) {
-            /* client sent us a GOAWAY, just terminate */
-            nghttp2_session_terminate_session(session->ngh2, NGHTTP2_ERR_EOF);
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                          "session(%ld): shutdown, GOAWAY from client", session->id);
-        }
-        else if (!reason) {
-            nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 
-                                  session->max_stream_received, 
-                                  reason, NULL, 0);
-            nghttp2_session_send(session->ngh2);
-            session->server_goaway = 1;
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                          "session(%ld): shutdown, no err", session->id);
-        }
-        else {
-            const char *err = nghttp2_strerror(reason);
-            nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 
-                                  session->max_stream_received, 
-                                  reason, (const uint8_t *)err, 
-                                  strlen(err));
-            nghttp2_session_send(session->ngh2);
-            session->server_goaway = 1;
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                          "session(%ld): shutdown, err=%d '%s'",
-                          session->id, reason, err);
-        }
-        session->state = H2_SESSION_ST_CLOSING;
-        h2_mplx_abort(session->mplx);
-    }
-    return APR_SUCCESS;
-}
-
-void h2_session_abort(h2_session *session, apr_status_t status)
-{
-    AP_DEBUG_ASSERT(session);
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
-                  "h2_session(%ld): aborting", session->id);
-    session->state = H2_SESSION_ST_ABORTED;
-    session->aborted = 1;
-}
-
 static apr_status_t h2_session_start(h2_session *session, int *rv)
 {
     apr_status_t status = APR_SUCCESS;
@@ -969,7 +1008,7 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
         if (APLOGrdebug(session->r)) {
             char buffer[128];
             h2_util_hex_dump(buffer, 128, (char*)cs, dlen);
-            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, session->r,
+            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, session->r, APLOGNO(03070)
                           "upgrading h2c session with HTTP2-Settings: %s -> %s (%d)",
                           s, buffer, (int)dlen);
         }
@@ -984,7 +1023,7 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
         }
         
         /* Now we need to auto-open stream 1 for the request we got. */
-        stream = h2_session_open_stream(session, 1);
+        stream = h2_session_open_stream(session, 1, 0, NULL);
         if (!stream) {
             status = APR_EGENERAL;
             ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r,
@@ -1014,6 +1053,10 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
         ++slen;
     }
     
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03201)
+                  "h2_session(%ld): start, INITIAL_WINDOW_SIZE=%ld, "
+                  "MAX_CONCURRENT_STREAMS=%d", 
+                  session->id, (long)win_size, (int)session->max_stream_count);
     *rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE,
                                   settings, slen);
     if (*rv != 0) {
@@ -1041,85 +1084,8 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
                           nghttp2_strerror(*rv));        
         }
     }
-    return status;
-}
-
-typedef struct {
-    h2_session *session;
-    int resume_count;
-} resume_ctx;
-
-static int resume_on_data(void *ctx, h2_stream *stream)
-{
-    resume_ctx *rctx = (resume_ctx*)ctx;
-    h2_session *session = rctx->session;
-    AP_DEBUG_ASSERT(session);
-    AP_DEBUG_ASSERT(stream);
-    
-    if (h2_stream_is_suspended(stream)) {
-        if (h2_mplx_out_has_data_for(stream->session->mplx, stream->id)) {
-            int rv;
-            h2_stream_set_suspended(stream, 0);
-            ++rctx->resume_count;
-            
-            rv = nghttp2_session_resume_data(session->ngh2, stream->id);
-            ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
-                          APLOG_ERR : APLOG_DEBUG, 0, session->c,
-                          APLOGNO(02936) 
-                          "h2_stream(%ld-%d): resuming %s",
-                          session->id, stream->id, rv? nghttp2_strerror(rv) : "");
-        }
-    }
-    return 1;
-}
-
-static int h2_session_resume_streams_with_data(h2_session *session)
-{
-    AP_DEBUG_ASSERT(session);
-    if (!h2_stream_set_is_empty(session->streams)
-        && session->mplx && !session->aborted) {
-        resume_ctx ctx;
-        
-        ctx.session      = session;
-        ctx.resume_count = 0;
-
-        /* Resume all streams where we have data in the out queue and
-         * which had been suspended before. */
-        h2_stream_set_iter(session->streams, resume_on_data, &ctx);
-        return ctx.resume_count;
-    }
-    return 0;
-}
-
-h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
-{
-    if (!session->last_stream || stream_id != session->last_stream->id) {
-        session->last_stream = h2_stream_set_get(session->streams, stream_id);
-    }
-    return session->last_stream;
-}
-
-void h2_session_close(h2_session *session)
-{
-    apr_bucket *b;
-    conn_rec *c = session->c;
-    apr_status_t status;
     
-    AP_DEBUG_ASSERT(session);
-    if (!session->aborted) {
-        h2_session_shutdown(session, 0);
-    }
-    h2_session_cleanup(session);
-
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c,
-                  "h2_session(%ld): writing eoc", c->id);
-    b = h2_bucket_eoc_create(c->bucket_alloc, session);
-    status = h2_conn_io_write_eoc(&session->io, b);
-    if (status != APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, c,
-                      "h2_session(%ld): flushed eoc bucket", c->id);
-    } 
-    /* and all is or will be destroyed */
+    return status;
 }
 
 static ssize_t stream_data_cb(nghttp2_session *ng2s,
@@ -1147,7 +1113,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
     (void)ng2s;
     (void)buf;
     (void)source;
-    stream = h2_session_get_stream(session, stream_id);
+    stream = get_stream(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
                       APLOGNO(02937) 
@@ -1158,7 +1124,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
     
     AP_DEBUG_ASSERT(!h2_stream_is_suspended(stream));
     
-    status = h2_stream_prep_read(stream, &nread, &eos);
+    status = h2_stream_out_prepare(stream, &nread, &eos);
     if (nread) {
         *data_flags |=  NGHTTP2_DATA_FLAG_NO_COPY;
     }
@@ -1177,17 +1143,12 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
              * it. Remember at our h2_stream that we need to do this.
              */
             nread = 0;
-            h2_stream_set_suspended(stream, 1);
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+            h2_mplx_suspend_stream(session->mplx, stream->id);
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071)
                           "h2_stream(%ld-%d): suspending",
                           session->id, (int)stream_id);
             return NGHTTP2_ERR_DEFERRED;
             
-        case APR_EOF:
-            nread = 0;
-            eos = 1;
-            break;
-            
         default:
             nread = 0;
             ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
@@ -1203,7 +1164,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
             int rv;
             
             nh = h2_util_ngheader_make(stream->pool, trailers);
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03072)
                           "h2_stream(%ld-%d): submit %d trailers",
                           session->id, (int)stream_id,(int) nh->nvlen);
             rv = nghttp2_submit_trailer(ng2s, stream->id, nh->nv, nh->nvlen);
@@ -1225,134 +1186,43 @@ typedef struct {
     size_t offset;
 } nvctx_t;
 
-/**
- * Start submitting the response to a stream request. This is possible
- * once we have all the response headers. The response body will be
- * read by the session using the callback we supply.
- */
-static apr_status_t submit_response(h2_session *session, h2_stream *stream)
+struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
+                                  h2_push *push)
 {
-    apr_status_t status = APR_SUCCESS;
-    int rv = 0;
-    AP_DEBUG_ASSERT(session);
-    AP_DEBUG_ASSERT(stream);
-    AP_DEBUG_ASSERT(stream->response || stream->rst_error);
+    apr_status_t status;
+    h2_stream *stream;
+    h2_ngheader *ngh;
+    int nid;
     
-    if (stream->submitted) {
-        rv = NGHTTP2_PROTOCOL_ERROR;
-    }
-    else if (stream->response && stream->response->headers) {
-        nghttp2_data_provider provider;
-        h2_response *response = stream->response;
-        h2_ngheader *ngh;
-        const h2_priority *prio;
-        
-        memset(&provider, 0, sizeof(provider));
-        provider.source.fd = stream->id;
-        provider.read_callback = stream_data_cb;
-        
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                      "h2_stream(%ld-%d): submit response %d",
-                      session->id, stream->id, response->http_status);
-        
-        /* If this stream is not a pushed one itself,
-         * and HTTP/2 server push is enabled here,
-         * and the response is in the range 200-299 *),
-         * and the remote side has pushing enabled,
-         * -> find and perform any pushes on this stream
-         *    *before* we submit the stream response itself.
-         *    This helps clients avoid opening new streams on Link
-         *    headers that get pushed right afterwards.
-         * 
-         * *) the response code is relevant, as we do not want to 
-         *    make pushes on 401 or 403 codes, neiterh on 301/302
-         *    and friends. And if we see a 304, we do not push either
-         *    as the client, having this resource in its cache, might
-         *    also have the pushed ones as well.
-         */
-        if (!stream->initiated_on
-            && H2_HTTP_2XX(response->http_status)
-            && h2_session_push_enabled(session)) {
-            
-            h2_stream_submit_pushes(stream);
-        }
-        
-        prio = h2_stream_get_priority(stream);
-        if (prio) {
-            h2_session_set_prio(session, stream, prio);
-            /* no showstopper if that fails for some reason */
-        }
-        
-        ngh = h2_util_ngheader_make_res(stream->pool, response->http_status, 
-                                        response->headers);
-        rv = nghttp2_submit_response(session->ngh2, response->stream_id,
-                                     ngh->nv, ngh->nvlen, &provider);
-        ++session->responses_sent;
-    }
-    else {
-        int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
-        
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                      "h2_stream(%ld-%d): RST_STREAM, err=%d",
-                      session->id, stream->id, err);
-
-        rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
-                                       stream->id, err);
-        ++session->responses_sent;
+    ngh = h2_util_ngheader_make_req(is->pool, push->req);
+    nid = nghttp2_submit_push_promise(session->ngh2, 0, is->id, 
+                                      ngh->nv, ngh->nvlen, NULL);
+    if (nid <= 0) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03075)
+                      "h2_stream(%ld-%d): submitting push promise fail: %s",
+                      session->id, is->id, nghttp2_strerror(nid));
+        return NULL;
     }
+    ++session->pushes_promised;
     
-    stream->submitted = 1;
-
-    if (nghttp2_is_fatal(rv)) {
-        status = APR_EGENERAL;
-        h2_session_shutdown(session, rv);
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
-                      APLOGNO(02940) "submit_response: %s", 
-                      nghttp2_strerror(rv));
-    }
-    
-    return status;
-}
-
-struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
-                                  h2_push *push)
-{
-    apr_status_t status;
-    h2_stream *stream;
-    h2_ngheader *ngh;
-    int nid;
-    
-    ngh = h2_util_ngheader_make_req(is->pool, push->req);
-    nid = nghttp2_submit_push_promise(session->ngh2, 0, is->id, 
-                                      ngh->nv, ngh->nvlen, NULL);
-    if (nid <= 0) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                      "h2_stream(%ld-%d): submitting push promise fail: %s",
-                      session->id, is->id, nghttp2_strerror(nid));
-        return NULL;
-    }
-    ++session->streams_pushed;
-    
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03076)
                   "h2_stream(%ld-%d): SERVER_PUSH %d for %s %s on %d",
                   session->id, is->id, nid,
                   push->req->method, push->req->path, is->id);
                   
-    stream = h2_session_open_stream(session, nid);
+    stream = h2_session_open_stream(session, nid, is->id, push->req);
     if (stream) {
-        h2_stream_set_h2_request(stream, is->id, push->req);
         status = stream_schedule(session, stream, 1);
         if (status != APR_SUCCESS) {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
                           "h2_stream(%ld-%d): scheduling push stream",
                           session->id, stream->id);
-            h2_stream_cleanup(stream);
             stream = NULL;
         }
         ++session->unsent_promises;
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03077)
                       "h2_stream(%ld-%d): failed to create stream obj %d",
                       session->id, is->id, nid);
     }
@@ -1431,7 +1301,7 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
                 id_grandpa = nghttp2_stream_get_stream_id(s_grandpa);
                 rv = nghttp2_session_change_stream_priority(session->ngh2, id_parent, &ps);
                 if (rv < 0) {
-                    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03202)
                                   "h2_stream(%ld-%d): PUSH BEFORE, weight=%d, "
                                   "depends=%d, returned=%d",
                                   session->id, id_parent, ps.weight, ps.stream_id, rv);
@@ -1453,7 +1323,7 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
 
 
         rv = nghttp2_session_change_stream_priority(session->ngh2, stream->id, &ps);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03203)
                       "h2_stream(%ld-%d): PUSH %s, weight=%d, "
                       "depends=%d, returned=%d",
                       session->id, stream->id, ptype, 
@@ -1469,168 +1339,221 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
     return status;
 }
 
-apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream)
+int h2_session_push_enabled(h2_session *session)
 {
-    apr_pool_t *pool = h2_stream_detach_pool(stream);
+    /* iff we can and they can and want */
+    return (session->remote.accepting /* remote GOAWAY received */
+            && h2_config_geti(session->config, H2_CONF_PUSH)
+            && nghttp2_session_get_remote_settings(session->ngh2, 
+                   NGHTTP2_SETTINGS_ENABLE_PUSH));
+}
 
-    /* this may be called while the session has already freed
-     * some internal structures. */
-    if (session->mplx) {
-        h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
-        if (session->last_stream == stream) {
-            session->last_stream = NULL;
-        }
-    }
+static apr_status_t h2_session_send(h2_session *session)
+{
+    apr_interval_time_t saved_timeout;
+    int rv;
+    apr_socket_t *socket;
     
-    if (session->streams) {
-        h2_stream_set_remove(session->streams, stream->id);
+    socket = ap_get_conn_socket(session->c);
+    if (socket) {
+        apr_socket_timeout_get(socket, &saved_timeout);
+        apr_socket_timeout_set(socket, session->s->timeout);
     }
-    h2_stream_destroy(stream);
     
-    if (pool) {
-        apr_pool_clear(pool);
-        if (session->spare) {
-            apr_pool_destroy(session->spare);
+    rv = nghttp2_session_send(session->ngh2);
+    
+    if (socket) {
+        apr_socket_timeout_set(socket, saved_timeout);
+    }
+    session->have_written = 1;
+    if (rv != 0) {
+        if (nghttp2_is_fatal(rv)) {
+            dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
+            return APR_EGENERAL;
         }
-        session->spare = pool;
     }
+    
+    session->unsent_promises = 0;
+    session->unsent_submits = 0;
+    
     return APR_SUCCESS;
 }
 
-static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen)
+/**
+ * A stream was resumed as new output data arrived.
+ */
+static apr_status_t on_stream_resume(void *ctx, int stream_id)
+{
+    h2_session *session = ctx;
+    h2_stream *stream = get_stream(session, stream_id);
+    apr_status_t status = APR_SUCCESS;
+    
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                  "h2_stream(%ld-%d): on_resume", session->id, stream_id);
+    if (stream) {
+        int rv = nghttp2_session_resume_data(session->ngh2, stream_id);
+        session->have_written = 1;
+        ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
+                      APLOG_ERR : APLOG_DEBUG, 0, session->c,
+                      APLOGNO(02936) 
+                      "h2_stream(%ld-%d): resuming %s",
+                      session->id, stream->id, rv? nghttp2_strerror(rv) : "");
+    }
+    return status;
+}
+
+/**
+ * A response for the stream is ready.
+ */
+static apr_status_t on_stream_response(void *ctx, int stream_id)
 {
-    char scratch[128];
-    size_t s_len = sizeof(scratch)/sizeof(scratch[0]);
+    h2_session *session = ctx;
+    h2_stream *stream = get_stream(session, stream_id);
+    apr_status_t status = APR_SUCCESS;
+    h2_response *response;
+    int rv = 0;
+
+    AP_DEBUG_ASSERT(session);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                  "h2_stream(%ld-%d): on_response", session->id, stream_id);
+    if (!stream) {
+        return APR_NOTFOUND;
+    }
+    else if (!stream->response) {
+        int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
+        
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
+                      "h2_stream(%ld-%d): RST_STREAM, err=%d",
+                      session->id, stream->id, err);
+
+        rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
+                                       stream->id, err);
+        goto leave;
+    }
     
-    switch (frame->hd.type) {
-        case NGHTTP2_DATA: {
-            return apr_snprintf(buffer, maxlen,
-                                "DATA[length=%d, flags=%d, stream=%d, padlen=%d]",
-                                (int)frame->hd.length, frame->hd.flags,
-                                frame->hd.stream_id, (int)frame->data.padlen);
-        }
-        case NGHTTP2_HEADERS: {
-            return apr_snprintf(buffer, maxlen,
-                                "HEADERS[length=%d, hend=%d, stream=%d, eos=%d]",
-                                (int)frame->hd.length,
-                                !!(frame->hd.flags & NGHTTP2_FLAG_END_HEADERS),
-                                frame->hd.stream_id,
-                                !!(frame->hd.flags & NGHTTP2_FLAG_END_STREAM));
-        }
-        case NGHTTP2_PRIORITY: {
-            return apr_snprintf(buffer, maxlen,
-                                "PRIORITY[length=%d, flags=%d, stream=%d]",
-                                (int)frame->hd.length,
-                                frame->hd.flags, frame->hd.stream_id);
-        }
-        case NGHTTP2_RST_STREAM: {
-            return apr_snprintf(buffer, maxlen,
-                                "RST_STREAM[length=%d, flags=%d, stream=%d]",
-                                (int)frame->hd.length,
-                                frame->hd.flags, frame->hd.stream_id);
+    while ((response = h2_stream_get_unsent_response(stream)) != NULL) {
+        nghttp2_data_provider provider, *pprovider = NULL;
+        h2_ngheader *ngh;
+        const h2_priority *prio;
+        
+        if (stream->submitted) {
+            rv = NGHTTP2_PROTOCOL_ERROR;
+            goto leave;
         }
-        case NGHTTP2_SETTINGS: {
-            if (frame->hd.flags & NGHTTP2_FLAG_ACK) {
-                return apr_snprintf(buffer, maxlen,
-                                    "SETTINGS[ack=1, stream=%d]",
-                                    frame->hd.stream_id);
-            }
-            return apr_snprintf(buffer, maxlen,
-                                "SETTINGS[length=%d, stream=%d]",
-                                (int)frame->hd.length, frame->hd.stream_id);
+        
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
+                      "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
+                      session->id, stream->id, response->http_status,
+                      (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
+        
+        if (response->content_length != 0) {
+            memset(&provider, 0, sizeof(provider));
+            provider.source.fd = stream->id;
+            provider.read_callback = stream_data_cb;
+            pprovider = &provider;
         }
-        case NGHTTP2_PUSH_PROMISE: {
-            return apr_snprintf(buffer, maxlen,
-                                "PUSH_PROMISE[length=%d, hend=%d, stream=%d]",
-                                (int)frame->hd.length,
-                                !!(frame->hd.flags & NGHTTP2_FLAG_END_HEADERS),
-                                frame->hd.stream_id);
+        
+        /* If this stream is not a pushed one itself,
+         * and HTTP/2 server push is enabled here,
+         * and the response is in the range 200-299 *),
+         * and the remote side has pushing enabled,
+         * -> find and perform any pushes on this stream
+         *    *before* we submit the stream response itself.
+         *    This helps clients avoid opening new streams on Link
+         *    headers that get pushed right afterwards.
+         * 
+         * *) the response code is relevant, as we do not want to 
+         *    make pushes on 401 or 403 codes, neiterh on 301/302
+         *    and friends. And if we see a 304, we do not push either
+         *    as the client, having this resource in its cache, might
+         *    also have the pushed ones as well.
+         */
+        if (stream->request 
+            && !stream->request->initiated_on
+            && h2_response_is_final(response)
+            && H2_HTTP_2XX(response->http_status)
+            && h2_session_push_enabled(session)) {
+            
+            h2_stream_submit_pushes(stream);
         }
-        case NGHTTP2_PING: {
-            return apr_snprintf(buffer, maxlen,
-                                "PING[length=%d, ack=%d, stream=%d]",
-                                (int)frame->hd.length,
-                                frame->hd.flags&NGHTTP2_FLAG_ACK,
-                                frame->hd.stream_id);
+        
+        prio = h2_stream_get_priority(stream);
+        if (prio) {
+            h2_session_set_prio(session, stream, prio);
         }
-        case NGHTTP2_GOAWAY: {
-            size_t len = (frame->goaway.opaque_data_len < s_len)?
-            frame->goaway.opaque_data_len : s_len-1;
-            memcpy(scratch, frame->goaway.opaque_data, len);
-            scratch[len+1] = '\0';
-            return apr_snprintf(buffer, maxlen, "GOAWAY[error=%d, reason='%s']",
-                                frame->goaway.error_code, scratch);
+        
+        ngh = h2_util_ngheader_make_res(stream->pool, response->http_status, 
+                                        response->headers);
+        rv = nghttp2_submit_response(session->ngh2, response->stream_id,
+                                     ngh->nv, ngh->nvlen, pprovider);
+        stream->submitted = h2_response_is_final(response);
+        session->have_written = 1;
+        
+        if (stream->request && stream->request->initiated_on) {
+            ++session->pushes_submitted;
         }
-        case NGHTTP2_WINDOW_UPDATE: {
-            return apr_snprintf(buffer, maxlen,
-                                "WINDOW_UPDATE[length=%d, stream=%d]",
-                                (int)frame->hd.length, frame->hd.stream_id);
+        else {
+            ++session->responses_submitted;
         }
-        default:
-            return apr_snprintf(buffer, maxlen,
-                                "type=%d[length=%d, flags=%d, stream=%d]",
-                                frame->hd.type, (int)frame->hd.length,
-                                frame->hd.flags, frame->hd.stream_id);
     }
-}
-
-int h2_session_push_enabled(h2_session *session)
-{
-    /* iff we can and they can */
-    return (h2_config_geti(session->config, H2_CONF_PUSH)
-            && nghttp2_session_get_remote_settings(session->ngh2, 
-                                                   NGHTTP2_SETTINGS_ENABLE_PUSH));
-}
-
-static apr_status_t h2_session_send(h2_session *session)
-{
-    int rv = nghttp2_session_send(session->ngh2);
-    if (rv != 0) {
-        if (nghttp2_is_fatal(rv)) {
-            ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                          "h2_session: send gave error=%s", nghttp2_strerror(rv));
-            h2_session_shutdown(session, rv);
-            return APR_EGENERAL;
-        }
+    
+leave:
+    if (nghttp2_is_fatal(rv)) {
+        status = APR_EGENERAL;
+        dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
+                      APLOGNO(02940) "submit_response: %s", 
+                      nghttp2_strerror(rv));
     }
     
-    session->unsent_promises = 0;
-    session->unsent_submits = 0;
+    ++session->unsent_submits;
     
-    return APR_SUCCESS;
+    /* Unsent push promises are written immediately, as nghttp2
+     * 1.5.0 realizes internal stream data structures only on 
+     * send and we might need them for other submits. 
+     * Also, to conserve memory, we send at least every 10 submits
+     * so that nghttp2 does not buffer all outbound items too 
+     * long.
+     */
+    if (status == APR_SUCCESS 
+        && (session->unsent_promises || session->unsent_submits > 10)) {
+        status = h2_session_send(session);
+    }
+    return status;
 }
 
 static apr_status_t h2_session_receive(void *ctx, const char *data, 
                                        apr_size_t len, apr_size_t *readlen)
 {
     h2_session *session = ctx;
+    ssize_t n;
+    
     if (len > 0) {
-        ssize_t n = nghttp2_session_mem_recv(session->ngh2,
-                                             (const uint8_t *)data, len);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                      "h2_session(%ld): feeding %ld bytes to nghttp2",
+                      session->id, (long)len);
+        n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len);
         if (n < 0) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EGENERAL,
-                          session->c,
-                          "h2_session: nghttp2_session_mem_recv error=%d",
-                          (int)n);
             if (nghttp2_is_fatal((int)n)) {
-                h2_session_shutdown(session, (int)n);
+                dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, (int)n, nghttp2_strerror(n));
                 return APR_EGENERAL;
             }
         }
         else {
             *readlen = n;
+            session->io.bytes_read += n;
         }
     }
     return APR_SUCCESS;
 }
 
-static apr_status_t h2_session_read(h2_session *session, int block, int loops)
+static apr_status_t h2_session_read(h2_session *session, int block)
 {
     apr_status_t status, rstatus = APR_EAGAIN;
     conn_rec *c = session->c;
-    int i;
+    apr_off_t read_start = session->io.bytes_read;
     
-    for (i = 0; i < loops; ++i) {
+    while (1) {
         /* H2_IN filter handles all incoming data against the session.
          * We just pull at the filter chain to make it happen */
         status = ap_get_brigade(c->input_filters,
@@ -1645,7 +1568,7 @@ static apr_status_t h2_session_read(h2_session *session, int block, int loops)
                 /* successful read, reset our idle timers */
                 rstatus = APR_SUCCESS;
                 if (block) {
-                    /* successfull blocked read, try unblocked to
+                    /* successful blocked read, try unblocked to
                      * get more. */
                     block = 0;
                 }
@@ -1655,7 +1578,7 @@ static apr_status_t h2_session_read(h2_session *session, int block, int loops)
             case APR_TIMEUP:
                 return status;
             default:
-                if (!i) {
+                if (session->io.bytes_read == read_start) {
                     /* first attempt failed */
                     if (APR_STATUS_IS_ETIMEDOUT(status)
                         || APR_STATUS_IS_ECONNABORTED(status)
@@ -1668,7 +1591,7 @@ static apr_status_t h2_session_read(h2_session *session, int block, int loops)
                     }
                     else {
                         /* uncommon status, log on INFO so that we see this */
-                        ap_log_cerror( APLOG_MARK, APLOG_INFO, status, c,
+                        ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, c,
                                       APLOGNO(02950) 
                                       "h2_session(%ld): error reading, terminating",
                                       session->id);
@@ -1679,38 +1602,443 @@ static apr_status_t h2_session_read(h2_session *session, int block, int loops)
                  * status. */
                 return rstatus;
         }
+        if (!is_accepting_streams(session)) {
+            break;
+        }
+        if ((session->io.bytes_read - read_start) > (64*1024)) {
+            /* read enough in one go, give write a chance */
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
+                          "h2_session(%ld): read 64k, returning", session->id);
+            break;
+        }
     }
     return rstatus;
 }
 
-static apr_status_t h2_session_submit(h2_session *session)
+static int unsubmitted_iter(void *ctx, void *val)
 {
-    apr_status_t status = APR_EAGAIN;
-    h2_stream *stream;
-    
-    if (h2_stream_set_has_unsubmitted(session->streams)) {
-        /* If we have responses ready, submit them now. */
-        while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) {
-            status = submit_response(session, stream);
-            ++session->unsent_submits;
-            
-            /* Unsent push promises are written immediately, as nghttp2
-             * 1.5.0 realizes internal stream data structures only on 
-             * send and we might need them for other submits. 
-             * Also, to conserve memory, we send at least every 10 submits
-             * so that nghttp2 does not buffer all outbound items too 
-             * long.
+    h2_stream *stream = val;
+    if (h2_stream_needs_submit(stream)) {
+        *((int *)ctx) = 1;
+        return 0;
+    }
+    return 1;
+}
+
+static int has_unsubmitted_streams(h2_session *session)
+{
+    int has_unsubmitted = 0;
+    h2_ihash_iter(session->streams, unsubmitted_iter, &has_unsubmitted);
+    return has_unsubmitted;
+}
+
+static int suspended_iter(void *ctx, void *val)
+{
+    h2_stream *stream = val;
+    if (h2_stream_is_suspended(stream)) {
+        *((int *)ctx) = 1;
+        return 0;
+    }
+    return 1;
+}
+
+static int has_suspended_streams(h2_session *session)
+{
+    int has_suspended = 0;
+    h2_ihash_iter(session->streams, suspended_iter, &has_suspended);
+    return has_suspended;
+}
+
+static const char *StateNames[] = {
+    "INIT",      /* H2_SESSION_ST_INIT */
+    "DONE",      /* H2_SESSION_ST_DONE */
+    "IDLE",      /* H2_SESSION_ST_IDLE */
+    "BUSY",      /* H2_SESSION_ST_BUSY */
+    "WAIT",      /* H2_SESSION_ST_WAIT */
+    "LSHUTDOWN", /* H2_SESSION_ST_LOCAL_SHUTDOWN */
+    "RSHUTDOWN", /* H2_SESSION_ST_REMOTE_SHUTDOWN */
+};
+
+static const char *state_name(h2_session_state state)
+{
+    if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
+        return "unknown";
+    }
+    return StateNames[state];
+}
+
+static int is_accepting_streams(h2_session *session)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_IDLE:
+        case H2_SESSION_ST_BUSY:
+        case H2_SESSION_ST_WAIT:
+            return 1;
+        default:
+            return 0;
+    }
+}
+
+static void update_child_status(h2_session *session, int status, const char *msg)
+{
+    /* Assume that we also change code/msg when something really happened and
+     * avoid updating the scoreboard in between */
+    if (session->last_status_code != status 
+        || session->last_status_msg != msg) {
+        apr_snprintf(session->status, sizeof(session->status),
+                     "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)", 
+                     msg? msg : "-",
+                     (int)session->open_streams, 
+                     (int)session->remote.emitted_count,
+                     (int)session->responses_submitted,
+                     (int)session->pushes_submitted,
+                     (int)session->pushes_reset + session->streams_reset);
+        ap_update_child_status_descr(session->c->sbh, status, session->status);
+    }
+}
+
+static void transit(h2_session *session, const char *action, h2_session_state nstate)
+{
+    if (session->state != nstate) {
+        int loglvl = APLOG_DEBUG;
+        if ((session->state == H2_SESSION_ST_BUSY && nstate == H2_SESSION_ST_WAIT)
+            || (session->state == H2_SESSION_ST_WAIT && nstate == H2_SESSION_ST_BUSY)){
+            loglvl = APLOG_TRACE1;
+        }
+        ap_log_cerror(APLOG_MARK, loglvl, 0, session->c, APLOGNO(03078)
+                      "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id,
+                      state_name(session->state), action, state_name(nstate));
+        session->state = nstate;
+        switch (session->state) {
+            case H2_SESSION_ST_IDLE:
+                update_child_status(session, (session->open_streams == 0? 
+                                              SERVER_BUSY_KEEPALIVE
+                                              : SERVER_BUSY_READ), "idle");
+                break;
+            case H2_SESSION_ST_REMOTE_SHUTDOWN:
+                update_child_status(session, SERVER_CLOSING, "remote goaway");
+                break;
+            case H2_SESSION_ST_LOCAL_SHUTDOWN:
+                update_child_status(session, SERVER_CLOSING, "local goaway");
+                break;
+            case H2_SESSION_ST_DONE:
+                update_child_status(session, SERVER_CLOSING, "done");
+                break;
+            default:
+                /* nop */
+                break;
+        }
+    }
+}
+
+static void h2_session_ev_init(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_INIT:
+            transit(session, "init", H2_SESSION_ST_BUSY);
+            break;
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg)
+{
+    session->local.accepting = 0;
+    cleanup_streams(session);
+    switch (session->state) {
+        case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            /* already did that? */
+            break;
+        case H2_SESSION_ST_IDLE:
+        case H2_SESSION_ST_REMOTE_SHUTDOWN:
+            /* all done */
+            transit(session, "local goaway", H2_SESSION_ST_DONE);
+            break;
+        default:
+            transit(session, "local goaway", H2_SESSION_ST_LOCAL_SHUTDOWN);
+            break;
+    }
+}
+
+static void h2_session_ev_remote_goaway(h2_session *session, int arg, const char *msg)
+{
+    session->remote.accepting = 0;
+    cleanup_streams(session);
+    switch (session->state) {
+        case H2_SESSION_ST_REMOTE_SHUTDOWN:
+            /* already received that? */
+            break;
+        case H2_SESSION_ST_IDLE:
+        case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            /* all done */
+            transit(session, "remote goaway", H2_SESSION_ST_DONE);
+            break;
+        default:
+            transit(session, "remote goaway", H2_SESSION_ST_REMOTE_SHUTDOWN);
+            break;
+    }
+}
+
+static void h2_session_ev_conn_error(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_INIT:
+        case H2_SESSION_ST_DONE:
+        case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            /* just leave */
+            transit(session, "conn error", H2_SESSION_ST_DONE);
+            break;
+        
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03401)
+                          "h2_session(%ld): conn error -> shutdown", session->id);
+            h2_session_shutdown(session, arg, msg, 0);
+            break;
+    }
+}
+
+static void h2_session_ev_proto_error(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_DONE:
+        case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            /* just leave */
+            transit(session, "proto error", H2_SESSION_ST_DONE);
+            break;
+        
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03402)
+                          "h2_session(%ld): proto error -> shutdown", session->id);
+            h2_session_shutdown(session, arg, msg, 0);
+            break;
+    }
+}
+
+static void h2_session_ev_conn_timeout(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            transit(session, "conn timeout", H2_SESSION_ST_DONE);
+            break;
+        default:
+            h2_session_shutdown(session, arg, msg, 1);
+            transit(session, "conn timeout", H2_SESSION_ST_DONE);
+            break;
+    }
+}
+
+static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_BUSY:
+        case H2_SESSION_ST_LOCAL_SHUTDOWN:
+        case H2_SESSION_ST_REMOTE_SHUTDOWN:
+            /* Nothing to READ, nothing to WRITE on the master connection.
+             * Possible causes:
+             * - we wait for the client to send us sth
+             * - we wait for started tasks to produce output
+             * - we have finished all streams and the client has sent GO_AWAY
              */
-            if (status == APR_SUCCESS 
-                && (session->unsent_promises || session->unsent_submits > 10)) {
-                status = h2_session_send(session);
-                if (status != APR_SUCCESS) {
-                    break;
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                          "h2_session(%ld): NO_IO event, %d streams open", 
+                          session->id, session->open_streams);
+            if (session->open_streams > 0) {
+                if (has_unsubmitted_streams(session) 
+                    || has_suspended_streams(session)) {
+                    /* waiting for at least one stream to produce data */
+                    transit(session, "no io", H2_SESSION_ST_WAIT);
+                }
+                else {
+                    /* we have streams open, and all are submitted and none
+                     * is suspended. The only thing keeping us from WRITEing
+                     * more must be the flow control.
+                     * This means we only wait for WINDOW_UPDATE from the 
+                     * client and can block on READ. */
+                    transit(session, "no io (flow wait)", H2_SESSION_ST_IDLE);
+                    session->idle_until = apr_time_now() + session->s->timeout;
+                    session->keep_sync_until = session->idle_until;
+                    /* Make sure we have flushed all previously written output
+                     * so that the client will react. */
+                    if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+                        return;
+                    }
                 }
             }
-        }
+            else if (is_accepting_streams(session)) {
+                /* When we have no streams, but accept new, switch to idle */
+                apr_time_t now = apr_time_now();
+                transit(session, "no io (keepalive)", H2_SESSION_ST_IDLE);
+                session->idle_until = (session->remote.emitted_count? 
+                                       session->s->keep_alive_timeout : 
+                                       session->s->timeout) + now;
+                session->keep_sync_until = now + apr_time_from_sec(1);
+            }
+            else {
+                /* We are no longer accepting new streams and there are
+                 * none left. Time to leave. */
+                h2_session_shutdown(session, arg, msg, 0);
+                transit(session, "no io", H2_SESSION_ST_DONE);
+            }
+            break;
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void h2_session_ev_stream_ready(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_WAIT:
+            transit(session, "stream ready", H2_SESSION_ST_BUSY);
+            break;
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void h2_session_ev_data_read(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_IDLE:
+        case H2_SESSION_ST_WAIT:
+            transit(session, "data read", H2_SESSION_ST_BUSY);
+            break;
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void h2_session_ev_ngh2_done(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_DONE:
+            /* nop */
+            break;
+        default:
+            transit(session, "nghttp2 done", H2_SESSION_ST_DONE);
+            break;
+    }
+}
+
+static void h2_session_ev_mpm_stopping(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_DONE:
+        case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            /* nop */
+            break;
+        default:
+            h2_session_shutdown(session, arg, msg, 0);
+            break;
+    }
+}
+
+static void h2_session_ev_pre_close(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_DONE:
+        case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            /* nop */
+            break;
+        default:
+            h2_session_shutdown(session, arg, msg, 1);
+            break;
+    }
+}
+
+static void h2_session_ev_stream_open(h2_session *session, int arg, const char *msg)
+{
+    ++session->open_streams;
+    switch (session->state) {
+        case H2_SESSION_ST_IDLE:
+            if (session->open_streams == 1) {
+                /* enter tiomeout, since we have a stream again */
+                session->idle_until = (session->s->timeout + apr_time_now());
+            }
+            break;
+        default:
+            break;
+    }
+}
+
+static void h2_session_ev_stream_done(h2_session *session, int arg, const char *msg)
+{
+    --session->open_streams;
+    switch (session->state) {
+        case H2_SESSION_ST_IDLE:
+            if (session->open_streams == 0) {
+                /* enter keepalive timeout, since we no longer have streams */
+                session->idle_until = (session->s->keep_alive_timeout
+                                       + apr_time_now());
+            }
+            break;
+        default:
+            break;
+    }
+}
+
+static void dispatch_event(h2_session *session, h2_session_event_t ev, 
+                      int arg, const char *msg)
+{
+    switch (ev) {
+        case H2_SESSION_EV_INIT:
+            h2_session_ev_init(session, arg, msg);
+            break;            
+        case H2_SESSION_EV_LOCAL_GOAWAY:
+            h2_session_ev_local_goaway(session, arg, msg);
+            break;
+        case H2_SESSION_EV_REMOTE_GOAWAY:
+            h2_session_ev_remote_goaway(session, arg, msg);
+            break;
+        case H2_SESSION_EV_CONN_ERROR:
+            h2_session_ev_conn_error(session, arg, msg);
+            break;
+        case H2_SESSION_EV_PROTO_ERROR:
+            h2_session_ev_proto_error(session, arg, msg);
+            break;
+        case H2_SESSION_EV_CONN_TIMEOUT:
+            h2_session_ev_conn_timeout(session, arg, msg);
+            break;
+        case H2_SESSION_EV_NO_IO:
+            h2_session_ev_no_io(session, arg, msg);
+            break;
+        case H2_SESSION_EV_STREAM_READY:
+            h2_session_ev_stream_ready(session, arg, msg);
+            break;
+        case H2_SESSION_EV_DATA_READ:
+            h2_session_ev_data_read(session, arg, msg);
+            break;
+        case H2_SESSION_EV_NGH2_DONE:
+            h2_session_ev_ngh2_done(session, arg, msg);
+            break;
+        case H2_SESSION_EV_MPM_STOPPING:
+            h2_session_ev_mpm_stopping(session, arg, msg);
+            break;
+        case H2_SESSION_EV_PRE_CLOSE:
+            h2_session_ev_pre_close(session, arg, msg);
+            break;
+        case H2_SESSION_EV_STREAM_OPEN:
+            h2_session_ev_stream_open(session, arg, msg);
+            break;
+        case H2_SESSION_EV_STREAM_DONE:
+            h2_session_ev_stream_done(session, arg, msg);
+            break;
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                          "h2_session(%ld): unknown event %d", 
+                          session->id, ev);
+            break;
+    }
+    
+    if (session->state == H2_SESSION_ST_DONE) {
+        h2_mplx_abort(session->mplx);
     }
-    return status;
 }
 
 static const int MAX_WAIT_MICROS = 200 * 1000;
@@ -1719,281 +2047,305 @@ apr_status_t h2_session_process(h2_session *session, int async)
 {
     apr_status_t status = APR_SUCCESS;
     conn_rec *c = session->c;
-    int rv, have_written, have_read, remain_secs;
-    const char *reason = "";
+    int rv, mpm_state, trace = APLOGctrace3(c);
 
-    ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
-                  "h2_session(%ld): process start, async=%d", session->id, async);
+    if (trace) {
+        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                      "h2_session(%ld): process start, async=%d", 
+                      session->id, async);
+    }
                   
+    if (c->cs) {
+        c->cs->state = CONN_STATE_WRITE_COMPLETION;
+    }
+    
     while (1) {
-        have_read = have_written = 0;
+        trace = APLOGctrace3(c);
+        session->have_read = session->have_written = 0;
 
-        if (session->aborted) {
-            reason = "aborted";
-            status = APR_ECONNABORTED;
-            goto out;
+        if (!ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state)) {
+            if (mpm_state == AP_MPMQ_STOPPING) {
+                dispatch_event(session, H2_SESSION_EV_MPM_STOPPING, 0, NULL);
+                break;
+            }
         }
         
+        session->status[0] = '\0';
+        
         switch (session->state) {
             case H2_SESSION_ST_INIT:
+                ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_READ, c);
                 if (!h2_is_acceptable_connection(c, 1)) {
-                    nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0,
-                                          NGHTTP2_INADEQUATE_SECURITY, NULL, 0);
-                    nghttp2_session_send(session->ngh2);
-                    session->server_goaway = 1;
+                    update_child_status(session, SERVER_BUSY_READ, "inadequate security");
+                    h2_session_shutdown(session, NGHTTP2_INADEQUATE_SECURITY, NULL, 1);
                 } 
-                
-                status = h2_session_start(session, &rv);
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
-                              "h2_session(%ld): started on %s:%d", session->id,
-                              session->s->server_hostname,
-                              c->local_addr->port);
-                if (status != APR_SUCCESS) {
-                    reason = "start failed";
-                    goto out;
+                else {
+                    update_child_status(session, SERVER_BUSY_READ, "init");
+                    status = h2_session_start(session, &rv);
+                    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03079)
+                                  "h2_session(%ld): started on %s:%d", session->id,
+                                  session->s->server_hostname,
+                                  c->local_addr->port);
+                    if (status != APR_SUCCESS) {
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+                    }
+                    dispatch_event(session, H2_SESSION_EV_INIT, 0, NULL);
                 }
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): INIT -> BUSY", session->id);
-                session->state = H2_SESSION_ST_BUSY;
                 break;
                 
-            case H2_SESSION_ST_IDLE_READ:
-                h2_filter_cin_timeout_set(session->cin, session->timeout_secs);
-                ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL);
-                status = h2_session_read(session, 1, 10);
-                if (APR_STATUS_IS_TIMEUP(status)) {
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): IDLE -> KEEPALIVE", session->id);
-                    session->state = H2_SESSION_ST_KEEPALIVE;
-                }
-                else if (status == APR_SUCCESS) {
-                    /* got something, go busy again */
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): IDLE -> BUSY", session->id);
-                    session->state = H2_SESSION_ST_BUSY;
+            case H2_SESSION_ST_IDLE:
+                /* make certain, we send everything before we idle */
+                h2_conn_io_flush(&session->io);
+                if (!session->keep_sync_until && async && !session->open_streams
+                    && !session->r && session->remote.emitted_count) {
+                    if (trace) {
+                        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                                      "h2_session(%ld): async idle, nonblock read, "
+                                      "%d streams open", session->id, 
+                                      session->open_streams);
+                    }
+                    /* We do not return to the async mpm immediately, since under
+                     * load, mpms show the tendency to throw keep_alive connections
+                     * away very rapidly.
+                     * So, if we are still processing streams, we wait for the
+                     * normal timeout first and, on timeout, close.
+                     * If we have no streams, we still wait a short amount of
+                     * time here for the next frame to arrive, before handing
+                     * it to keep_alive processing of the mpm.
+                     */
+                    status = h2_session_read(session, 0);
+                    
+                    if (status == APR_SUCCESS) {
+                        session->have_read = 1;
+                        dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
+                    }
+                    else if (APR_STATUS_IS_EAGAIN(status) || APR_STATUS_IS_TIMEUP(status)) {
+                        if (apr_time_now() > session->idle_until) {
+                            dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL);
+                        }
+                        else {
+                            status = APR_EAGAIN;
+                            goto out;
+                        }
+                    }
+                    else {
+                        ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, c,
+                                     APLOGNO(03403)
+                                      "h2_session(%ld): idle, no data, error", 
+                                      session->id);
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "timeout");
+                    }
                 }
                 else {
-                    reason = "keepalive error";
-                    goto out;
+                    if (trace) {
+                        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                                      "h2_session(%ld): sync idle, stutter 1-sec, "
+                                      "%d streams open", session->id,
+                                      session->open_streams);
+                    }
+                    /* We wait in smaller increments, using a 1 second timeout.
+                     * That gives us the chance to check for MPMQ_STOPPING often. 
+                     */
+                    status = h2_mplx_idle(session->mplx);
+                    if (status != APR_SUCCESS) {
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
+                                       H2_ERR_ENHANCE_YOUR_CALM, "less is more");
+                    }
+                    h2_filter_cin_timeout_set(session->cin, apr_time_from_sec(1));
+                    status = h2_session_read(session, 1);
+                    if (status == APR_SUCCESS) {
+                        session->have_read = 1;
+                        dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
+                    }
+                    else if (status == APR_EAGAIN) {
+                        /* nothing to read */
+                    }
+                    else if (APR_STATUS_IS_TIMEUP(status)) {
+                        apr_time_t now = apr_time_now();
+                        if (now > session->keep_sync_until) {
+                            /* if we are on an async mpm, now is the time that
+                             * we may dare to pass control to it. */
+                            session->keep_sync_until = 0;
+                        }
+                        if (now > session->idle_until) {
+                            if (trace) {
+                                ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                                              "h2_session(%ld): keepalive timeout",
+                                              session->id);
+                            }
+                            dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
+                        }
+                        else if (trace) {                        
+                            ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                                          "h2_session(%ld): keepalive, %f sec left",
+                                          session->id, (session->idle_until - now) / 1000000.0f);
+                        }
+                        /* continue reading handling */
+                    }
+                    else {
+                        if (trace) {
+                            ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                                          "h2_session(%ld): idle(1 sec timeout) "
+                                          "read failed", session->id);
+                        }
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "error");
+                    }
                 }
+                
                 break;
                 
             case H2_SESSION_ST_BUSY:
+            case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            case H2_SESSION_ST_REMOTE_SHUTDOWN:
                 if (nghttp2_session_want_read(session->ngh2)) {
-                    h2_filter_cin_timeout_set(session->cin, session->timeout_secs);
-                    status = h2_session_read(session, 0, 10);
+                    ap_update_child_status(session->c->sbh, SERVER_BUSY_READ, NULL);
+                    h2_filter_cin_timeout_set(session->cin, session->s->timeout);
+                    status = h2_session_read(session, 0);
                     if (status == APR_SUCCESS) {
-                        /* got something, continue processing */
-                        have_read = 1;
+                        session->have_read = 1;
+                        dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                     }
                     else if (status == APR_EAGAIN) {
                         /* nothing to read */
                     }
+                    else if (APR_STATUS_IS_TIMEUP(status)) {
+                        dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL);
+                        break;
+                    }
                     else {
-                        reason = "busy read error";
-                        goto out;
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
                     }
                 }
                 
-                if (!h2_stream_set_is_empty(session->streams)) {
-                    /* resume any streams for which data is available again */
-                    h2_session_resume_streams_with_data(session);
-                    /* Submit any responses/push_promises that are ready */
-                    status = h2_session_submit(session);
-                    if (status == APR_SUCCESS) {
-                        have_written = 1;
-                    }
-                    else if (status != APR_EAGAIN) {
-                        reason = "submit error";
-                        goto out;
-                    }
-                    /* send out window updates for our inputs */
-                    status = h2_mplx_in_update_windows(session->mplx);
-                    if (status != APR_SUCCESS && status != APR_EAGAIN) {
-                        reason = "window update error";
-                        goto out;
-                    }
+                /* trigger window updates, stream resumes and submits */
+                status = h2_mplx_dispatch_master_events(session->mplx, 
+                                                        on_stream_resume,
+                                                        on_stream_response, 
+                                                        session);
+                if (status != APR_SUCCESS) {
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c,
+                                  "h2_session(%ld): dispatch error", 
+                                  session->id);
+                    dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
+                                   H2_ERR_INTERNAL_ERROR, 
+                                   "dispatch error");
+                    break;
                 }
                 
                 if (nghttp2_session_want_write(session->ngh2)) {
+                    ap_update_child_status(session->c->sbh, SERVER_BUSY_WRITE, NULL);
                     status = h2_session_send(session);
                     if (status != APR_SUCCESS) {
-                        reason = "send error";
-                        goto out;
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
+                                       H2_ERR_INTERNAL_ERROR, "writing");
+                        break;
                     }
-                    have_written = 1;
                 }
                 
-                if (have_read || have_written) {
-                    session->wait_us = 0;
-                }
-                else {
-                    /* nothing for input and output to do. If we remain
-                     * in this state, we go into a tight loop and suck up
-                     * CPU cycles. 
-                     * Ideally, we'd like to do a blocking read, but that
-                     * is not possible if we have scheduled tasks and wait
-                     * for them to produce something. */
-                    if (h2_stream_set_is_empty(session->streams)) {
-                        /* When we have no streams, no task event are possible,
-                         * switch to blocking reads */
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): BUSY -> IDLE", session->id);
-                        session->state = H2_SESSION_ST_IDLE_READ;
-                    }
-                    else if (!h2_stream_set_has_unsubmitted(session->streams)
-                             && !h2_stream_set_has_suspended(session->streams)) {
-                        /* none of our streams is waiting for a response or
-                         * new output data from task processing, 
-                         * switch to blocking reads. */
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): BUSY -> IDLE", session->id);
-                        session->state = H2_SESSION_ST_IDLE_READ;
-                    }
-                    else {
-                        /* Unable to do blocking reads, as we wait on events from
-                         * task processing in other threads. Do a busy wait with
-                         * backoff timer. */
-                        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                      "h2_session(%ld): BUSY -> WAIT", session->id);
-                        session->state = H2_SESSION_ST_BUSY_WAIT;
+                if (session->have_read || session->have_written) {
+                    if (session->wait_us) {
+                        session->wait_us = 0;
                     }
                 }
+                else if (!nghttp2_session_want_write(session->ngh2)) {
+                    dispatch_event(session, H2_SESSION_EV_NO_IO, 0, NULL);
+                }
                 break;
                 
-            case H2_SESSION_ST_BUSY_WAIT:
-                session->wait_us = H2MAX(session->wait_us, 10);
-                if (APLOGctrace1(c)) {
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+            case H2_SESSION_ST_WAIT:
+                if (session->wait_us <= 0) {
+                    session->wait_us = 10;
+                    if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+                        break;
+                    }
+                }
+                else {
+                    /* repeating, increase timer for graceful backoff */
+                    session->wait_us = H2MIN(session->wait_us*2, MAX_WAIT_MICROS);
+                }
+
+                if (trace) {
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, c,
                                   "h2_session: wait for data, %ld micros", 
                                   (long)session->wait_us);
                 }
-                
-                h2_conn_io_flush(&session->io);
-                ap_log_cerror( APLOG_MARK, APLOG_TRACE2, status, c,
-                              "h2_session(%ld): process -> trywait", session->id);
                 status = h2_mplx_out_trywait(session->mplx, session->wait_us, 
                                              session->iowait);
                 if (status == APR_SUCCESS) {
-                    /* got something, go busy again */
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): WAIT -> BUSY", session->id);
-                    session->state = H2_SESSION_ST_BUSY;
-                }
-                else if (status == APR_TIMEUP) {
-                    if (nghttp2_session_want_read(session->ngh2)) {
-                        status = h2_session_read(session, 0, 1);
-                        if (status == APR_SUCCESS) {
-                            /* got something, go busy again */
-                            session->wait_us = 0;
-                            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                          "h2_session(%ld): WAIT -> BUSY", session->id);
-                            session->state = H2_SESSION_ST_BUSY;
-                        }
-                        else if (status != APR_EAGAIN) {
-                            reason = "busy read error";
-                            goto out;
-                        }
-                    }
-                    /* nothing, increase timer for graceful backup */
-                    session->wait_us = H2MIN(session->wait_us*2, MAX_WAIT_MICROS);
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): WAIT -> BUSY", session->id);
-                    session->state = H2_SESSION_ST_BUSY;
-                }
-                else {
-                    reason = "busy wait error";
-                    goto out;
-                }
-                break;
-                
-            case H2_SESSION_ST_KEEPALIVE:
-                /* Our normal H2Timeout has passed and we are considering to
-                 * extend that with the H2KeepAliveTimeout. */
-                remain_secs = session->keepalive_secs - session->timeout_secs;
-                if (remain_secs <= 0) {
-                    /* keepalive is <= normal timeout, close the session */
-                    reason = "keepalive expired";
-                    h2_session_shutdown(session, 0);
-                    goto out;
+                    session->wait_us = 0;
+                    dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                 }
-                session->c->keepalive = AP_CONN_KEEPALIVE;
-                ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_KEEPALIVE, c);
-                
-                if ((apr_time_sec(session->s->keep_alive_timeout) >= remain_secs)
-                    && async && session->c->cs
-                    && !session->r) {
-                    /* Async MPMs are able to handle keep-alive connections without
-                     * blocking a thread. For this to happen, we need to return from
-                     * processing, indicating the IO event we are waiting for, and
-                     * may be called again if the event happens.
-                     * TODO: this does not properly GOAWAY connections...
-                     * TODO: This currently does not work on upgraded requests...
-                     */
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): async KEEPALIVE -> IDLE_READ", session->id);
-                    session->state = H2_SESSION_ST_IDLE_READ;
-                    session->c->cs->state = CONN_STATE_WRITE_COMPLETION;
-                    reason = "async keepalive";
-                    status = APR_SUCCESS;
-                    goto out;
+                else if (APR_STATUS_IS_TIMEUP(status)) {
+                    /* go back to checking all inputs again */
+                    transit(session, "wait cycle", session->local.accepting? 
+                            H2_SESSION_ST_BUSY : H2_SESSION_ST_LOCAL_SHUTDOWN);
                 }
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                              "h2_session(%ld): KEEPALIVE read", session->id);
-                h2_filter_cin_timeout_set(session->cin, remain_secs);
-                status = h2_session_read(session, 1, 1);
-                if (APR_STATUS_IS_TIMEUP(status)) {
-                    reason = "keepalive expired";
-                    h2_session_shutdown(session, 0);
-                    goto out;
+                else if (APR_STATUS_IS_ECONNRESET(status) 
+                         || APR_STATUS_IS_ECONNABORTED(status)) {
+                    dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
                 }
-                else if (status != APR_SUCCESS) {
-                    reason = "keepalive error";
-                    goto out;
+                else {
+                    ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
+                                 APLOGNO(03404)
+                                  "h2_session(%ld): waiting on conditional",
+                                  session->id);
+                    h2_session_shutdown(session, H2_ERR_INTERNAL_ERROR, 
+                                        "cond wait error", 0);
                 }
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                              "h2_session(%ld): KEEPALIVE -> BUSY", session->id);
-                session->state = H2_SESSION_ST_BUSY;
                 break;
                 
-            case H2_SESSION_ST_CLOSING:
-                if (nghttp2_session_want_write(session->ngh2)) {
-                    status = h2_session_send(session);
-                    if (status != APR_SUCCESS) {
-                        reason = "send error";
-                        goto out;
-                    }
-                    have_written = 1;
-                }
-                reason = "closing";
+            case H2_SESSION_ST_DONE:
+                status = APR_EOF;
                 goto out;
                 
-            case H2_SESSION_ST_ABORTED:
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
-                              "h2_session(%ld): processing ABORTED", session->id);
-                return APR_ECONNABORTED;
-                
             default:
                 ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c,
-                              "h2_session(%ld): state %d", session->id, session->state);
-                return APR_EGENERAL;
+                              APLOGNO(03080)
+                              "h2_session(%ld): unknown state %d", session->id, session->state);
+                dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, 0, NULL);
+                break;
         }
 
-        if (!nghttp2_session_want_read(session->ngh2)
-            && !nghttp2_session_want_write(session->ngh2)) {
-            session->state = H2_SESSION_ST_CLOSING;
-        }        
-
-        if (have_written) {
-            h2_conn_io_flush(&session->io);
+        if (!nghttp2_session_want_read(session->ngh2) 
+                 && !nghttp2_session_want_write(session->ngh2)) {
+            dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL); 
+        }
+        if (session->reprioritize) {
+            h2_mplx_reprioritize(session->mplx, stream_pri_cmp, session);
+            session->reprioritize = 0;
         }
     }
+    
 out:
-    if (have_written) {
-        h2_conn_io_flush(&session->io);
+    if (trace) {
+        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                      "h2_session(%ld): [%s] process returns", 
+                      session->id, state_name(session->state));
+    }
+    
+    if ((session->state != H2_SESSION_ST_DONE)
+        && (APR_STATUS_IS_EOF(status)
+            || APR_STATUS_IS_ECONNRESET(status) 
+            || APR_STATUS_IS_ECONNABORTED(status))) {
+            dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+        }
+
+    status = (session->state == H2_SESSION_ST_DONE)? APR_EOF : APR_SUCCESS;
+    if (session->state == H2_SESSION_ST_DONE) {
+        if (!session->eoc_written) {
+            session->eoc_written = 1;
+            h2_conn_io_write_eoc(&session->io, session);
+        }
     }
-    ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
-                  "h2_session(%ld): process return, state %d, reason '%s'", 
-                  session->id, session->state, reason);
+    
     return status;
 }
+
+apr_status_t h2_session_pre_close(h2_session *session, int async)
+{
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
+                  "h2_session(%ld): pre_close", session->id);
+    dispatch_event(session, H2_SESSION_EV_PRE_CLOSE, 0, "timeout");
+    return APR_SUCCESS;
+}