]> granicus.if.org Git - apache/commitdiff
mod_proxy_http2: stability improvements
authorStefan Eissing <icing@apache.org>
Tue, 8 Mar 2016 14:22:34 +0000 (14:22 +0000)
committerStefan Eissing <icing@apache.org>
Tue, 8 Mar 2016 14:22:34 +0000 (14:22 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1734097 13f79535-47bb-0310-9956-ffa450edef68

13 files changed:
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_ngn_shed.c
modules/http2/h2_ngn_shed.h
modules/http2/h2_proxy_session.c
modules/http2/h2_proxy_session.h
modules/http2/h2_request.c
modules/http2/h2_task.c
modules/http2/h2_task.h
modules/http2/h2_task_output.c
modules/http2/mod_http2.c
modules/http2/mod_http2.h
modules/http2/mod_proxy_http2.c

index 43a4630e9f7a76ee3446705b2576e27659ed807b..117becac810716147119d7941056c2bebc0b9177 100644 (file)
@@ -402,6 +402,12 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
                 apr_thread_cond_broadcast(m->req_added);
             }
         }
+        
+        if (!h2_io_set_is_empty(m->stream_ios)) {
+            ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c, 
+                          "h2_mplx(%ld): release_join, %d streams still open", 
+                          m->id, (int)h2_io_set_size(m->stream_ios));
+        }
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
                       "h2_mplx(%ld): release_join -> destroy", m->id);
         leave_mutex(m, acquired);
@@ -844,7 +850,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id, apr_table_t *trailers)
                 h2_response *r = h2_response_die(stream_id, APR_EGENERAL, 
                                                  io->request, m->pool);
                 status = out_open(m, stream_id, r, NULL, NULL, NULL);
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
                               "h2_mplx(%ld-%d): close, no response, no rst", 
                               m->id, io->id);
             }
@@ -1135,12 +1141,27 @@ static void task_done(h2_mplx *m, h2_task *task)
              * long as it has requests to handle. Might no be fair to
              * other mplx's. Perhaps leave after n requests? */
             h2_mplx_out_close(m, task->stream_id, NULL);
+            
+            if (task->engine) {
+                /* should already have been done by the task, but as
+                 * a last resort, we get rid of it here. */
+                if (!h2_req_engine_is_shutdown(task->engine)) {
+                    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                                  "h2_mplx(%ld): task(%s) has not-shutdown "
+                                  "engine(%s)", m->id, task->id, 
+                                  h2_req_engine_get_id(task->engine));
+                }
+                h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+            }
+            
             if (m->spare_allocator) {
                 apr_allocator_destroy(m->spare_allocator);
                 m->spare_allocator = NULL;
             }
+            
             h2_slave_destroy(task->c, &m->spare_allocator);
             task = NULL;
+            
             if (io) {
                 apr_time_t now = apr_time_now();
                 if (!io->orphaned && m->redo_ios
@@ -1389,23 +1410,23 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
     *pr = NULL;
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         int want_shutdown = (block == APR_BLOCK_READ);
-        if (0 && want_shutdown) {
+        if (want_shutdown && !h2_iq_empty(m->q)) {
             /* For a blocking read, check first if requests are to be
              * had and, if not, wait a short while before doing the
              * blocking, and if unsuccessful, terminating read.
              */
-            status = h2_ngn_shed_pull_req(shed, ngn, capacity, 0, pr);
-            if (status != APR_EAGAIN) {
-                return status;
+            status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
+            if (APR_STATUS_IS_EAGAIN(status)) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                              "h2_mplx(%ld): start block engine pull", m->id);
+                apr_thread_cond_timedwait(m->req_added, m->lock, 
+                                          apr_time_from_msec(20));
+                status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
             }
-            ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c,
-                          "h2_mplx(%ld): start block engine pull", m->id);
-            apr_thread_cond_timedwait(m->req_added, m->lock, 
-                                      apr_time_from_msec(100));
-            ap_log_cerror(APLOG_MARK, APLOG_INFO, status, m->c,
-                          "h2_mplx(%ld): done block engine pull", m->id);
         }
-        status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr);
+        else {
+            status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr);
+        }
         leave_mutex(m, acquired);
     }
     return status;
@@ -1413,29 +1434,23 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
  
 void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
 {
-    h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
-    h2_mplx *m = h2_ngn_shed_get_ctx(shed);
-    int acquired;
+    h2_task *task = h2_ctx_cget_task(r_conn);
+    
+    if (task) {
+        h2_mplx *m = task->mplx;
+        int acquired;
 
-    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        if (h2_ngn_shed_done_req(shed, ngn, r_conn) == APR_SUCCESS) {
-            h2_task *task = h2_ctx_cget_task(r_conn);
-            if (task) {
+        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+            h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
+            if (task->engine) { 
+                /* cannot report that as done until engine returns */
+            }
+            else {
+                h2_task_output_close(task->output);
                 task_done(m, task);
             }
+            leave_mutex(m, acquired);
         }
-        leave_mutex(m, acquired);
     }
 }
                                 
-void h2_mplx_req_engine_exit(h2_req_engine *ngn)
-{
-    h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
-    h2_mplx *m = h2_ngn_shed_get_ctx(shed);
-    int acquired;
-    
-    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        h2_ngn_shed_done_ngn(shed, ngn);
-        leave_mutex(m, acquired);
-    }
-}
index 368d92fc967f84ee658994ff3733ddcc18229bff..497cf99213d48a743f74c88c25ea1ed5ca3b0ea8 100644 (file)
@@ -428,6 +428,5 @@ apr_status_t h2_mplx_req_engine_pull(struct h2_req_engine *ngn,
                                      apr_uint32_t capacity, 
                                      request_rec **pr);
 void h2_mplx_req_engine_done(struct h2_req_engine *ngn, conn_rec *r_conn);
-void h2_mplx_req_engine_exit(struct h2_req_engine *ngn);
 
 #endif /* defined(__mod_h2__h2_mplx__) */
index e8e6755e942cb2288dc53b9f6ef159c6e0c88c96..b23a5e95f337457ff17cd421818a5e912511b877 100644 (file)
@@ -45,6 +45,7 @@
 typedef struct h2_ngn_entry h2_ngn_entry;
 struct h2_ngn_entry {
     APR_RING_ENTRY(h2_ngn_entry) link;
+    h2_task *task;
     request_rec *r;
 };
 
@@ -72,6 +73,7 @@ struct h2_req_engine {
     const char *type;      /* name of the engine type */
     apr_pool_t *pool;      /* pool for engine specific allocations */
     conn_rec *c;           /* connection this engine is assigned to */
+    h2_task *task;         /* the task this engine is base on, running in */
     h2_ngn_shed *shed;
 
     unsigned int shutdown : 1; /* engine is being shut down */
@@ -81,10 +83,18 @@ struct h2_req_engine {
     apr_uint32_t no_assigned;  /* # of assigned requests */
     apr_uint32_t no_live;      /* # of live */
     apr_uint32_t no_finished;  /* # of finished */
-
-    apr_thread_cond_t *io;     /* condition var for waiting on data */
 };
 
+const char *h2_req_engine_get_id(h2_req_engine *engine)
+{
+    return engine->id;
+}
+
+int h2_req_engine_is_shutdown(h2_req_engine *engine)
+{
+    return engine->shutdown;
+}
+
 h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
                                 apr_uint32_t req_buffer_size)
 {
@@ -119,14 +129,13 @@ void h2_ngn_shed_abort(h2_ngn_shed *shed)
     shed->aborted = 1;
 }
 
-static apr_status_t ngn_schedule(h2_req_engine *ngn, request_rec *r)
+static void ngn_add_req(h2_req_engine *ngn, h2_task *task, request_rec *r)
 {
-    h2_ngn_entry *entry = apr_pcalloc(r->pool, sizeof(*entry));
-
+    h2_ngn_entry *entry = apr_pcalloc(task->c->pool, sizeof(*entry));
     APR_RING_ELEM_INIT(entry, link);
+    entry->task = task;
     entry->r = r;
     H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
-    return APR_SUCCESS;
 }
 
 
@@ -134,7 +143,6 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
                                   h2_task *task, request_rec *r, 
                                   h2_req_engine_init *einit){
     h2_req_engine *ngn;
-    apr_status_t status = APR_EOF;
 
     AP_DEBUG_ASSERT(shed);
     
@@ -147,73 +155,69 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
     ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING);
     if (ngn) {
         if (ngn->shutdown) {
-            ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r,
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
                           "h2_ngn_shed(%ld): %s in shutdown", 
                           shed->c->id, ngn->id);
-            ngn = NULL;
         }
         else if (ngn->no_assigned >= ngn->capacity) {
-            ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r,
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
                           "h2_ngn_shed(%ld): %s over capacity %d/%d", 
                           shed->c->id, ngn->id, ngn->no_assigned,
                           ngn->capacity);
-            ngn = NULL;
         }
-        else if (ngn_schedule(ngn, r) == APR_SUCCESS) {
+        else {
             /* this task will be processed in another thread,
              * freeze any I/O for the time being. */
             h2_task_freeze(task, r);
+            ngn_add_req(ngn, task, r);
             ngn->no_assigned++;
-            status = APR_SUCCESS;
-            ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r,
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
                           "h2_ngn_shed(%ld): pushed request %s to %s", 
                           shed->c->id, task->id, ngn->id);
-        }
-        else {
-            ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r,
-                          "h2_ngn_shed(%ld): engine error adding req %s", 
-                          shed->c->id, ngn->id);
-            ngn = NULL;
+            return APR_SUCCESS;
         }
     }
     
-    if (!ngn && einit) {
-        ngn = apr_pcalloc(task->c->pool, sizeof(*ngn));
-        ngn->id = apr_psprintf(task->c->pool, "ngn-%ld-%d", 
+    /* none of the existing engines has capacity */
+    if (einit) {
+        apr_status_t status;
+        h2_req_engine *newngn;
+        
+        newngn = apr_pcalloc(task->c->pool, sizeof(*ngn));
+        newngn->id = apr_psprintf(task->c->pool, "ngn-%ld-%d", 
                                    shed->c->id, shed->next_ngn_id++);
-        ngn->pool = task->c->pool;
-        ngn->type = apr_pstrdup(task->c->pool, ngn_type);
-        ngn->c = r->connection;
-        APR_RING_INIT(&ngn->entries, h2_ngn_entry, link);
-        ngn->shed = shed;
-        ngn->capacity = 100;
-        ngn->io = task->io;
-        ngn->no_assigned = 1;
-        ngn->no_live = 1;
+        newngn->pool = task->c->pool;
+        newngn->type = apr_pstrdup(task->c->pool, ngn_type);
+        newngn->c = r->connection;
+        APR_RING_INIT(&newngn->entries, h2_ngn_entry, link);
+        newngn->shed = shed;
+        newngn->capacity = 100;
+        newngn->no_assigned = 1;
+        newngn->no_live = 1;
         
-        status = einit(ngn, ngn->id, ngn->type, ngn->pool,
+        status = einit(newngn, newngn->id, newngn->type, newngn->pool,
                        shed->req_buffer_size, r);
-        ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r,
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c,
                       "h2_ngn_shed(%ld): init engine %s (%s)", 
-                      shed->c->id, ngn->id, ngn->type);
+                      shed->c->id, newngn->id, newngn->type);
         if (status == APR_SUCCESS) {
-            apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, ngn);
+            newngn->task = task;
+            AP_DEBUG_ASSERT(task->engine == NULL);
+            task->engine = newngn;
+            apr_hash_set(shed->ngns, newngn->type, APR_HASH_KEY_STRING, newngn);
         }
+        return status;
     }
-    return status;
+    return APR_EOF;
 }
 
 static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn)
 {
     h2_ngn_entry *entry;
-    h2_task *task;
-
     for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
          entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
          entry = H2_NGN_ENTRY_NEXT(entry)) {
-        task = h2_ctx_rget_task(entry->r);
-        AP_DEBUG_ASSERT(task);
-        if (!task->frozen) {
+        if (!entry->task->frozen) {
             H2_NGN_ENTRY_REMOVE(entry);
             return entry;
         }
@@ -235,62 +239,60 @@ apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed,
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, shed->c,
                       "h2_ngn_shed(%ld): abort while pulling requests %s", 
                       shed->c->id, ngn->id);
-        return APR_EOF;
+        ngn->shutdown = 1;
+        return APR_ECONNABORTED;
     }
     
     ngn->capacity = capacity;
-    if (!H2_REQ_ENTRIES_EMPTY(&ngn->entries) 
-        && (entry = pop_non_frozen(ngn))) {
-        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r,
+    if (H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
+        if (want_shutdown) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
+                          "h2_ngn_shed(%ld): emtpy queue, shutdown engine %s", 
+                          shed->c->id, ngn->id);
+            ngn->shutdown = 1;
+        }
+        return ngn->shutdown? APR_EOF : APR_EAGAIN;
+    }
+    
+    if ((entry = pop_non_frozen(ngn))) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, entry->task->c,
                       "h2_ngn_shed(%ld): pulled request %s for engine %s", 
-                      shed->c->id, entry->r->the_request, ngn->id);
+                      shed->c->id, entry->task->id, ngn->id);
         ngn->no_live++;
-        entry->r->connection->current_thread = ngn->c->current_thread;
         *pr = entry->r;
         return APR_SUCCESS;
     }
-    else if (want_shutdown) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
-                      "h2_ngn_shed(%ld): emtpy queue, shutdown engine %s", 
-                      shed->c->id, ngn->id);
-        ngn->shutdown = 1;
-        return APR_EOF;
-    }
     return APR_EAGAIN;
 }
                                  
-static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, h2_task *task, 
-                                  int waslive, int aborted)
+static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, 
+                                  h2_task *task, int waslive, int aborted, 
+                                  int close)
 {
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
                   "h2_ngn_shed(%ld): task %s %s by %s", 
                   shed->c->id, task->id, aborted? "aborted":"done", ngn->id);
-    h2_task_output_close(task->output);
     ngn->no_finished++;
     if (waslive) ngn->no_live--;
     ngn->no_assigned--;
-    if (task->c != ngn->c) { /* do not release what the engine runs on */
-        return APR_SUCCESS;
+
+    if (close) {
+        h2_task_output_close(task->output);
     }
-    return APR_EAGAIN;
+    return APR_SUCCESS;
 }
                                 
-apr_status_t h2_ngn_shed_done_req(h2_ngn_shed *shed, 
-                                  h2_req_engine *ngn, conn_rec *r_conn)
+apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, 
+                                    struct h2_req_engine *ngn, h2_task *task)
 {
-    h2_task *task = h2_ctx_cget_task(r_conn);
-    if (task) {
-        return ngn_done_task(shed, ngn, task, 1, 0);
-    }
-    return APR_ECONNABORTED;
+    return ngn_done_task(shed, ngn, task, 1, 0, 0);
 }
                                 
 void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
 {
     h2_req_engine *existing;
     
-    if (!shed->aborted 
-        && !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
+    if (!shed->aborted && !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
         h2_ngn_entry *entry;
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
                       "h2_ngn_shed(%ld): exit engine %s (%s), "
@@ -309,7 +311,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
                           "h2_ngn_shed(%ld): engine %s has queued task %s, "
                           "frozen=%d, aborting",
                           shed->c->id, ngn->id, task->id, task->frozen);
-            ngn_done_task(shed, ngn, task, 0, 1);
+            ngn_done_task(shed, ngn, task, 0, 1, 1);
         }
     }
     if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
@@ -321,7 +323,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
                       (long)ngn->no_finished);
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, shed->c,
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
                       "h2_ngn_shed(%ld): exit engine %s (%s)", 
                       shed->c->id, ngn->id, ngn->type);
     }
index abed58a45841fbb03fc8cb68227a634466e7621e..887f750d138975e728778475e19767c20cb6c86f 100644 (file)
@@ -17,6 +17,7 @@
 #define h2_req_shed_h
 
 struct h2_req_engine;
+struct h2_task;
 
 typedef struct h2_ngn_shed h2_ngn_shed;
 struct h2_ngn_shed {
@@ -30,6 +31,9 @@ struct h2_ngn_shed {
     apr_uint32_t req_buffer_size; /* preferred buffer size for responses */
 };
 
+const char *h2_req_engine_get_id(h2_req_engine *engine);
+int h2_req_engine_is_shutdown(h2_req_engine *engine);
+
 typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine, 
                                       const char *id, 
                                       const char *type,
@@ -55,8 +59,9 @@ apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, h2_req_engine *pub_ngn,
                                   apr_uint32_t capacity, 
                                   int want_shutdown, request_rec **pr);
 
-apr_status_t h2_ngn_shed_done_req(h2_ngn_shed *shed, 
-                                  struct h2_req_engine *ngn, conn_rec *r_conn);
+apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, 
+                                   struct h2_req_engine *ngn, 
+                                   struct h2_task *task);
 
 void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn);
 
index a5c30e77faa9c9e0ec2b2e418233fad54d188d22..d2581becc51fac0779a25ba421e30c240e9e5b1a 100644 (file)
@@ -64,6 +64,7 @@ static apr_status_t proxy_session_pre_close(void *theconn)
                       "proxy_session(%s): pool cleanup, state=%d, streams=%d",
                       session->id, session->state, 
                       (int)h2_ihash_count(session->streams));
+        session->aborted = 1;
         dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
         nghttp2_session_del(session->ngh2);
         session->ngh2 = NULL;
@@ -116,7 +117,7 @@ static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
     status = proxy_pass_brigade(session->c->bucket_alloc,  
                                 session->p_conn, session->c, 
                                 session->output, flush);
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, 
                   "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d", 
                   session->id, (int)length, flush);
     if (status != APR_SUCCESS) {
@@ -155,7 +156,7 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
                 char buffer[256];
                 
                 h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03342)
+                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO(03342)
                               "h2_proxy_session(%s): recv FRAME[%s]",
                               session->id, buffer);
             }
@@ -351,6 +352,8 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
     if (flags & NGHTTP2_DATA_FLAG_EOF) {
         b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
         APR_BRIGADE_INSERT_TAIL(stream->output, b);
+        /*b = apr_bucket_eos_create(stream->r->connection->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(stream->output, b);*/
     }
     
     ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, 
@@ -372,10 +375,12 @@ static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
                            uint32_t error_code, void *user_data) 
 {
     h2_proxy_session *session = user_data;
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
-                  "h2_proxy_session(%s): stream=%d, closed, err=%d", 
-                  session->id, stream_id, error_code);
-    dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
+    if (!session->aborted) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
+                      "h2_proxy_session(%s): stream=%d, closed, err=%d", 
+                      session->id, stream_id, error_code);
+        dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
+    }
     return 0;
 }
 
@@ -495,8 +500,7 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
         h2_proxy_session *session;
         nghttp2_session_callbacks *cbs;
         nghttp2_option *option;
-        ap_filter_t *f;
-        
+
         session = apr_pcalloc(pool, sizeof(*session));
         apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
         p_conn->data = session;
@@ -535,15 +539,6 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
 
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
                       "setup session for %s", p_conn->hostname);
-                      
-        f = session->c->input_filters;
-        while (f) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                          "h2_proxy_session(%s): c->input_filter %s", 
-                          session->id, f->frec->name);
-            f = f->next;
-        }
-        
     }
     return p_conn->data;
 }
@@ -677,21 +672,7 @@ static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *
         apr_bucket* b = APR_BRIGADE_FIRST(bb);
         
         if (APR_BUCKET_IS_METADATA(b)) {
-            if (APR_BUCKET_IS_EOS(b)) {
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                              "h2_proxy_session(%s): read EOS from conn", 
-                              session->id);
-            }
-            else if (APR_BUCKET_IS_FLUSH(b)) {
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                              "h2_proxy_session(%s): read FLUSH from conn", 
-                              session->id);
-            }
-            else {
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                              "h2_proxy_session(%s): read unkown META from conn", 
-                              session->id);
-            }
+            /* nop */
         }
         else {
             const char *bdata = NULL;
@@ -700,7 +681,7 @@ static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *
             status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
             if (status == APR_SUCCESS && blen > 0) {
                 n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen);
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
                               "h2_proxy_session(%s): feeding %ld bytes -> %ld", 
                               session->id, (long)blen, (long)n);
                 if (n < 0) {
@@ -719,7 +700,7 @@ static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *
         apr_bucket_delete(b);
     }
     
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, 
                   "h2_proxy_session(%s): fed %ld bytes of input to session", 
                   session->id, (long)readlen);
     if (readlen == 0 && status == APR_SUCCESS) {
@@ -756,7 +737,7 @@ static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
                                 AP_MODE_READBYTES, 
                                 block? APR_BLOCK_READ : APR_NONBLOCK_READ, 
                                 64 * 1024);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
                       "h2_proxy_session(%s): read from conn", session->id);
         if (socket && save_timeout != -1) {
             apr_socket_timeout_set(socket, save_timeout);
@@ -770,7 +751,7 @@ static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
         /* nop */
     }
     else if (!APR_STATUS_IS_EAGAIN(status)) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
+        ap_log_cerror(APLOG_MARK, APLOG_INFO, status, session->c, 
                       "h2_proxy_session(%s): read error", session->id);
         dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
     }
@@ -1016,7 +997,7 @@ static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
                     transit(session, "no io", H2_PROXYS_ST_DONE);
                 }
                 else {
-                    /* When we have no streams, no task event are possible,
+                    /* When we have no streams, no task events are possible,
                      * switch to blocking reads */
                     transit(session, "no io", H2_PROXYS_ST_IDLE);
                 }
@@ -1059,10 +1040,19 @@ static void ev_stream_done(h2_proxy_session *session, int stream_id,
                       "h2_proxy_sesssion(%s): stream(%d) closed", 
                       session->id, stream_id);
         if (!stream->data_received) {
-            /* last chance to manipulate response headers.
-             * after this, only trailers */
+            apr_bucket *b;
+            /* if the response had no body, this is the time to flush
+             * an empty brigade which will also "write" the resonse
+             * headers */
+            h2_proxy_stream_end_headers_out(stream);
             stream->data_received = 1;
+            b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
+            APR_BRIGADE_INSERT_TAIL(stream->output, b);
+            b = apr_bucket_eos_create(stream->r->connection->bucket_alloc);
+            APR_BRIGADE_INSERT_TAIL(stream->output, b);
+            ap_pass_brigade(stream->r->output_filters, stream->output);
         }
+        
         stream->state = H2_STREAM_ST_CLOSED;
         h2_ihash_remove(session->streams, stream_id);
         h2_iq_remove(session->suspended, stream_id);
@@ -1188,11 +1178,13 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
                   "h2_proxy_session(%s): process", session->id);
            
+run_loop:
     switch (session->state) {
         case H2_PROXYS_ST_INIT:
             status = session_start(session);
             if (status == APR_SUCCESS) {
                 dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
+                goto run_loop;
             }
             else {
                 dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
@@ -1223,6 +1215,7 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
             if (!have_written && !have_read 
                 && !nghttp2_session_want_write(session->ngh2)) {
                 dispatch_event(session, H2_PROXYS_EV_NO_IO, 0, NULL);
+                goto run_loop;
             }
             break;
             
@@ -1255,10 +1248,10 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
             break;
             
         case H2_PROXYS_ST_IDLE:
-            return APR_SUCCESS;
+            break;
 
-        case H2_PROXYS_ST_DONE:
-            return APR_SUCCESS;
+        case H2_PROXYS_ST_DONE: /* done, session terminated */
+            return APR_EOF;
             
         default:
             ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
@@ -1278,7 +1271,7 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
         dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);
     }
     
-    return APR_EAGAIN;
+    return APR_SUCCESS; /* needs to be called again */
 }
 
 typedef struct {
@@ -1297,10 +1290,15 @@ static int cleanup_iter(void *udata, void *val)
 void h2_proxy_session_cleanup(h2_proxy_session *session, 
                               h2_proxy_request_done *done)
 {
-    cleanup_iter_ctx ctx;
-    ctx.session = session;
-    ctx.done = done;
-    h2_ihash_iter(session->streams, cleanup_iter, &ctx);
-    h2_ihash_clear(session->streams);
+    if (session->streams && !h2_ihash_is_empty(session->streams)) {
+        cleanup_iter_ctx ctx;
+        ctx.session = session;
+        ctx.done = done;
+        ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, 
+                      "h2_proxy_session(%s): terminated, %d streams unfinished",
+                      session->id, (int)h2_ihash_count(session->streams));
+        h2_ihash_iter(session->streams, cleanup_iter, &ctx);
+        h2_ihash_clear(session->streams);
+    }
 }
 
index 3fad2b6003a0a6b970a9764e9d88d46c8502e575..94e5131961615cc66e3b000475b3752ada365de1 100644 (file)
@@ -61,6 +61,8 @@ struct h2_proxy_session {
     apr_pool_t *pool;
     nghttp2_session *ngh2;   /* the nghttp2 session itself */
     
+    unsigned int aborted : 1;
+
     h2_proxy_request_done *done;
     void *user_data;
     
@@ -87,7 +89,15 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
 
 apr_status_t h2_proxy_session_submit(h2_proxy_session *s, const char *url,
                                      request_rec *r);
-                                     
+                       
+/** 
+ * Perform a step in processing the proxy session. Will return aftert
+ * one read/write cycle and indicate session status by status code.
+ * @param s the session to process
+ * @return APR_EAGAIN  when processing needs to be invoked again
+ *         APR_SUCCESS when all streams have been processed, session still live
+ *         APR_EOF     when the session has been terminated
+ */
 apr_status_t h2_proxy_session_process(h2_proxy_session *s);
 
 void h2_proxy_session_cleanup(h2_proxy_session *s, h2_proxy_request_done *done);
index 2c227505598ee79c2c94f3312cd98b6fc4419f41..2e358cdb14bf2f5466963f13a2b0652b9a5938df 100644 (file)
@@ -354,7 +354,7 @@ h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src)
     return nreq;
 }
 
-request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
+request_rec *h2_request_create_rec(const h2_request *req, conn_rec *c)
 {
     int access_status = HTTP_OK;    
     
@@ -362,7 +362,7 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
 
     r->headers_in = apr_table_clone(r->pool, req->headers);
 
-    ap_run_pre_read_request(r, conn);
+    ap_run_pre_read_request(r, c);
     
     /* Time to populate r with the data we have. */
     r->request_time = req->request_time;
@@ -405,11 +405,11 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
         /* Request check post hooks failed. An example of this would be a
          * request for a vhost where h2 is disabled --> 421.
          */
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, conn, APLOGNO()
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO()
                       "h2_request(%d): access_status=%d, request_create failed",
                       req->id, access_status);
         ap_die(access_status, r);
-        ap_update_child_status(conn->sbh, SERVER_BUSY_LOG, r);
+        ap_update_child_status(c->sbh, SERVER_BUSY_LOG, r);
         ap_run_log_transaction(r);
         r = NULL;
         goto traceout;
index 0a781f63d4ff5788785cf3314c245a8de433db31..9b1dc6b9469836da36c87d3259cca78c0c2af018 100644 (file)
@@ -299,14 +299,18 @@ static int h2_task_process_conn(conn_rec* c)
     ctx = h2_ctx_get(c, 0);
     if (h2_ctx_is_task(ctx)) {
         if (!ctx->task->ser_headers) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, 
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, 
                           "h2_h2, processing request directly");
             h2_task_process_request(ctx->task, c);
             return DONE;
         }
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, 
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, 
                       "h2_task(%s), serialized handling", ctx->task->id);
     }
+    else {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, 
+                      "slave_conn(%ld): has no task", c->id);
+    }
     return DECLINED;
 }
 
index 462b2566a6edd990c7985a1eea1562a7391dfbf4..1a9dba54566bcbfdb184a4532b9dd66e5459a669 100644 (file)
@@ -41,6 +41,7 @@ struct apr_thread_cond_t;
 struct h2_conn;
 struct h2_mplx;
 struct h2_task;
+struct h2_req_engine;
 struct h2_request;
 struct h2_resp_head;
 struct h2_worker;
@@ -63,6 +64,8 @@ struct h2_task {
     struct h2_task_input *input;
     struct h2_task_output *output;
     struct apr_thread_cond_t *io;   /* used to wait for events on */
+    
+    struct h2_req_engine *engine;
 };
 
 h2_task *h2_task_create(long session_id, const struct h2_request *req, 
index f1f52c005b6e17c7e14f4263e6e8ca60ac8b17d4..025c13987321cf573dc008951a50775fb6a766b3 100644 (file)
@@ -190,15 +190,18 @@ apr_status_t h2_task_output_write(h2_task_output *output,
 
 void h2_task_output_close(h2_task_output *output)
 {
+    if (output->task->frozen) {
+        return;
+    }
     open_if_needed(output, NULL, NULL, "close");
     if (output->state != H2_TASK_OUT_DONE) {
         if (output->frozen_bb && !APR_BRIGADE_EMPTY(output->frozen_bb)) {
             h2_mplx_out_write(output->task->mplx, output->task->stream_id, 
                 NULL, 1, output->frozen_bb, NULL, NULL);
         }
+        output->state = H2_TASK_OUT_DONE;
         h2_mplx_out_close(output->task->mplx, output->task->stream_id, 
                           get_trailers(output));
-        output->state = H2_TASK_OUT_DONE;
     }
 }
 
index 5216523022eb5930a980fadca0f8dfa962e976df..c3b01733a95d61b5a6d69db96b90360bdf3bc0d6 100644 (file)
@@ -148,12 +148,6 @@ static void http2_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
     h2_mplx_req_engine_done(ngn, r_conn);
 }
 
-static void http2_req_engine_exit(h2_req_engine *ngn)
-{
-    h2_mplx_req_engine_exit(ngn);
-}
-
-
 /* Runs once per created child process. Perform any process 
  * related initionalization here.
  */
@@ -179,7 +173,6 @@ static void h2_hooks(apr_pool_t *pool)
     APR_REGISTER_OPTIONAL_FN(http2_req_engine_push);
     APR_REGISTER_OPTIONAL_FN(http2_req_engine_pull);
     APR_REGISTER_OPTIONAL_FN(http2_req_engine_done);
-    APR_REGISTER_OPTIONAL_FN(http2_req_engine_exit);
 
     ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool, "installing hooks");
     
index d5af1d3ce27805e104173fd8c91fd2ad118cb0f1..3220700d876a2d68ebf8c52e03c16808110e7f87 100644 (file)
@@ -74,8 +74,9 @@ APR_DECLARE_OPTIONAL_FN(apr_status_t,
  * @param timeout     wait a maximum amount of time for a new slave, 0 will not wait
  * @param pslave      the slave connection that needs processing or NULL
  * @return APR_SUCCESS if new request was assigned
- *         APR_EAGAIN/APR_TIMEUP if no new request is available
- *         APR_ECONNABORTED if the engine needs to shut down
+ *         APR_EAGAIN  if no new request is available
+ *         APR_EOF          if engine may shut down, as no more request will be scheduled
+ *         APR_ECONNABORTED if the engine needs to shut down immediately
  */
 APR_DECLARE_OPTIONAL_FN(apr_status_t, 
                         http2_req_engine_pull, (h2_req_engine *engine, 
@@ -85,15 +86,6 @@ APR_DECLARE_OPTIONAL_FN(apr_status_t,
 APR_DECLARE_OPTIONAL_FN(void, 
                         http2_req_engine_done, (h2_req_engine *engine, 
                                                 conn_rec *rconn));
-/**
- * The given request engine is done processing and needs to be excluded
- * from further handling. 
- * @param engine      the engine to exit
- */
-APR_DECLARE_OPTIONAL_FN(void,
-                        http2_req_engine_exit, (h2_req_engine *engine));
-
-
 #define H2_TASK_ID_NOTE     "http2-task-id"
 
 #endif
index c5f827f29af73aa904c929922333e9077a81ccc7..0a752a8410321b81b4c68f97d97d6f5b6114257c 100644 (file)
@@ -48,7 +48,6 @@ static apr_status_t (*req_engine_pull)(h2_req_engine *engine,
                                        apr_uint32_t capacity, 
                                        request_rec **pr);
 static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn);
-static void (*req_engine_exit)(h2_req_engine *engine);
                                        
 typedef struct h2_proxy_ctx {
     conn_rec *owner;
@@ -65,6 +64,8 @@ typedef struct h2_proxy_ctx {
     const char *engine_type;
     apr_pool_t *engine_pool;    
     apr_uint32_t req_buffer_size;
+    request_rec *next;
+    apr_size_t capacity;
     
     unsigned standalone : 1;
     unsigned is_ssl : 1;
@@ -98,15 +99,12 @@ static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog,
     req_engine_push = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_push);
     req_engine_pull = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_pull);
     req_engine_done = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_done);
-    req_engine_exit = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_exit);
     
     /* we need all of them */
-    if (!req_engine_push || !req_engine_pull 
-        || !req_engine_done || !req_engine_exit) {
+    if (!req_engine_push || !req_engine_pull || !req_engine_done) {
         req_engine_push = NULL;
         req_engine_pull = NULL;
         req_engine_done = NULL;
-        req_engine_exit = NULL;
     }
     
     return status;
@@ -213,6 +211,7 @@ static apr_status_t proxy_engine_init(h2_req_engine *engine,
         ctx->engine_type = type;
         ctx->engine_pool = pool;
         ctx->req_buffer_size = req_buffer_size;
+        ctx->capacity = 100;
         return APR_SUCCESS;
     }
     ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, 
@@ -245,79 +244,38 @@ static void request_done(h2_proxy_session *session, request_rec *r)
     if (r == ctx->rbase) {
         ctx->r_status = APR_SUCCESS;
     }
-    else if (req_engine_done && ctx->engine) {
+    
+    if (req_engine_done && ctx->engine) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, 
                       "h2_proxy_session(%s): request %s",
                       ctx->engine_id, r->the_request);
         req_engine_done(ctx->engine, r->connection);
     }
-    
 }
 
-static apr_status_t next_request(h2_proxy_ctx *ctx, h2_proxy_session *session, 
-                                 request_rec *r, int before_leave,
-                                 request_rec **pr)
+static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave)
 {
-    *pr = r;
-    if (!r && ctx->engine) {
+    if (ctx->next) {
+        return APR_SUCCESS;
+    }
+    else if (req_engine_pull && ctx->engine) {
         apr_status_t status;
         status = req_engine_pull(ctx->engine, 
                                  before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ, 
-                                 H2MAX(1, session->remote_max_concurrent), pr);
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, 
-                      "h2_proxy_session(%s): pulled request %s", 
-                      session->id, (*pr? (*pr)->the_request : "NULL"));
-        return status; 
-    }
-    return *pr? APR_SUCCESS : APR_EAGAIN;
+                                 ctx->capacity, &ctx->next);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, ctx->owner, 
+                      "h2_proxy_engine(%s): pulled request %s", 
+                      ctx->engine_id, 
+                      (ctx->next? ctx->next->the_request : "NULL"));
+        return APR_STATUS_IS_EAGAIN(status)? APR_SUCCESS : status;
+    }
+    return APR_EOF;
 }
 
-static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
+static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
     apr_status_t status = OK;
     h2_proxy_session *session;
     
-setup_backend:    
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner, 
-                  "eng(%s): setup backend", ctx->engine_id);
-    /* Step Two: Make the Connection (or check that an already existing
-     * socket is still usable). On success, we have a socket connected to
-     * backend->hostname. */
-    if (ap_proxy_connect_backend(ctx->proxy_func, ctx->p_conn, ctx->worker, 
-                                 ctx->server)) {
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, ctx->owner, APLOGNO(03352)
-                      "H2: failed to make connection to backend: %s",
-                      ctx->p_conn->hostname);
-        return HTTP_SERVICE_UNAVAILABLE;
-    }
-    
-    /* Step Three: Create conn_rec for the socket we have open now. */
-    if (!ctx->p_conn->connection) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, APLOGNO(03353)
-                      "setup new connection: is_ssl=%d %s %s %s", 
-                      ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname, 
-                      r->hostname, ctx->p_conn->hostname);
-        if ((status = ap_proxy_connection_create(ctx->proxy_func, ctx->p_conn,
-                                                 ctx->owner, 
-                                                 ctx->server)) != OK) {
-            return status;
-        }
-        
-        /*
-         * On SSL connections set a note on the connection what CN is
-         * requested, such that mod_ssl can check if it is requested to do
-         * so.
-         */
-        if (ctx->p_conn->ssl_hostname) {
-            apr_table_setn(ctx->p_conn->connection->notes,
-                           "proxy-request-hostname", ctx->p_conn->ssl_hostname);
-        }
-        
-        if (ctx->is_ssl) {
-            apr_table_setn(ctx->p_conn->connection->notes,
-                           "proxy-request-alpn-protos", "h2");
-        }
-    }
-
     /* Step Four: Send the Request in a new HTTP/2 stream and
      * loop until we got the response or encounter errors.
      */
@@ -327,67 +285,103 @@ setup_backend:
                                      30, h2_log2(ctx->req_buffer_size), 
                                      request_done);
     if (!session) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->p_conn->connection
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner
                       "session unavailable");
         return HTTP_SERVICE_UNAVAILABLE;
     }
     
-run_session:
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner, 
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, 
                   "eng(%s): run session %s", ctx->engine_id, session->id);
     session->user_data = ctx;
-    status = h2_proxy_session_process(session);
-    while (APR_STATUS_IS_EAGAIN(status)) {
-        status = next_request(ctx, session, r, 0, &r);
-        if (status == APR_SUCCESS) {
-            add_request(session, r);
-            r = NULL;
-        }
-        else if (!APR_STATUS_IS_EAGAIN(status)) {
-            break;
+    
+    while (1) {
+        if (ctx->next) {
+            add_request(session, ctx->next);
+            ctx->next = NULL;
         }
         
         status = h2_proxy_session_process(session);
-    }
-    
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, ctx->owner, 
-                  "eng(%s): end of session run", ctx->engine_id);
-    if (session->state == H2_PROXYS_ST_DONE || status != APR_SUCCESS) {
-        ctx->p_conn->close = 1;
-    }
-    
-    if (status == APR_SUCCESS) {
-        status = next_request(ctx, session, r, 1, &r);
-    }
-    if (status == APR_SUCCESS) { 
-        if (ctx->p_conn->close) {
-            /* the connection is/willbe closed, the session is terminated.
+        
+        if (status == APR_SUCCESS) {
+            apr_status_t s2;
+            /* ongoing processing, call again */
+            ctx->capacity = H2MAX(100, session->remote_max_concurrent);
+            s2 = next_request(ctx, 0);
+            if (s2 == APR_ECONNABORTED) {
+                /* master connection gone */
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, s2, ctx->owner, 
+                              "eng(%s): pull request", ctx->engine_id);
+                status = s2;
+                break;
+            }
+            if (!ctx->next && h2_ihash_is_empty(session->streams)) {
+                break;
+            }
+        }
+        else {
+            /* end of processing, maybe error */
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, 
+                          "eng(%s): end of session run", ctx->engine_id);
+            /*
              * Any open stream of that session needs to
              * a) be reopened on the new session iff safe to do so
              * b) reported as done (failed) otherwise
              */
             h2_proxy_session_cleanup(session, request_done);
-            goto setup_backend;
+            break;
         }
-        add_request(session, r);
-        r = NULL;
-        goto run_session;
     }
-
-    if (session->streams && !h2_ihash_is_empty(session->streams)) {
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, 
-                      ctx->p_conn->connection, 
-                      "session run done with %d streams unfinished",
-                      (int)h2_ihash_count(session->streams));
-    }
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, 
-                  ctx->p_conn->connection, "eng(%s): session run done",
-                  ctx->engine_id);
-                  
+    
     session->user_data = NULL;
+    
     return status;
 }
 
+static apr_status_t setup_engine(h2_proxy_ctx *ctx)
+{
+    conn_rec *c = ctx->owner;
+    const char *engine_type, *hostname;
+    
+    hostname = (ctx->p_conn->ssl_hostname? 
+                ctx->p_conn->ssl_hostname : ctx->p_conn->hostname);
+    engine_type = apr_psprintf(c->pool, "proxy_http2 %s%s", hostname, 
+                               ctx->server_portstr);
+    
+    if (c->master && req_engine_push && ctx->next && is_h2 && is_h2(c)) {
+        /* If we are have req_engine capabilities, push the handling of this
+         * request (e.g. slave connection) to a proxy_http2 engine which 
+         * uses the same backend. We may be called to create an engine 
+         * ourself. */
+        if (req_engine_push(engine_type, ctx->next, proxy_engine_init)
+            == APR_SUCCESS && ctx->engine == NULL) {
+            /* Another engine instance has taken over processing of this
+             * request. */
+            ctx->r_status = APR_SUCCESS;
+            ctx->next = NULL;
+            
+            return APR_EOF;
+        }
+    }
+    
+    if (!ctx->engine) {
+        /* No engine was available or has been initialized, handle this
+         * request just by ourself. */
+        ctx->engine_id = apr_psprintf(c->pool, "eng-proxy-%ld", c->id);
+        ctx->engine_type = engine_type;
+        ctx->engine_pool = c->pool;
+        ctx->req_buffer_size = (32*1024);
+        ctx->standalone = 1;
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, 
+                      "h2_proxy_http2(%ld): setup standalone engine for type %s", 
+                      c->id, engine_type);
+    }
+    else {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, 
+                      "H2: hosting engine %s", ctx->engine_id);
+    }
+    return APR_SUCCESS;
+}
+
 static int proxy_http2_handler(request_rec *r, 
                                proxy_worker *worker,
                                proxy_server_conf *conf,
@@ -405,7 +399,6 @@ static int proxy_http2_handler(request_rec *r,
     apr_pool_t *p = c->pool;
     apr_uri_t *uri = apr_palloc(p, sizeof(*uri));
     h2_proxy_ctx *ctx;
-    const char *engine_type, *hostname;
 
     /* find the scheme */
     if ((url[0] != 'h' && url[0] != 'H') || url[1] != '2') {
@@ -441,13 +434,16 @@ static int proxy_http2_handler(request_rec *r,
     ctx->conf       = conf;
     ctx->flushall   = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0;
     ctx->r_status   = HTTP_SERVICE_UNAVAILABLE;
-    
+    ctx->next       = r;
+    r = NULL;
     ap_set_module_config(c->conn_config, &proxy_http2_module, ctx);
-    apr_table_setn(r->notes, H2_PROXY_REQ_URL_NOTE, url);
 
     /* scheme says, this is for us. */
-    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "H2: serving URL %s", url);
-
+    apr_table_setn(ctx->rbase->notes, H2_PROXY_REQ_URL_NOTE, url);
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->rbase, 
+                  "H2: serving URL %s", url);
+    
+run_connect:    
     /* Get a proxy_conn_rec from the worker, might be a new one, might
      * be one still open from another request, or it might fail if the
      * worker is stopped or in error. */
@@ -460,79 +456,96 @@ static int proxy_http2_handler(request_rec *r,
     if (ctx->is_ssl) {
         /* If there is still some data on an existing ssl connection, now
          * would be a good timne to get rid of it. */
-        ap_proxy_ssl_connection_cleanup(ctx->p_conn, r);
+        ap_proxy_ssl_connection_cleanup(ctx->p_conn, ctx->rbase);
     }
 
     /* Step One: Determine the URL to connect to (might be a proxy),
      * initialize the backend accordingly and determine the server 
      * port string we can expect in responses. */
-    if ((status = ap_proxy_determine_connection(p, r, conf, worker, ctx->p_conn,
-                                                uri, &locurl, proxyname,
-                                                proxyport, ctx->server_portstr,
+    if ((status = ap_proxy_determine_connection(p, ctx->rbase, conf, worker, 
+                                                ctx->p_conn, uri, &locurl, 
+                                                proxyname, proxyport, 
+                                                ctx->server_portstr,
                                                 sizeof(ctx->server_portstr))) != OK) {
         goto cleanup;
     }
     
-    hostname = (ctx->p_conn->ssl_hostname? 
-                ctx->p_conn->ssl_hostname : ctx->p_conn->hostname);
-    engine_type = apr_psprintf(p, "proxy_http2 %s%s", hostname, ctx->server_portstr);
+    if (!ctx->engine && setup_engine(ctx) != APR_SUCCESS) {
+        goto cleanup;
+    }
     
-    if (c->master && req_engine_push && is_h2 && is_h2(ctx->owner)) {
-        /* If we are have req_engine capabilities, push the handling of this
-         * request (e.g. slave connection) to a proxy_http2 engine which uses 
-         * the same backend. We may be called to create an engine ourself.
-         */
-        status = req_engine_push(engine_type, r, proxy_engine_init);
-        if (status == APR_SUCCESS && ctx->engine == NULL) {
-            /* Another engine instance has taken over processing of this
-             * request. */
-            ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, 
-                          "H2: pushed request %s to engine type %s", 
-                          url, engine_type);
-            ctx->r_status = APR_SUCCESS;
-            goto cleanup;
-        }
+    /* Step Two: Make the Connection (or check that an already existing
+     * socket is still usable). On success, we have a socket connected to
+     * backend->hostname. */
+    if (ap_proxy_connect_backend(ctx->proxy_func, ctx->p_conn, ctx->worker, 
+                                 ctx->server)) {
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, ctx->owner, APLOGNO(03352)
+                      "H2: failed to make connection to backend: %s",
+                      ctx->p_conn->hostname);
+        goto cleanup;
     }
     
-    if (!ctx->engine) {
-        /* No engine was available or has been initialized, handle this
-        * request just by ourself. */
-        ctx->engine_id = apr_psprintf(p, "eng-proxy-%ld", c->id);
-        ctx->engine_type = engine_type;
-        ctx->engine_pool = p;
-        ctx->req_buffer_size = (32*1024);
-        ctx->standalone = 1;
-        ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, 
-                      "h2_proxy_http2(%ld): setup standalone engine for type %s", 
-                      c->id, engine_type);
+    /* Step Three: Create conn_rec for the socket we have open now. */
+    if (!ctx->p_conn->connection) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, APLOGNO(03353)
+                      "setup new connection: is_ssl=%d %s %s %s", 
+                      ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname, 
+                      locurl, ctx->p_conn->hostname);
+        if ((status = ap_proxy_connection_create(ctx->proxy_func, ctx->p_conn,
+                                                 ctx->owner, 
+                                                 ctx->server)) != OK) {
+            goto cleanup;
+        }
+        
+        /*
+         * On SSL connections set a note on the connection what CN is
+         * requested, such that mod_ssl can check if it is requested to do
+         * so.
+         */
+        if (ctx->p_conn->ssl_hostname) {
+            apr_table_setn(ctx->p_conn->connection->notes,
+                           "proxy-request-hostname", ctx->p_conn->ssl_hostname);
+        }
+        
+        if (ctx->is_ssl) {
+            apr_table_setn(ctx->p_conn->connection->notes,
+                           "proxy-request-alpn-protos", "h2");
+        }
     }
-    else {
-        ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, 
-                      "H2: hosting engine %s for request %s", ctx->engine_id, url);
+
+run_session:
+    status = proxy_engine_run(ctx);
+    if (status == APR_SUCCESS) {
+        /* session and connection still ok */
+        if (next_request(ctx, 1) == APR_SUCCESS) {
+            /* more requests, run again */
+            goto run_session;
+        }
+        /* done */
+        ctx->engine = NULL;
     }
-    
-    status = proxy_engine_run(ctx, r);
 
 cleanup:
-    if (ctx->engine && req_engine_exit) {
-        req_engine_exit(ctx->engine);
-        ctx->engine = NULL;
+    if (ctx->engine && next_request(ctx, 1) == APR_SUCCESS) {
+        /* Still more to do, tear down old conn and start over */
+        ctx->p_conn->close = 1;
+        proxy_run_detach_backend(r, ctx->p_conn);
+        ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
+        ctx->p_conn = NULL;
+        goto run_connect;
     }
     
-    if (ctx) {
-        if (ctx->p_conn) {
-            if (status != APR_SUCCESS) {
-                ctx->p_conn->close = 1;
-            }
-            proxy_run_detach_backend(r, ctx->p_conn);
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "cleanup, releasing connection");
-            ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
+    if (ctx->p_conn) {
+        if (status != APR_SUCCESS) {
+            /* close socket when errors happened or session shut down (EOF) */
+            ctx->p_conn->close = 1;
         }
-        ctx->worker = NULL;
-        ctx->conf = NULL;
+        proxy_run_detach_backend(ctx->rbase, ctx->p_conn);
+        ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
         ctx->p_conn = NULL;
     }
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, "leaving handler");
+
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, "leaving handler");
     return ctx->r_status;
 }