]> granicus.if.org Git - apache/blobdiff - modules/http2/h2_session.c
mod_http2: push diary to avoid duplicate pushes, cache-digest handling, http2-status...
[apache] / modules / http2 / h2_session.c
index d12b1107928c722b014c500b80e96b7bd453b371..34575a6b55a1e031a03d41a48be883cdb9977ccd 100644 (file)
@@ -74,13 +74,14 @@ static apr_status_t h2_session_receive(void *ctx,
                                        const char *data, apr_size_t len,
                                        apr_size_t *readlen);
 
+static int is_accepting_streams(h2_session *session); 
+static void dispatch_event(h2_session *session, h2_session_event_t ev, 
+                             int err, const char *msg);
+
 h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
 {
     h2_stream * stream;
     apr_pool_t *stream_pool;
-    if (session->aborted) {
-        return NULL;
-    }
     
     if (session->spare) {
         stream_pool = session->spare;
@@ -95,6 +96,7 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
     h2_stream_set_add(session->streams, stream);
     if (H2_STREAM_CLIENT_INITIATED(stream_id)
         && stream_id > session->max_stream_received) {
+        ++session->requests_received;
         session->max_stream_received = stream->id;
     }
     
@@ -174,7 +176,6 @@ static apr_status_t stream_schedule(h2_session *session,
                                     h2_stream *stream, int eos)
 {
     (void)session;
-    ++session->requests_received;
     return h2_stream_schedule(stream, eos, h2_session_push_enabled(session), 
                               stream_pri_cmp, session);
 }
@@ -210,16 +211,14 @@ static int on_invalid_frame_recv_cb(nghttp2_session *ngh2,
     h2_session *session = (h2_session *)userp;
     (void)ngh2;
     
-    if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
-    }
-    if (APLOGctrace2(session->c)) {
+    if (APLOGcdebug(session->c)) {
         char buffer[256];
         
         frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                      "h2_session: callback on_invalid_frame_recv error=%d %s",
-                      error, buffer);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                      "h2_session(%ld): recv unknown FRAME[%s], frames=%ld/%ld (r/s)",
+                      session->id, buffer, (long)session->frames_received,
+                     (long)session->frames_sent);
     }
     return 0;
 }
@@ -234,8 +233,9 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
     int rv;
     
     (void)flags;
-    if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
+    if (!is_accepting_streams(session)) {
+        /* ignore */
+        return 0;
     }
     
     stream = h2_session_get_stream(session, stream_id);
@@ -298,9 +298,6 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
     h2_stream *stream;
     
     (void)ngh2;
-    if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
-    }
     stream = h2_session_get_stream(session, stream_id);
     if (stream) {
         stream_release(session, stream, error_code);
@@ -339,8 +336,9 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
     
     (void)ngh2;
     (void)flags;
-    if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
+    if (!is_accepting_streams(session)) {
+        /* just ignore */
+        return 0;
     }
     
     stream = h2_session_get_stream(session, frame->hd.stream_id);
@@ -374,10 +372,6 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
     apr_status_t status = APR_SUCCESS;
     h2_stream *stream;
     
-    if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
-    }
-    
     if (APLOGcdebug(session->c)) {
         char buffer[256];
         
@@ -455,13 +449,16 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
                           "h2_session(%ld-%d): RST_STREAM by client, errror=%d",
                           session->id, (int)frame->hd.stream_id,
                           (int)frame->rst_stream.error_code);
-            ++session->streams_reset;
+            stream = h2_session_get_stream(session, frame->hd.stream_id);
+            if (stream && stream->initiated_on) {
+                ++session->pushes_reset;
+            }
+            else {
+                ++session->streams_reset;
+            }
             break;
         case NGHTTP2_GOAWAY:
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                          "h2_session(%ld): GOAWAY errror=%d",
-                          session->id, (int)frame->goaway.error_code);
-            session->client_goaway = 1;
+            dispatch_event(session, H2_SESSION_EV_REMOTE_GOAWAY, 0, NULL);
             break;
         default:
             if (APLOGctrace2(session->c)) {
@@ -518,10 +515,6 @@ static int on_send_data_cb(nghttp2_session *ngh2,
     
     (void)ngh2;
     (void)source;
-    if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
-    }
-    
     if (frame->data.padlen > H2_MAX_PADLEN) {
         return NGHTTP2_ERR_PROTO;
     }
@@ -695,6 +688,27 @@ static void h2_session_destroy(h2_session *session)
     }
 }
 
+static apr_status_t h2_session_shutdown(h2_session *session, int reason, const char *msg)
+{
+    apr_status_t status = APR_SUCCESS;
+    const char *err = msg;
+    
+    AP_DEBUG_ASSERT(session);
+    if (!err && reason) {
+        err = nghttp2_strerror(reason);
+    }
+    nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 
+                          h2_mplx_get_max_stream_started(session->mplx), 
+                          reason, (uint8_t*)err, err? strlen(err):0);
+    status = nghttp2_session_send(session->ngh2);
+    h2_conn_io_flush(&session->io);
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                  "session(%ld): sent GOAWAY, err=%d, msg=%s", 
+                  session->id, reason, err? err : "");
+    dispatch_event(session, H2_SESSION_EV_LOCAL_GOAWAY, reason, err);
+    return status;
+}
+
 static apr_status_t session_pool_cleanup(void *data)
 {
     h2_session *session = data;
@@ -707,6 +721,22 @@ static apr_status_t session_pool_cleanup(void *data)
      */
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                   "session(%ld): pool_cleanup", session->id);
+    
+    if (session->state != H2_SESSION_ST_DONE 
+        && session->state != H2_SESSION_ST_LOCAL_SHUTDOWN) {
+        /* Not good. The connection is being torn down and we have
+         * not sent a goaway. This is considered a protocol error and
+         * the client has to assume that any streams "in flight" may have
+         * been processed and are not safe to retry.
+         * As clients with idle connection may only learn about a closed
+         * connection when sending the next request, this has the effect
+         * that at least this one request will fail.
+         */
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c,
+                      "session(%ld): connection disappeared without proper "
+                      "goodbye, clients will be confused, should not happen", 
+                      session->id);
+    }
     /* keep us from destroying the pool, since that is already ongoing. */
     session->pool = NULL;
     h2_session_destroy(session);
@@ -758,6 +788,7 @@ static h2_session *h2_session_create_int(conn_rec *c,
 {
     nghttp2_session_callbacks *callbacks = NULL;
     nghttp2_option *options = NULL;
+    uint32_t n;
 
     apr_pool_t *pool = NULL;
     apr_status_t status = apr_pool_create(&pool, c->pool);
@@ -859,13 +890,18 @@ static h2_session *h2_session_create_int(conn_rec *c,
             h2_session_destroy(session);
             return NULL;
         }
-            
+         
+        n = h2_config_geti(session->config, H2_CONF_PUSH_DIARY_SIZE);
+        session->push_diary = h2_push_diary_create(session->pool, n);
+        
         if (APLOGcdebug(c)) {
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c,
                           "session(%ld) created, timeout=%d, keepalive_timeout=%d, "
-                          "max_streams=%d, stream_mem=%d",
+                          "max_streams=%d, stream_mem=%d, push_diary(type=%d,N=%d)",
                           session->id, session->timeout_secs, session->keepalive_secs,
-                          (int)session->max_stream_count, (int)session->max_stream_mem);
+                          (int)session->max_stream_count, (int)session->max_stream_mem,
+                          session->push_diary->dtype,
+                          (int)session->push_diary->N);
         }
     }
     return session;
@@ -889,54 +925,6 @@ void h2_session_eoc_callback(h2_session *session)
     h2_session_destroy(session);
 }
 
-static apr_status_t h2_session_shutdown(h2_session *session, int reason)
-{
-    AP_DEBUG_ASSERT(session);
-    session->aborted = 1;
-    if (session->state != H2_SESSION_ST_CLOSING
-        && session->state != H2_SESSION_ST_ABORTED) {
-        if (session->client_goaway) {
-            /* client sent us a GOAWAY, just terminate */
-            nghttp2_session_terminate_session(session->ngh2, NGHTTP2_ERR_EOF);
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                          "session(%ld): shutdown, GOAWAY from client", session->id);
-        }
-        else if (!reason) {
-            nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 
-                                  session->max_stream_received, 
-                                  reason, NULL, 0);
-            nghttp2_session_send(session->ngh2);
-            session->server_goaway = 1;
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                          "session(%ld): shutdown, no err", session->id);
-        }
-        else {
-            const char *err = nghttp2_strerror(reason);
-            nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 
-                                  session->max_stream_received, 
-                                  reason, (const uint8_t *)err, 
-                                  strlen(err));
-            nghttp2_session_send(session->ngh2);
-            session->server_goaway = 1;
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                          "session(%ld): shutdown, err=%d '%s'",
-                          session->id, reason, err);
-        }
-        session->state = H2_SESSION_ST_CLOSING;
-        h2_mplx_abort(session->mplx);
-    }
-    return APR_SUCCESS;
-}
-
-void h2_session_abort(h2_session *session, apr_status_t status)
-{
-    AP_DEBUG_ASSERT(session);
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
-                  "h2_session(%ld): aborting", session->id);
-    session->state = H2_SESSION_ST_ABORTED;
-    session->aborted = 1;
-}
-
 static apr_status_t h2_session_start(h2_session *session, int *rv)
 {
     apr_status_t status = APR_SUCCESS;
@@ -1012,6 +1000,10 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
         ++slen;
     }
     
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
+                  "h2_session(%ld): start, INITIAL_WINDOW_SIZE=%ld, "
+                  "MAX_CONCURRENT_STREAMS=%d", 
+                  session->id, (long)win_size, (int)session->max_stream_count);
     *rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE,
                                   settings, slen);
     if (*rv != 0) {
@@ -1075,7 +1067,7 @@ static int h2_session_resume_streams_with_data(h2_session *session)
 {
     AP_DEBUG_ASSERT(session);
     if (!h2_stream_set_is_empty(session->streams)
-        && session->mplx && !session->aborted) {
+        && session->mplx && !session->mplx->aborted) {
         resume_ctx ctx;
         
         ctx.session      = session;
@@ -1097,29 +1089,6 @@ h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
     return session->last_stream;
 }
 
-void h2_session_close(h2_session *session)
-{
-    apr_bucket *b;
-    conn_rec *c = session->c;
-    apr_status_t status;
-    
-    AP_DEBUG_ASSERT(session);
-    if (!session->aborted) {
-        h2_session_shutdown(session, 0);
-    }
-    h2_session_cleanup(session);
-
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c,
-                  "h2_session(%ld): writing eoc", c->id);
-    b = h2_bucket_eoc_create(c->bucket_alloc, session);
-    status = h2_conn_io_write_eoc(&session->io, b);
-    if (status != APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, c,
-                      "h2_session(%ld): flushed eoc bucket", c->id);
-    } 
-    /* and all is or will be destroyed */
-}
-
 static ssize_t stream_data_cb(nghttp2_session *ng2s,
                               int32_t stream_id,
                               uint8_t *buf,
@@ -1231,17 +1200,17 @@ typedef struct {
 static apr_status_t submit_response(h2_session *session, h2_stream *stream)
 {
     apr_status_t status = APR_SUCCESS;
+    h2_response *response = h2_stream_get_response(stream);
     int rv = 0;
     AP_DEBUG_ASSERT(session);
     AP_DEBUG_ASSERT(stream);
-    AP_DEBUG_ASSERT(stream->response || stream->rst_error);
+    AP_DEBUG_ASSERT(response || stream->rst_error);
     
     if (stream->submitted) {
         rv = NGHTTP2_PROTOCOL_ERROR;
     }
-    else if (stream->response && stream->response->headers) {
+    else if (response && response->headers) {
         nghttp2_data_provider provider;
-        h2_response *response = stream->response;
         h2_ngheader *ngh;
         const h2_priority *prio;
         
@@ -1285,7 +1254,6 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream)
                                         response->headers);
         rv = nghttp2_submit_response(session->ngh2, response->stream_id,
                                      ngh->nv, ngh->nvlen, &provider);
-        ++session->responses_sent;
     }
     else {
         int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
@@ -1296,14 +1264,19 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream)
 
         rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
                                        stream->id, err);
-        ++session->responses_sent;
     }
     
     stream->submitted = 1;
+    if (stream->initiated_on) {
+        ++session->pushes_submitted;
+    }
+    else {
+        ++session->responses_submitted;
+    }
 
     if (nghttp2_is_fatal(rv)) {
         status = APR_EGENERAL;
-        h2_session_shutdown(session, rv);
+        dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
         ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
                       APLOGNO(02940) "submit_response: %s", 
                       nghttp2_strerror(rv));
@@ -1329,7 +1302,7 @@ struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
                       session->id, is->id, nghttp2_strerror(nid));
         return NULL;
     }
-    ++session->streams_pushed;
+    ++session->pushes_promised;
     
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
                   "h2_stream(%ld-%d): SERVER_PUSH %d for %s %s on %d",
@@ -1585,9 +1558,7 @@ static apr_status_t h2_session_send(h2_session *session)
     int rv = nghttp2_session_send(session->ngh2);
     if (rv != 0) {
         if (nghttp2_is_fatal(rv)) {
-            ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                          "h2_session: send gave error=%s", nghttp2_strerror(rv));
-            h2_session_shutdown(session, rv);
+            dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
             return APR_EGENERAL;
         }
     }
@@ -1602,21 +1573,22 @@ static apr_status_t h2_session_receive(void *ctx, const char *data,
                                        apr_size_t len, apr_size_t *readlen)
 {
     h2_session *session = ctx;
+    ssize_t n;
+    
     if (len > 0) {
-        ssize_t n = nghttp2_session_mem_recv(session->ngh2,
-                                             (const uint8_t *)data, len);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                      "h2_session(%ld): feeding %ld bytes to nghttp2",
+                      session->id, (long)len);
+        n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len);
         if (n < 0) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EGENERAL,
-                          session->c,
-                          "h2_session: nghttp2_session_mem_recv error=%d",
-                          (int)n);
             if (nghttp2_is_fatal((int)n)) {
-                h2_session_shutdown(session, (int)n);
+                dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, (int)n, nghttp2_strerror(n));
                 return APR_EGENERAL;
             }
         }
         else {
             *readlen = n;
+            session->io.bytes_read += n;
         }
     }
     return APR_SUCCESS;
@@ -1677,6 +1649,9 @@ static apr_status_t h2_session_read(h2_session *session, int block, int loops)
                  * status. */
                 return rstatus;
         }
+        if (!is_accepting_streams(session)) {
+            break;
+        }
     }
     return rstatus;
 }
@@ -1711,14 +1686,278 @@ static apr_status_t h2_session_submit(h2_session *session)
     return status;
 }
 
+static const char *StateNames[] = {
+    "INIT",      /* H2_SESSION_ST_INIT */
+    "DONE",      /* H2_SESSION_ST_DONE */
+    "IDLE",      /* H2_SESSION_ST_IDLE */
+    "BUSY",      /* H2_SESSION_ST_BUSY */
+    "WAIT",      /* H2_SESSION_ST_WAIT */
+    "LSHUTDOWN", /* H2_SESSION_ST_LOCAL_SHUTDOWN */
+    "RSHUTDOWN", /* H2_SESSION_ST_REMOTE_SHUTDOWN */
+};
+
+static const char *state_name(h2_session_state state)
+{
+    if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
+        return "unknown";
+    }
+    return StateNames[state];
+}
+
+static int is_accepting_streams(h2_session *session)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_IDLE:
+        case H2_SESSION_ST_BUSY:
+        case H2_SESSION_ST_WAIT:
+            return 1;
+        default:
+            return 0;
+    }
+}
+
+static void transit(h2_session *session, const char *action, h2_session_state nstate)
+{
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                  "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id,
+                  state_name(session->state), action, state_name(nstate));
+    session->state = nstate;
+}
+
+static void h2_session_ev_init(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_INIT:
+            transit(session, "init", H2_SESSION_ST_BUSY);
+            break;
+
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            /* already did that? */
+            break;
+        case H2_SESSION_ST_IDLE:
+        case H2_SESSION_ST_REMOTE_SHUTDOWN:
+            /* all done */
+            transit(session, "local goaway", H2_SESSION_ST_DONE);
+            break;
+        default:
+            transit(session, "local goaway", H2_SESSION_ST_LOCAL_SHUTDOWN);
+            break;
+    }
+}
+
+static void h2_session_ev_remote_goaway(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_REMOTE_SHUTDOWN:
+            /* already received that? */
+            break;
+        case H2_SESSION_ST_IDLE:
+        case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            /* all done */
+            transit(session, "remote goaway", H2_SESSION_ST_DONE);
+            break;
+        default:
+            transit(session, "remote goaway", H2_SESSION_ST_REMOTE_SHUTDOWN);
+            break;
+    }
+}
+
+static void h2_session_ev_conn_error(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_INIT:
+        case H2_SESSION_ST_DONE:
+        case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            /* just leave */
+            transit(session, "conn error", H2_SESSION_ST_DONE);
+            break;
+        
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                          "h2_session(%ld): conn error -> shutdown", session->id);
+            h2_session_shutdown(session, arg, msg);
+            break;
+    }
+}
+
+static void h2_session_ev_proto_error(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_DONE:
+        case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            /* just leave */
+            transit(session, "proto error", H2_SESSION_ST_DONE);
+            break;
+        
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                          "h2_session(%ld): proto error -> shutdown", session->id);
+            h2_session_shutdown(session, arg, msg);
+            break;
+    }
+}
+
+static void h2_session_ev_conn_timeout(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            transit(session, "conn timeout", H2_SESSION_ST_DONE);
+            break;
+        default:
+            h2_session_shutdown(session, arg, msg);
+            transit(session, "conn timeout", H2_SESSION_ST_DONE);
+            break;
+    }
+}
+
+static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_BUSY:
+            /* nothing for input and output to do. If we remain
+             * in this state, we go into a tight loop and suck up
+             * CPU cycles. Ideally, we'd like to do a blocking read, but that
+             * is not possible if we have scheduled tasks and wait
+             * for them to produce something. */
+            if (h2_stream_set_is_empty(session->streams)) {
+                /* When we have no streams, no task event are possible,
+                 * switch to blocking reads */
+                transit(session, "no io", H2_SESSION_ST_IDLE);
+            }
+            else if (!h2_stream_set_has_unsubmitted(session->streams)
+                     && !h2_stream_set_has_suspended(session->streams)) {
+                /* none of our streams is waiting for a response or
+                 * new output data from task processing, 
+                 * switch to blocking reads. */
+                transit(session, "no io", H2_SESSION_ST_IDLE);
+            }
+            else {
+                /* Unable to do blocking reads, as we wait on events from
+                 * task processing in other threads. Do a busy wait with
+                 * backoff timer. */
+                transit(session, "no io", H2_SESSION_ST_WAIT);
+            }
+            break;
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void h2_session_ev_wait_timeout(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_WAIT:
+            transit(session, "wait timeout", H2_SESSION_ST_BUSY);
+            break;
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void h2_session_ev_stream_ready(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_WAIT:
+            transit(session, "stream ready", H2_SESSION_ST_BUSY);
+            break;
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void h2_session_ev_data_read(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_IDLE:
+            transit(session, "data read", H2_SESSION_ST_BUSY);
+            break;
+            /* fall through */
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void h2_session_ev_ngh2_done(h2_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_SESSION_ST_DONE:
+            /* nop */
+            break;
+        default:
+            transit(session, "nghttp2 done", H2_SESSION_ST_DONE);
+            break;
+    }
+}
+
+static void dispatch_event(h2_session *session, h2_session_event_t ev, 
+                      int arg, const char *msg)
+{
+    switch (ev) {
+        case H2_SESSION_EV_INIT:
+            h2_session_ev_init(session, arg, msg);
+            break;            
+        case H2_SESSION_EV_LOCAL_GOAWAY:
+            h2_session_ev_local_goaway(session, arg, msg);
+            break;
+        case H2_SESSION_EV_REMOTE_GOAWAY:
+            h2_session_ev_remote_goaway(session, arg, msg);
+            break;
+        case H2_SESSION_EV_CONN_ERROR:
+            h2_session_ev_conn_error(session, arg, msg);
+            break;
+        case H2_SESSION_EV_PROTO_ERROR:
+            h2_session_ev_proto_error(session, arg, msg);
+            break;
+        case H2_SESSION_EV_CONN_TIMEOUT:
+            h2_session_ev_conn_timeout(session, arg, msg);
+            break;
+        case H2_SESSION_EV_NO_IO:
+            h2_session_ev_no_io(session, arg, msg);
+            break;
+        case H2_SESSION_EV_WAIT_TIMEOUT:
+            h2_session_ev_wait_timeout(session, arg, msg);
+            break;
+        case H2_SESSION_EV_STREAM_READY:
+            h2_session_ev_stream_ready(session, arg, msg);
+            break;
+        case H2_SESSION_EV_DATA_READ:
+            h2_session_ev_data_read(session, arg, msg);
+            break;
+        case H2_SESSION_EV_NGH2_DONE:
+            h2_session_ev_ngh2_done(session, arg, msg);
+            break;
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                          "h2_session(%ld): unknown event %d", 
+                          session->id, ev);
+            break;
+    }
+    
+    if (session->state == H2_SESSION_ST_DONE) {
+        h2_mplx_abort(session->mplx);
+    }
+}
+
 static const int MAX_WAIT_MICROS = 200 * 1000;
 
 apr_status_t h2_session_process(h2_session *session, int async)
 {
     apr_status_t status = APR_SUCCESS;
     conn_rec *c = session->c;
-    int rv, have_written, have_read, remain_secs;
-    const char *reason = "";
+    int rv, have_written, have_read;
 
     ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
                   "h2_session(%ld): process start, async=%d", session->id, async);
@@ -1726,70 +1965,65 @@ apr_status_t h2_session_process(h2_session *session, int async)
     while (1) {
         have_read = have_written = 0;
 
-        if (session->aborted) {
-            reason = "aborted";
-            status = APR_ECONNABORTED;
-            goto out;
-        }
-        
         switch (session->state) {
             case H2_SESSION_ST_INIT:
                 if (!h2_is_acceptable_connection(c, 1)) {
-                    nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0,
-                                          NGHTTP2_INADEQUATE_SECURITY, NULL, 0);
-                    nghttp2_session_send(session->ngh2);
-                    session->server_goaway = 1;
+                    h2_session_shutdown(session, NGHTTP2_INADEQUATE_SECURITY, NULL);
                 } 
-                
-                status = h2_session_start(session, &rv);
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
-                              "h2_session(%ld): started on %s:%d", session->id,
-                              session->s->server_hostname,
-                              c->local_addr->port);
-                if (status != APR_SUCCESS) {
-                    reason = "start failed";
-                    goto out;
+                else {
+                    ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL);
+                    status = h2_session_start(session, &rv);
+                    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
+                                  "h2_session(%ld): started on %s:%d", session->id,
+                                  session->s->server_hostname,
+                                  c->local_addr->port);
+                    if (status != APR_SUCCESS) {
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+                    }
+                    dispatch_event(session, H2_SESSION_EV_INIT, 0, NULL);
                 }
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): INIT -> BUSY", session->id);
-                session->state = H2_SESSION_ST_BUSY;
                 break;
                 
-            case H2_SESSION_ST_IDLE_READ:
-                h2_filter_cin_timeout_set(session->cin, session->timeout_secs);
-                ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL);
+            case H2_SESSION_ST_IDLE:
+                h2_filter_cin_timeout_set(session->cin, session->keepalive_secs);
+                ap_update_child_status(c->sbh, SERVER_BUSY_KEEPALIVE, NULL);
                 status = h2_session_read(session, 1, 10);
-                if (APR_STATUS_IS_TIMEUP(status)) {
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): IDLE -> KEEPALIVE", session->id);
-                    session->state = H2_SESSION_ST_KEEPALIVE;
+                if (status == APR_SUCCESS) {
+                    have_read = 1;
+                    dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                 }
-                else if (status == APR_SUCCESS) {
-                    /* got something, go busy again */
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): IDLE -> BUSY", session->id);
-                    session->state = H2_SESSION_ST_BUSY;
+                else if (status == APR_EAGAIN) {
+                    /* nothing to read */
+                }
+                else if (APR_STATUS_IS_TIMEUP(status)) {
+                    dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL);
+                    break;
                 }
                 else {
-                    reason = "keepalive error";
-                    goto out;
+                    dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
                 }
                 break;
                 
             case H2_SESSION_ST_BUSY:
+            case H2_SESSION_ST_LOCAL_SHUTDOWN:
+            case H2_SESSION_ST_REMOTE_SHUTDOWN:
                 if (nghttp2_session_want_read(session->ngh2)) {
+                    ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL);
                     h2_filter_cin_timeout_set(session->cin, session->timeout_secs);
                     status = h2_session_read(session, 0, 10);
                     if (status == APR_SUCCESS) {
-                        /* got something, continue processing */
                         have_read = 1;
+                        dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                     }
                     else if (status == APR_EAGAIN) {
                         /* nothing to read */
                     }
+                    else if (APR_STATUS_IS_TIMEUP(status)) {
+                        dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL);
+                        break;
+                    }
                     else {
-                        reason = "busy read error";
-                        goto out;
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
                     }
                 }
                 
@@ -1802,64 +2036,40 @@ apr_status_t h2_session_process(h2_session *session, int async)
                         have_written = 1;
                     }
                     else if (status != APR_EAGAIN) {
-                        reason = "submit error";
-                        goto out;
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
+                                         H2_ERR_INTERNAL_ERROR, "submit error");
+                        break;
                     }
                     /* send out window updates for our inputs */
                     status = h2_mplx_in_update_windows(session->mplx);
                     if (status != APR_SUCCESS && status != APR_EAGAIN) {
-                        reason = "window update error";
-                        goto out;
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
+                                         H2_ERR_INTERNAL_ERROR, "window update error");
+                        break;
                     }
                 }
                 
                 if (nghttp2_session_want_write(session->ngh2)) {
                     status = h2_session_send(session);
-                    if (status != APR_SUCCESS) {
-                        reason = "send error";
-                        goto out;
+                    if (status == APR_SUCCESS) {
+                        have_written = 1;
+                    }
+                    else {
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
+                                         H2_ERR_INTERNAL_ERROR, "writing");
+                        break;
                     }
-                    have_written = 1;
                 }
                 
                 if (have_read || have_written) {
                     session->wait_us = 0;
                 }
                 else {
-                    /* nothing for input and output to do. If we remain
-                     * in this state, we go into a tight loop and suck up
-                     * CPU cycles. 
-                     * Ideally, we'd like to do a blocking read, but that
-                     * is not possible if we have scheduled tasks and wait
-                     * for them to produce something. */
-                    if (h2_stream_set_is_empty(session->streams)) {
-                        /* When we have no streams, no task event are possible,
-                         * switch to blocking reads */
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): BUSY -> IDLE", session->id);
-                        session->state = H2_SESSION_ST_IDLE_READ;
-                    }
-                    else if (!h2_stream_set_has_unsubmitted(session->streams)
-                             && !h2_stream_set_has_suspended(session->streams)) {
-                        /* none of our streams is waiting for a response or
-                         * new output data from task processing, 
-                         * switch to blocking reads. */
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): BUSY -> IDLE", session->id);
-                        session->state = H2_SESSION_ST_IDLE_READ;
-                    }
-                    else {
-                        /* Unable to do blocking reads, as we wait on events from
-                         * task processing in other threads. Do a busy wait with
-                         * backoff timer. */
-                        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                      "h2_session(%ld): BUSY -> WAIT", session->id);
-                        session->state = H2_SESSION_ST_BUSY_WAIT;
-                    }
+                    dispatch_event(session, H2_SESSION_EV_NO_IO, 0, NULL);
                 }
                 break;
                 
-            case H2_SESSION_ST_BUSY_WAIT:
+            case H2_SESSION_ST_WAIT:
                 session->wait_us = H2MAX(session->wait_us, 10);
                 if (APLOGctrace1(c)) {
                     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
@@ -1867,131 +2077,67 @@ apr_status_t h2_session_process(h2_session *session, int async)
                                   (long)session->wait_us);
                 }
                 
-                h2_conn_io_flush(&session->io);
                 ap_log_cerror( APLOG_MARK, APLOG_TRACE2, status, c,
                               "h2_session(%ld): process -> trywait", session->id);
                 status = h2_mplx_out_trywait(session->mplx, session->wait_us, 
                                              session->iowait);
                 if (status == APR_SUCCESS) {
-                    /* got something, go busy again */
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): WAIT -> BUSY", session->id);
-                    session->state = H2_SESSION_ST_BUSY;
+                    dispatch_event(session, H2_SESSION_EV_STREAM_READY, 0, NULL);
                 }
                 else if (status == APR_TIMEUP) {
-                    if (nghttp2_session_want_read(session->ngh2)) {
-                        status = h2_session_read(session, 0, 1);
-                        if (status == APR_SUCCESS) {
-                            /* got something, go busy again */
-                            session->wait_us = 0;
-                            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                          "h2_session(%ld): WAIT -> BUSY", session->id);
-                            session->state = H2_SESSION_ST_BUSY;
-                        }
-                        else if (status != APR_EAGAIN) {
-                            reason = "busy read error";
-                            goto out;
-                        }
-                    }
                     /* nothing, increase timer for graceful backup */
                     session->wait_us = H2MIN(session->wait_us*2, MAX_WAIT_MICROS);
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): WAIT -> BUSY", session->id);
-                    session->state = H2_SESSION_ST_BUSY;
+                    dispatch_event(session, H2_SESSION_EV_WAIT_TIMEOUT, 0, NULL);
                 }
                 else {
-                    reason = "busy wait error";
-                    goto out;
+                    h2_session_shutdown(session, H2_ERR_INTERNAL_ERROR, "cond wait error");
                 }
                 break;
                 
-            case H2_SESSION_ST_KEEPALIVE:
-                /* Our normal H2Timeout has passed and we are considering to
-                 * extend that with the H2KeepAliveTimeout. */
-                remain_secs = session->keepalive_secs - session->timeout_secs;
-                if (remain_secs <= 0) {
-                    /* keepalive is <= normal timeout, close the session */
-                    reason = "keepalive expired";
-                    h2_session_shutdown(session, 0);
-                    goto out;
-                }
-                session->c->keepalive = AP_CONN_KEEPALIVE;
-                ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_KEEPALIVE, c);
-                
-                if ((apr_time_sec(session->s->keep_alive_timeout) >= remain_secs)
-                    && async && session->c->cs
-                    && !session->r) {
-                    /* Async MPMs are able to handle keep-alive connections without
-                     * blocking a thread. For this to happen, we need to return from
-                     * processing, indicating the IO event we are waiting for, and
-                     * may be called again if the event happens.
-                     * TODO: this does not properly GOAWAY connections...
-                     * TODO: This currently does not work on upgraded requests...
-                     */
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                                  "h2_session(%ld): async KEEPALIVE -> IDLE_READ", session->id);
-                    session->state = H2_SESSION_ST_IDLE_READ;
-                    session->c->cs->state = CONN_STATE_WRITE_COMPLETION;
-                    reason = "async keepalive";
-                    status = APR_SUCCESS;
-                    goto out;
-                }
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                              "h2_session(%ld): KEEPALIVE read", session->id);
-                h2_filter_cin_timeout_set(session->cin, remain_secs);
-                status = h2_session_read(session, 1, 1);
-                if (APR_STATUS_IS_TIMEUP(status)) {
-                    reason = "keepalive expired";
-                    h2_session_shutdown(session, 0);
-                    goto out;
-                }
-                else if (status != APR_SUCCESS) {
-                    reason = "keepalive error";
-                    goto out;
-                }
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                              "h2_session(%ld): KEEPALIVE -> BUSY", session->id);
-                session->state = H2_SESSION_ST_BUSY;
-                break;
-                
-            case H2_SESSION_ST_CLOSING:
-                if (nghttp2_session_want_write(session->ngh2)) {
-                    status = h2_session_send(session);
-                    if (status != APR_SUCCESS) {
-                        reason = "send error";
-                        goto out;
-                    }
-                    have_written = 1;
-                }
-                reason = "closing";
+            case H2_SESSION_ST_DONE:
+                status = APR_EOF;
                 goto out;
                 
-            case H2_SESSION_ST_ABORTED:
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
-                              "h2_session(%ld): processing ABORTED", session->id);
-                return APR_ECONNABORTED;
-                
             default:
                 ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c,
-                              "h2_session(%ld): state %d", session->id, session->state);
-                return APR_EGENERAL;
+                              "h2_session(%ld): unknown state %d", session->id, session->state);
+                dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, 0, NULL);
+                break;
         }
 
-        if (!nghttp2_session_want_read(session->ngh2)
-            && !nghttp2_session_want_write(session->ngh2)) {
-            session->state = H2_SESSION_ST_CLOSING;
-        }        
-
         if (have_written) {
             h2_conn_io_flush(&session->io);
         }
+        else if (!nghttp2_session_want_read(session->ngh2) 
+                 && !nghttp2_session_want_write(session->ngh2)) {
+            dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL); 
+        }
     }
+    
 out:
     if (have_written) {
         h2_conn_io_flush(&session->io);
     }
+    
     ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
-                  "h2_session(%ld): process return, state %d, reason '%s'", 
-                  session->id, session->state, reason);
+                  "h2_session(%ld): [%s] process returns", 
+                  session->id, state_name(session->state));
+
+    if ((session->state != H2_SESSION_ST_DONE)
+        && (APR_STATUS_IS_EOF(status)
+            || APR_STATUS_IS_ECONNRESET(status) 
+            || APR_STATUS_IS_ECONNABORTED(status))) {
+            dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+        }
+
+    status = (session->state == H2_SESSION_ST_DONE)? APR_EOF : APR_SUCCESS;
+    if (session->state == H2_SESSION_ST_DONE) {
+        if (!session->eoc_written) {
+            session->eoc_written = 1;
+            h2_conn_io_write_eoc(&session->io, 
+                                 h2_bucket_eoc_create(session->c->bucket_alloc, session));
+        }
+    }
+    
     return status;
 }