]> granicus.if.org Git - apache/commitdiff
mod_http2: support for several different request engines per connection, fixes CVE...
authorStefan Eissing <icing@apache.org>
Sat, 5 Mar 2016 15:45:12 +0000 (15:45 +0000)
committerStefan Eissing <icing@apache.org>
Sat, 5 Mar 2016 15:45:12 +0000 (15:45 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1733727 13f79535-47bb-0310-9956-ffa450edef68

20 files changed:
CMakeLists.txt
modules/http2/NWGNUmod_http2
modules/http2/config2.m4
modules/http2/h2_conn_io.c
modules/http2/h2_conn_io.h
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_ngn_shed.c [new file with mode: 0644]
modules/http2/h2_ngn_shed.h [new file with mode: 0644]
modules/http2/h2_proxy_session.c
modules/http2/h2_request.c
modules/http2/h2_session.c
modules/http2/h2_task.c
modules/http2/h2_task.h
modules/http2/h2_task_output.c
modules/http2/h2_task_output.h
modules/http2/h2_version.h
modules/http2/mod_http2.c
modules/http2/mod_http2.h
modules/http2/mod_proxy_http2.c

index bd3fe405f28f24cba555a1ec6bed2e454b74dfcf..e68076630e55b1b9b512a08abbb95d96556a73b7 100644 (file)
@@ -407,7 +407,7 @@ SET(mod_http2_extra_sources
   modules/http2/h2_mplx.c            modules/http2/h2_push.c
   modules/http2/h2_request.c         modules/http2/h2_response.c
   modules/http2/h2_session.c         modules/http2/h2_stream.c 
-  modules/http2/h2_switch.c
+  modules/http2/h2_switch.c          modules/http2/h2_ngn_shed.c 
   modules/http2/h2_task.c            modules/http2/h2_task_input.c
   modules/http2/h2_task_output.c     modules/http2/h2_int_queue.c
   modules/http2/h2_util.c            modules/http2/h2_worker.c
index 12811bd2703575cee08133c0f2fe4142e08f9e51..c471ea470d1692b62e6e43cbbaa5430c6fc9825e 100644 (file)
@@ -198,6 +198,7 @@ FILES_nlm_objs = \
        $(OBJDIR)/h2_io.o \
        $(OBJDIR)/h2_io_set.o \
        $(OBJDIR)/h2_mplx.o \
+       $(OBJDIR)/h2_ngn_shed.o \
        $(OBJDIR)/h2_push.o \
        $(OBJDIR)/h2_request.o \
        $(OBJDIR)/h2_response.o \
index b47b2d21923de01062906c2586eaf9d2d9d39cad..1515c4dbcf55f8da9970bd25fcea5df6d5ab6909 100644 (file)
@@ -33,6 +33,7 @@ h2_int_queue.lo dnl
 h2_io.lo dnl
 h2_io_set.lo dnl
 h2_mplx.lo dnl
+h2_ngn_shed.lo dnl
 h2_push.lo dnl
 h2_request.lo dnl
 h2_response.lo dnl
@@ -200,7 +201,8 @@ is usually linked shared and requires loading. ], $http2_objs, , most, [
 ])
 
 # Ensure that other modules can pick up mod_http2.h
-APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$modpath_current])
+# icing: hold back for now until it is more stable
+#APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$modpath_current])
 
 
 
index 4a8375b9401721f8740837035d2d1be060be3dd6..56d01e6732c523d33a462d7d98120208f036fe58 100644 (file)
@@ -213,6 +213,16 @@ apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush)
     return h2_conn_io_flush_int(io, flush, 0);
 }
 
+apr_status_t h2_conn_io_flush(h2_conn_io *io)
+{
+    /* make sure we always write a flush, even if our buffers are empty.
+     * We want to flush not only our buffers, but alse ones further down
+     * the connection filters. */
+    apr_bucket *b = apr_bucket_flush_create(io->connection->bucket_alloc);
+    APR_BRIGADE_INSERT_TAIL(io->output, b);
+    return h2_conn_io_flush_int(io, 0, 0);
+}
+
 apr_status_t h2_conn_io_consider_pass(h2_conn_io *io)
 {
     apr_off_t len = 0;
index f0243927d60c5074c3d6b86509552c372c6f2e16..8d71fffcd7b1c92eb3f5686a6428a24d1bb0b183 100644 (file)
@@ -78,6 +78,7 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, struct h2_session *session);
  * @param flush if a flush bucket should be appended to any output
  */
 apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush);
+apr_status_t h2_conn_io_flush(h2_conn_io *io);
 
 /**
  * Check the amount of buffered output and pass it on if enough has accumulated.
index 86451ecb56775202ded48f78ae8b9f755acb1406..43a4630e9f7a76ee3446705b2576e27659ed807b 100644 (file)
@@ -38,6 +38,7 @@
 #include "h2_io_set.h"
 #include "h2_response.h"
 #include "h2_mplx.h"
+#include "h2_ngn_shed.h"
 #include "h2_request.h"
 #include "h2_stream.h"
 #include "h2_task.h"
@@ -143,10 +144,7 @@ static void h2_mplx_destroy(h2_mplx *m)
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                   "h2_mplx(%ld): destroy, ios=%d", 
                   m->id, (int)h2_io_set_size(m->stream_ios));
-    m->aborted = 1;
-    
     check_tx_free(m);
-    
     if (m->pool) {
         apr_pool_destroy(m->pool);
     }
@@ -197,7 +195,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
             return NULL;
         }
         
-        status = apr_thread_cond_create(&m->task_done, m->pool);
+        status = apr_thread_cond_create(&m->req_added, m->pool);
         if (status != APR_SUCCESS) {
             h2_mplx_destroy(m);
             return NULL;
@@ -217,6 +215,9 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
         
         m->tx_handles_reserved = 0;
         m->tx_chunk_size = 4;
+        
+        m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->stream_max_mem);
+        h2_ngn_shed_set_ctx(m->ngn_shed , m);
     }
     return m;
 }
@@ -362,7 +363,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
         h2_mplx_set_consumed_cb(m, NULL, NULL);
         
         h2_iq_clear(m->q);
-        apr_thread_cond_broadcast(m->task_done);
+        apr_thread_cond_broadcast(m->req_added);
         while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
             /* iterate until all ios have been orphaned or destroyed */
         }
@@ -397,8 +398,8 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
                         h2_io_set_iter(m->stream_ios, stream_print, m);
                     }
                 }
-                m->aborted = 1;
-                apr_thread_cond_broadcast(m->task_done);
+                h2_mplx_abort(m);
+                apr_thread_cond_broadcast(m->req_added);
             }
         }
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
@@ -412,15 +413,13 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 
 void h2_mplx_abort(h2_mplx *m)
 {
-    apr_status_t status;
     int acquired;
     
     AP_DEBUG_ASSERT(m);
-    if (!m->aborted) {
-        if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-            m->aborted = 1;
-            leave_mutex(m, acquired);
-        }
+    if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) {
+        m->aborted = 1;
+        h2_ngn_shed_abort(m->ngn_shed);
+        leave_mutex(m, acquired);
     }
 }
 
@@ -695,7 +694,8 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
 }
 
 static apr_status_t out_write(h2_mplx *m, h2_io *io, 
-                              ap_filter_t* f, apr_bucket_brigade *bb,
+                              ap_filter_t* f, int blocking,
+                              apr_bucket_brigade *bb,
                               apr_table_t *trailers,
                               struct apr_thread_cond_t *iowait)
 {
@@ -719,6 +719,9 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io,
                && iowait
                && (m->stream_max_mem <= h2_io_out_length(io))
                && !is_aborted(m, &status)) {
+            if (!blocking) {
+                return APR_INCOMPLETE;
+            }
             trailers = NULL;
             if (f) {
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
@@ -757,7 +760,12 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
             check_tx_reservation(m);
         }
         if (bb) {
-            status = out_write(m, io, f, bb, response->trailers, iowait);
+            status = out_write(m, io, f, 0, bb, response->trailers, iowait);
+            if (status == APR_INCOMPLETE) {
+                /* write will have transferred as much data as possible.
+                   caller has to deal with non-empty brigade */
+                status = APR_SUCCESS;
+            }
         }
         have_out_data_for(m, stream_id);
     }
@@ -791,7 +799,8 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
 }
 
 apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, 
-                               ap_filter_t* f, apr_bucket_brigade *bb,
+                               ap_filter_t* f, int blocking,
+                               apr_bucket_brigade *bb,
                                apr_table_t *trailers,
                                struct apr_thread_cond_t *iowait)
 {
@@ -802,7 +811,7 @@ apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id,
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
-            status = out_write(m, io, f, bb, trailers, iowait);
+            status = out_write(m, io, f, blocking, bb, trailers, iowait);
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
                           "h2_mplx(%ld-%d): write with trailers=%s", 
                           m->id, io->id, trailers? "yes" : "no");
@@ -1111,7 +1120,9 @@ static void task_done(h2_mplx *m, h2_task *task)
         if (task->frozen) {
             /* this task was handed over to an engine for processing */
             h2_task_thaw(task);
-            /* TODO: can we signal an engine that it can now start on this? */
+            /* TODO: not implemented yet... */
+            /*h2_task_set_io_blocking(task, 0);*/
+            apr_thread_cond_broadcast(m->req_added);
         }
         else {
             h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
@@ -1174,7 +1185,6 @@ static void task_done(h2_mplx *m, h2_task *task)
                     /* hang around until the stream deregisteres */
                 }
             }
-            apr_thread_cond_broadcast(m->task_done);
         }
     }
 }
@@ -1337,59 +1347,8 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
  * HTTP/2 request engines
  ******************************************************************************/
 
-typedef struct h2_req_entry h2_req_entry;
-struct h2_req_entry {
-    APR_RING_ENTRY(h2_req_entry) link;
-    request_rec *r;
-};
-
-#define H2_REQ_ENTRY_NEXT(e)   APR_RING_NEXT((e), link)
-#define H2_REQ_ENTRY_PREV(e)   APR_RING_PREV((e), link)
-#define H2_REQ_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link)
-
-typedef struct h2_req_engine_i h2_req_engine_i;
-struct h2_req_engine_i {
-    h2_req_engine pub;
-    conn_rec *c;               /* connection this engine is assigned to */
-    h2_mplx *m;
-    unsigned int shutdown : 1; /* engine is being shut down */
-    apr_thread_cond_t *io;     /* condition var for waiting on data */
-    APR_RING_HEAD(h2_req_entries, h2_req_entry) entries;
-    apr_size_t no_assigned;    /* # of assigned requests */
-    apr_size_t no_live;        /* # of live */
-    apr_size_t no_finished;    /* # of finished */
-};
-
-#define H2_REQ_ENTRIES_SENTINEL(b)     APR_RING_SENTINEL((b), h2_req_entry, link)
-#define H2_REQ_ENTRIES_EMPTY(b)        APR_RING_EMPTY((b), h2_req_entry, link)
-#define H2_REQ_ENTRIES_FIRST(b)        APR_RING_FIRST(b)
-#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b)
-
-#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do {                          \
-h2_req_entry *ap__b = (e);                                        \
-APR_RING_INSERT_HEAD((b), ap__b, h2_req_entry, link);  \
-} while (0)
-
-#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do {                          \
-h2_req_entry *ap__b = (e);                                     \
-APR_RING_INSERT_TAIL((b), ap__b, h2_req_entry, link);  \
-} while (0)
-
-static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, 
-                                            h2_req_engine_i *engine, 
-                                            request_rec *r)
-{
-    h2_req_entry *entry = apr_pcalloc(r->pool, sizeof(*entry));
-
-    APR_RING_ELEM_INIT(entry, link);
-    entry->r = r;
-    H2_REQ_ENTRIES_INSERT_TAIL(&engine->entries, entry);
-    return APR_SUCCESS;
-}
-
-
-apr_status_t h2_mplx_engine_push(const char *engine_type, 
-                                 request_rec *r, h2_mplx_engine_init *einit)
+apr_status_t h2_mplx_req_engine_push(const char *ngn_type, 
+                              request_rec *r, h2_req_engine_init *einit)
 {
     apr_status_t status;
     h2_mplx *m;
@@ -1409,63 +1368,7 @@ apr_status_t h2_mplx_engine_push(const char *engine_type,
             status = APR_ECONNABORTED;
         }
         else {
-            h2_req_engine_i *engine = (h2_req_engine_i*)m->engine;
-            
-            apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id);
-            status = APR_EOF;
-            
-            if (task->ser_headers) {
-                /* Max compatibility, deny processing of this */
-            }
-            else if (engine && !strcmp(engine->pub.type, engine_type)) {
-                if (engine->shutdown 
-                    || engine->no_assigned >= H2MIN(engine->pub.capacity, 100)) {
-                    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
-                                  "h2_mplx(%ld): engine shutdown or over %s", 
-                                  m->c->id, engine->pub.id);
-                    engine = NULL;
-                }
-                else if (h2_mplx_engine_schedule(m, engine, r) == APR_SUCCESS) {
-                    /* this task will be processed in another thread,
-                     * freeze any I/O for the time being. */
-                    h2_task_freeze(task, r);
-                    engine->no_assigned++;
-                    status = APR_SUCCESS;
-                    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r,
-                                  "h2_mplx(%ld): push request %s", 
-                                  m->c->id, r->the_request);
-                }
-                else {
-                    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
-                                  "h2_mplx(%ld): engine error adding req %s", 
-                                  m->c->id, engine->pub.id);
-                    engine = NULL;
-                }
-            }
-            
-            if (!engine && einit) {
-                engine = apr_pcalloc(task->c->pool, sizeof(*engine));
-                engine->pub.id = apr_psprintf(task->c->pool, "eng-%ld-%d", 
-                                               m->id, m->next_eng_id++);
-                engine->pub.pool = task->c->pool;
-                engine->pub.type = apr_pstrdup(task->c->pool, engine_type);
-                engine->pub.window_bits = 30;
-                engine->pub.req_window_bits = h2_log2(m->stream_max_mem);
-                engine->c = r->connection;
-                APR_RING_INIT(&engine->entries, h2_req_entry, link);
-                engine->m = m;
-                engine->io = task->io;
-                engine->no_assigned = 1;
-                engine->no_live = 1;
-                
-                status = einit(&engine->pub, r);
-                ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
-                              "h2_mplx(%ld): init engine %s (%s)", 
-                              m->c->id, engine->pub.id, engine->pub.type);
-                if (status == APR_SUCCESS) {
-                    m->engine = &engine->pub;
-                }
-            }
+            status = h2_ngn_shed_push_req(m->ngn_shed, ngn_type, task, r, einit);
         }
         
         leave_mutex(m, acquired);
@@ -1473,163 +1376,66 @@ apr_status_t h2_mplx_engine_push(const char *engine_type,
     return status;
 }
 
-static h2_req_entry *pop_non_frozen(h2_req_engine_i *engine)
-{
-    h2_req_entry *entry;
-    h2_task *task;
-
-    for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
-         entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
-         entry = H2_REQ_ENTRY_NEXT(entry)) {
-        task = h2_ctx_rget_task(entry->r);
-        AP_DEBUG_ASSERT(task);
-        if (!task->frozen) {
-            H2_REQ_ENTRY_REMOVE(entry);
-            return entry;
-        }
-    }
-    return NULL;
-}
-
-static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine, 
-                                apr_read_type_e block, request_rec **pr)
-{   
-    h2_req_entry *entry;
-    
-    AP_DEBUG_ASSERT(m);
-    AP_DEBUG_ASSERT(engine);
-    while (1) {
-        if (m->aborted) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): mplx abort while pulling requests %s", 
-                          m->id, engine->pub.id);
-            *pr = NULL;
-            return APR_EOF;
-        }
-        
-        if (!H2_REQ_ENTRIES_EMPTY(&engine->entries) 
-            && (entry = pop_non_frozen(engine))) {
-            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r,
-                          "h2_mplx(%ld): request %s pulled by engine %s", 
-                          m->c->id, entry->r->the_request, engine->pub.id);
-            engine->no_live++;
-            entry->r->connection->current_thread = engine->c->current_thread;
-            *pr = entry->r;
-            return APR_SUCCESS;
-        }
-        else if (APR_NONBLOCK_READ == block) {
-            *pr = NULL;
-            return APR_EAGAIN;
-        }
-        else if (H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
-            engine->shutdown = 1;
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): emtpy queue, shutdown engine %s", 
-                          m->id, engine->pub.id);
-            *pr = NULL;
-            return APR_EOF;
-        }
-        apr_thread_cond_timedwait(m->task_done, m->lock, 
-                                  apr_time_from_msec(100));
-    }
-}
-                                 
-apr_status_t h2_mplx_engine_pull(h2_req_engine *pub_engine, 
-                                 apr_read_type_e block, request_rec **pr)
+apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn, 
+                                     apr_read_type_e block, 
+                                     apr_uint32_t capacity, 
+                                     request_rec **pr)
 {   
-    h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
-    h2_mplx *m = engine->m;
+    h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
+    h2_mplx *m = h2_ngn_shed_get_ctx(shed);
     apr_status_t status;
     int acquired;
     
     *pr = NULL;
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        status = engine_pull(m, engine, block, pr);
+        int want_shutdown = (block == APR_BLOCK_READ);
+        if (0 && want_shutdown) {
+            /* For a blocking read, check first if requests are to be
+             * had and, if not, wait a short while before doing the
+             * blocking, and if unsuccessful, terminating read.
+             */
+            status = h2_ngn_shed_pull_req(shed, ngn, capacity, 0, pr);
+            if (status != APR_EAGAIN) {
+                return status;
+            }
+            ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c,
+                          "h2_mplx(%ld): start block engine pull", m->id);
+            apr_thread_cond_timedwait(m->req_added, m->lock, 
+                                      apr_time_from_msec(100));
+            ap_log_cerror(APLOG_MARK, APLOG_INFO, status, m->c,
+                          "h2_mplx(%ld): done block engine pull", m->id);
+        }
+        status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr);
         leave_mutex(m, acquired);
     }
     return status;
 }
  
-static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task, 
-                        int waslive, int aborted)
+void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
 {
-    int acquired;
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
-                  "h2_mplx(%ld): task %s %s by %s", 
-                  m->id, task->id, aborted? "aborted":"done", 
-                  engine->pub.id);
-    h2_task_output_close(task->output);
-    engine->no_finished++;
-    if (waslive) engine->no_live--;
-    engine->no_assigned--;
-    if (task->c != engine->c) { /* do not release what the engine runs on */
-        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-            task_done(m, task);
-            leave_mutex(m, acquired);
-        }
-    }
-}
-                                
-void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn)
-{
-    h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
-    h2_mplx *m = engine->m;
-    h2_task *task;
+    h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
+    h2_mplx *m = h2_ngn_shed_get_ctx(shed);
     int acquired;
 
-    task = h2_ctx_cget_task(r_conn);
-    if (task && (enter_mutex(m, &acquired) == APR_SUCCESS)) {
-        engine_done(m, engine, task, 1, 0);
+    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+        if (h2_ngn_shed_done_req(shed, ngn, r_conn) == APR_SUCCESS) {
+            h2_task *task = h2_ctx_cget_task(r_conn);
+            if (task) {
+                task_done(m, task);
+            }
+        }
         leave_mutex(m, acquired);
     }
 }
                                 
-void h2_mplx_engine_exit(h2_req_engine *pub_engine)
+void h2_mplx_req_engine_exit(h2_req_engine *ngn)
 {
-    h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
-    h2_mplx *m = engine->m;
+    h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
+    h2_mplx *m = h2_ngn_shed_get_ctx(shed);
     int acquired;
     
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        if (!m->aborted 
-            && !H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
-            h2_req_entry *entry;
-            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
-                          "h2_mplx(%ld): exit engine %s (%s), "
-                          "has still requests queued, shutdown=%d,"
-                          "assigned=%ld, live=%ld, finished=%ld", 
-                          m->c->id, engine->pub.id, engine->pub.type,
-                          engine->shutdown, 
-                          (long)engine->no_assigned, (long)engine->no_live,
-                          (long)engine->no_finished);
-            for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
-                 entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
-                 entry = H2_REQ_ENTRY_NEXT(entry)) {
-                request_rec *r = entry->r;
-                h2_task *task = h2_ctx_rget_task(r);
-                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
-                              "h2_mplx(%ld): engine %s has queued task %s, "
-                              "frozen=%d, aborting",
-                              m->c->id, engine->pub.id, task->id, task->frozen);
-                engine_done(m, engine, task, 0, 1);
-            }
-        }
-        if (!m->aborted && (engine->no_assigned > 1 || engine->no_live > 1)) {
-            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
-                          "h2_mplx(%ld): exit engine %s (%s), "
-                          "assigned=%ld, live=%ld, finished=%ld", 
-                          m->c->id, engine->pub.id, engine->pub.type,
-                          (long)engine->no_assigned, (long)engine->no_live,
-                          (long)engine->no_finished);
-        }
-        else {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): exit engine %s (%s)", 
-                          m->c->id, engine->pub.id, engine->pub.type);
-        }
-        if (m->engine == &engine->pub) {
-            m->engine = NULL; /* TODO */
-        }
+        h2_ngn_shed_done_ngn(shed, ngn);
         leave_mutex(m, acquired);
     }
 }
index 4d6ce7c0d5a5d0d4181cd567367803ba525fab13..368d92fc967f84ee658994ff3733ddcc18229bff 100644 (file)
@@ -47,6 +47,7 @@ struct h2_io_set;
 struct apr_thread_cond_t;
 struct h2_workers;
 struct h2_int_queue;
+struct h2_ngn_shed;
 struct h2_req_engine;
 
 #include <apr_queue.h>
@@ -75,19 +76,19 @@ struct h2_mplx {
     struct h2_io_set *ready_ios;
     struct h2_io_set *redo_ios;
     
-    int max_stream_started;      /* highest stream id that started processing */
-    int workers_busy;            /* # of workers processing on this mplx */
-    int workers_limit;           /* current # of workers limit, dynamic */
-    int workers_def_limit;       /* default # of workers limit */
-    int workers_max;             /* max, hard limit # of workers in a process */
-    apr_time_t last_idle_block;  /* last time, this mplx entered IDLE while
-                                  * streams were ready */
-    apr_time_t last_limit_change;/* last time, worker limit changed */
+    apr_uint32_t max_stream_started; /* highest stream id that started processing */
+    apr_uint32_t workers_busy;       /* # of workers processing on this mplx */
+    apr_uint32_t workers_limit;      /* current # of workers limit, dynamic */
+    apr_uint32_t workers_def_limit;  /* default # of workers limit */
+    apr_uint32_t workers_max;        /* max, hard limit # of workers in a process */
+    apr_time_t last_idle_block;      /* last time, this mplx entered IDLE while
+                                      * streams were ready */
+    apr_time_t last_limit_change;    /* last time, worker limit changed */
     apr_interval_time_t limit_change_interval;
 
     apr_thread_mutex_t *lock;
     struct apr_thread_cond_t *added_output;
-    struct apr_thread_cond_t *task_done;
+    struct apr_thread_cond_t *req_added;
     struct apr_thread_cond_t *join_wait;
     
     apr_size_t stream_max_mem;
@@ -102,11 +103,8 @@ struct h2_mplx {
     
     h2_mplx_consumed_cb *input_consumed;
     void *input_consumed_ctx;
-    
-    struct h2_req_engine *engine;
-    /* TODO: signal for waiting tasks*/
-    apr_queue_t *engine_queue;
-    int next_eng_id;
+
+    struct h2_ngn_shed *ngn_shed;
 };
 
 
@@ -308,12 +306,16 @@ apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,
  * of bytes buffered reaches configured max.
  * @param stream_id the stream identifier
  * @param filter the apache filter context of the data
+ * @param blocking == 0 iff call should return with APR_INCOMPLETE if
+ *                 the full brigade cannot be written at once
  * @param bb the bucket brigade to append
  * @param trailers optional trailers for response, maybe NULL
  * @param iowait a conditional used for block/signalling in h2_mplx
  */
 apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id, 
-                               ap_filter_t* filter, apr_bucket_brigade *bb,
+                               ap_filter_t* filter, 
+                               int blocking,
+                               apr_bucket_brigade *bb,
                                apr_table_t *trailers,
                                struct apr_thread_cond_t *iowait);
 
@@ -408,20 +410,24 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx, link);  \
 apr_status_t h2_mplx_idle(h2_mplx *m);
 
 /*******************************************************************************
- * h2_mplx h2_req_engine handling.
+ * h2_req_engine handling
  ******************************************************************************/
-typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine, 
-                                         request_rec *r);
-
-apr_status_t h2_mplx_engine_push(const char *engine_type, 
-                                 request_rec *r, h2_mplx_engine_init *einit);
-                                 
-apr_status_t h2_mplx_engine_pull(struct h2_req_engine *engine, 
-                                 apr_read_type_e block, request_rec **pr);
-
-void h2_mplx_engine_done(struct h2_req_engine *engine, conn_rec *r_conn);
-                                 
-void h2_mplx_engine_exit(struct h2_req_engine *engine);
+
+typedef apr_status_t h2_mplx_req_engine_init(struct h2_req_engine *engine, 
+                                             const char *id, 
+                                             const char *type,
+                                             apr_pool_t *pool, 
+                                             apr_uint32_t req_buffer_size,
+                                             request_rec *r);
+
+apr_status_t h2_mplx_req_engine_push(const char *ngn_type, 
+                                     request_rec *r, 
+                                     h2_mplx_req_engine_init *einit);
+apr_status_t h2_mplx_req_engine_pull(struct h2_req_engine *ngn, 
+                                     apr_read_type_e block, 
+                                     apr_uint32_t capacity, 
+                                     request_rec **pr);
+void h2_mplx_req_engine_done(struct h2_req_engine *ngn, conn_rec *r_conn);
+void h2_mplx_req_engine_exit(struct h2_req_engine *ngn);
 
 #endif /* defined(__mod_h2__h2_mplx__) */
diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c
new file mode 100644 (file)
index 0000000..e8e6755
--- /dev/null
@@ -0,0 +1,333 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdlib.h>
+
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
+#include <apr_strings.h>
+#include <apr_time.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_log.h>
+
+#include "mod_http2.h"
+
+#include "h2_private.h"
+#include "h2_config.h"
+#include "h2_conn.h"
+#include "h2_ctx.h"
+#include "h2_h2.h"
+#include "h2_int_queue.h"
+#include "h2_response.h"
+#include "h2_request.h"
+#include "h2_task.h"
+#include "h2_task_output.h"
+#include "h2_util.h"
+#include "h2_ngn_shed.h"
+
+
+typedef struct h2_ngn_entry h2_ngn_entry;
+struct h2_ngn_entry {
+    APR_RING_ENTRY(h2_ngn_entry) link;
+    request_rec *r;
+};
+
+#define H2_NGN_ENTRY_NEXT(e)   APR_RING_NEXT((e), link)
+#define H2_NGN_ENTRY_PREV(e)   APR_RING_PREV((e), link)
+#define H2_NGN_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link)
+
+#define H2_REQ_ENTRIES_SENTINEL(b)     APR_RING_SENTINEL((b), h2_ngn_entry, link)
+#define H2_REQ_ENTRIES_EMPTY(b)        APR_RING_EMPTY((b), h2_ngn_entry, link)
+#define H2_REQ_ENTRIES_FIRST(b)        APR_RING_FIRST(b)
+#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b)
+
+#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do {                          \
+h2_ngn_entry *ap__b = (e);                                        \
+APR_RING_INSERT_HEAD((b), ap__b, h2_ngn_entry, link);  \
+} while (0)
+
+#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do {                          \
+h2_ngn_entry *ap__b = (e);                                     \
+APR_RING_INSERT_TAIL((b), ap__b, h2_ngn_entry, link);  \
+} while (0)
+
+struct h2_req_engine {
+    const char *id;        /* identifier */
+    const char *type;      /* name of the engine type */
+    apr_pool_t *pool;      /* pool for engine specific allocations */
+    conn_rec *c;           /* connection this engine is assigned to */
+    h2_ngn_shed *shed;
+
+    unsigned int shutdown : 1; /* engine is being shut down */
+
+    APR_RING_HEAD(h2_req_entries, h2_ngn_entry) entries;
+    apr_uint32_t capacity;     /* maximum concurrent requests */
+    apr_uint32_t no_assigned;  /* # of assigned requests */
+    apr_uint32_t no_live;      /* # of live */
+    apr_uint32_t no_finished;  /* # of finished */
+
+    apr_thread_cond_t *io;     /* condition var for waiting on data */
+};
+
+h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
+                                apr_uint32_t req_buffer_size)
+{
+    h2_ngn_shed *shed;
+    
+    shed = apr_pcalloc(pool, sizeof(*shed));
+    shed->c = c;
+    shed->pool = pool;
+    shed->req_buffer_size = req_buffer_size;
+    shed->ngns = apr_hash_make(pool);
+    
+    return shed;
+}
+
+void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx)
+{
+    shed->user_ctx = user_ctx;
+}
+
+void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed)
+{
+    return shed->user_ctx;
+}
+
+h2_ngn_shed *h2_ngn_shed_get_shed(h2_req_engine *ngn)
+{
+    return ngn->shed;
+}
+
+void h2_ngn_shed_abort(h2_ngn_shed *shed)
+{
+    shed->aborted = 1;
+}
+
+static apr_status_t ngn_schedule(h2_req_engine *ngn, request_rec *r)
+{
+    h2_ngn_entry *entry = apr_pcalloc(r->pool, sizeof(*entry));
+
+    APR_RING_ELEM_INIT(entry, link);
+    entry->r = r;
+    H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
+    return APR_SUCCESS;
+}
+
+
+apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, 
+                                  h2_task *task, request_rec *r, 
+                                  h2_req_engine_init *einit){
+    h2_req_engine *ngn;
+    apr_status_t status = APR_EOF;
+
+    AP_DEBUG_ASSERT(shed);
+    
+    apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id);
+    if (task->ser_headers) {
+        /* Max compatibility, deny processing of this */
+        return APR_EOF;
+    }
+    
+    ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING);
+    if (ngn) {
+        if (ngn->shutdown) {
+            ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r,
+                          "h2_ngn_shed(%ld): %s in shutdown", 
+                          shed->c->id, ngn->id);
+            ngn = NULL;
+        }
+        else if (ngn->no_assigned >= ngn->capacity) {
+            ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r,
+                          "h2_ngn_shed(%ld): %s over capacity %d/%d", 
+                          shed->c->id, ngn->id, ngn->no_assigned,
+                          ngn->capacity);
+            ngn = NULL;
+        }
+        else if (ngn_schedule(ngn, r) == APR_SUCCESS) {
+            /* this task will be processed in another thread,
+             * freeze any I/O for the time being. */
+            h2_task_freeze(task, r);
+            ngn->no_assigned++;
+            status = APR_SUCCESS;
+            ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r,
+                          "h2_ngn_shed(%ld): pushed request %s to %s", 
+                          shed->c->id, task->id, ngn->id);
+        }
+        else {
+            ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r,
+                          "h2_ngn_shed(%ld): engine error adding req %s", 
+                          shed->c->id, ngn->id);
+            ngn = NULL;
+        }
+    }
+    
+    if (!ngn && einit) {
+        ngn = apr_pcalloc(task->c->pool, sizeof(*ngn));
+        ngn->id = apr_psprintf(task->c->pool, "ngn-%ld-%d", 
+                                   shed->c->id, shed->next_ngn_id++);
+        ngn->pool = task->c->pool;
+        ngn->type = apr_pstrdup(task->c->pool, ngn_type);
+        ngn->c = r->connection;
+        APR_RING_INIT(&ngn->entries, h2_ngn_entry, link);
+        ngn->shed = shed;
+        ngn->capacity = 100;
+        ngn->io = task->io;
+        ngn->no_assigned = 1;
+        ngn->no_live = 1;
+        
+        status = einit(ngn, ngn->id, ngn->type, ngn->pool,
+                       shed->req_buffer_size, r);
+        ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r,
+                      "h2_ngn_shed(%ld): init engine %s (%s)", 
+                      shed->c->id, ngn->id, ngn->type);
+        if (status == APR_SUCCESS) {
+            apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, ngn);
+        }
+    }
+    return status;
+}
+
+static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn)
+{
+    h2_ngn_entry *entry;
+    h2_task *task;
+
+    for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
+         entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
+         entry = H2_NGN_ENTRY_NEXT(entry)) {
+        task = h2_ctx_rget_task(entry->r);
+        AP_DEBUG_ASSERT(task);
+        if (!task->frozen) {
+            H2_NGN_ENTRY_REMOVE(entry);
+            return entry;
+        }
+    }
+    return NULL;
+}
+
+apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, 
+                                  h2_req_engine *ngn, 
+                                  apr_uint32_t capacity, 
+                                  int want_shutdown,
+                                  request_rec **pr)
+{   
+    h2_ngn_entry *entry;
+    
+    AP_DEBUG_ASSERT(ngn);
+    *pr = NULL;
+    if (shed->aborted) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, shed->c,
+                      "h2_ngn_shed(%ld): abort while pulling requests %s", 
+                      shed->c->id, ngn->id);
+        return APR_EOF;
+    }
+    
+    ngn->capacity = capacity;
+    if (!H2_REQ_ENTRIES_EMPTY(&ngn->entries) 
+        && (entry = pop_non_frozen(ngn))) {
+        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r,
+                      "h2_ngn_shed(%ld): pulled request %s for engine %s", 
+                      shed->c->id, entry->r->the_request, ngn->id);
+        ngn->no_live++;
+        entry->r->connection->current_thread = ngn->c->current_thread;
+        *pr = entry->r;
+        return APR_SUCCESS;
+    }
+    else if (want_shutdown) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
+                      "h2_ngn_shed(%ld): emtpy queue, shutdown engine %s", 
+                      shed->c->id, ngn->id);
+        ngn->shutdown = 1;
+        return APR_EOF;
+    }
+    return APR_EAGAIN;
+}
+                                 
+static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, h2_task *task, 
+                                  int waslive, int aborted)
+{
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
+                  "h2_ngn_shed(%ld): task %s %s by %s", 
+                  shed->c->id, task->id, aborted? "aborted":"done", ngn->id);
+    h2_task_output_close(task->output);
+    ngn->no_finished++;
+    if (waslive) ngn->no_live--;
+    ngn->no_assigned--;
+    if (task->c != ngn->c) { /* do not release what the engine runs on */
+        return APR_SUCCESS;
+    }
+    return APR_EAGAIN;
+}
+                                
+apr_status_t h2_ngn_shed_done_req(h2_ngn_shed *shed, 
+                                  h2_req_engine *ngn, conn_rec *r_conn)
+{
+    h2_task *task = h2_ctx_cget_task(r_conn);
+    if (task) {
+        return ngn_done_task(shed, ngn, task, 1, 0);
+    }
+    return APR_ECONNABORTED;
+}
+                                
+void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
+{
+    h2_req_engine *existing;
+    
+    if (!shed->aborted 
+        && !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
+        h2_ngn_entry *entry;
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
+                      "h2_ngn_shed(%ld): exit engine %s (%s), "
+                      "has still requests queued, shutdown=%d,"
+                      "assigned=%ld, live=%ld, finished=%ld", 
+                      shed->c->id, ngn->id, ngn->type,
+                      ngn->shutdown, 
+                      (long)ngn->no_assigned, (long)ngn->no_live,
+                      (long)ngn->no_finished);
+        for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
+             entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
+             entry = H2_NGN_ENTRY_NEXT(entry)) {
+            request_rec *r = entry->r;
+            h2_task *task = h2_ctx_rget_task(r);
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
+                          "h2_ngn_shed(%ld): engine %s has queued task %s, "
+                          "frozen=%d, aborting",
+                          shed->c->id, ngn->id, task->id, task->frozen);
+            ngn_done_task(shed, ngn, task, 0, 1);
+        }
+    }
+    if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
+                      "h2_ngn_shed(%ld): exit engine %s (%s), "
+                      "assigned=%ld, live=%ld, finished=%ld", 
+                      shed->c->id, ngn->id, ngn->type,
+                      (long)ngn->no_assigned, (long)ngn->no_live,
+                      (long)ngn->no_finished);
+    }
+    else {
+        ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, shed->c,
+                      "h2_ngn_shed(%ld): exit engine %s (%s)", 
+                      shed->c->id, ngn->id, ngn->type);
+    }
+    
+    existing = apr_hash_get(shed->ngns, ngn->type, APR_HASH_KEY_STRING);
+    if (existing == ngn) {
+        apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, NULL);
+    }
+}
diff --git a/modules/http2/h2_ngn_shed.h b/modules/http2/h2_ngn_shed.h
new file mode 100644 (file)
index 0000000..abed58a
--- /dev/null
@@ -0,0 +1,64 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef h2_req_shed_h
+#define h2_req_shed_h
+
+struct h2_req_engine;
+
+typedef struct h2_ngn_shed h2_ngn_shed;
+struct h2_ngn_shed {
+    conn_rec *c;
+    apr_pool_t *pool;
+    apr_hash_t *ngns;
+    int next_ngn_id;
+    void *user_ctx;
+    
+    unsigned int aborted : 1;
+    apr_uint32_t req_buffer_size; /* preferred buffer size for responses */
+};
+
+typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine, 
+                                      const char *id, 
+                                      const char *type,
+                                      apr_pool_t *pool, 
+                                      apr_uint32_t req_buffer_size,
+                                      request_rec *r);
+
+h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
+                                apr_uint32_t req_buffer_size); 
+
+void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx);
+void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed);
+
+h2_ngn_shed *h2_ngn_shed_get_shed(struct h2_req_engine *ngn);
+
+void h2_ngn_shed_abort(h2_ngn_shed *shed);
+
+apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, 
+                                  struct h2_task *task, request_rec *r, 
+                                  h2_shed_ngn_init *init_cb);
+
+apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, h2_req_engine *pub_ngn, 
+                                  apr_uint32_t capacity, 
+                                  int want_shutdown, request_rec **pr);
+
+apr_status_t h2_ngn_shed_done_req(h2_ngn_shed *shed, 
+                                  struct h2_req_engine *ngn, conn_rec *r_conn);
+
+void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn);
+
+
+#endif /* h2_req_shed_h */
index 9e6ef35755ee26819d48e150afc2b47b800d85de..a5c30e77faa9c9e0ec2b2e418233fad54d188d22 100644 (file)
@@ -17,6 +17,7 @@
 #include <apr_strings.h>
 #include <nghttp2/nghttp2.h>
 
+#include <mpm_common.h>
 #include <httpd.h>
 #include <mod_proxy.h>
 #include <mod_http2.h>
@@ -91,23 +92,11 @@ static int proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc,
      * issues in case of error returned below. */
     apr_brigade_cleanup(bb);
     if (status != APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, origin, APLOGNO(01084)
-                      "pass request body failed to %pI (%s)",
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, origin, APLOGNO()
+                      "pass output failed to %pI (%s)",
                       p_conn->addr, p_conn->hostname);
-        if (origin->aborted) {
-            const char *ssl_note;
-
-            if (((ssl_note = apr_table_get(origin->notes, "SSL_connect_rv"))
-                 != NULL) && (strcmp(ssl_note, "err") == 0)) {
-                return HTTP_INTERNAL_SERVER_ERROR;
-            }
-            return HTTP_GATEWAY_TIME_OUT;
-        }
-        else {
-            return HTTP_BAD_REQUEST;
-        }
     }
-    return OK;
+    return status;
 }
 
 static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
@@ -118,19 +107,19 @@ static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
     apr_status_t status;
     int flush = 1;
 
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
-                  "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d", 
-                  session->id, (int)length, flush);
-    b = apr_bucket_transient_create((const char*)data, length, 
-                                    session->c->bucket_alloc);
-    APR_BRIGADE_INSERT_TAIL(session->output, b);
+    if (data) {
+        b = apr_bucket_transient_create((const char*)data, length, 
+                                        session->c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(session->output, b);
+    }
 
     status = proxy_pass_brigade(session->c->bucket_alloc,  
                                 session->p_conn, session->c, 
                                 session->output, flush);
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
+                  "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d", 
+                  session->id, (int)length, flush);
     if (status != APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
-                      "h2_proxy_sesssion(%s): sending", session->id);
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     return length;
@@ -146,7 +135,7 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
         
         h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03341)
-                      "h2_session(%s): recv FRAME[%s]",
+                      "h2_proxy_session(%s): recv FRAME[%s]",
                       session->id, buffer);
     }
 
@@ -167,7 +156,7 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
                 
                 h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
                 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03342)
-                              "h2_session(%s): recv FRAME[%s]",
+                              "h2_proxy_session(%s): recv FRAME[%s]",
                               session->id, buffer);
             }
             break;
@@ -186,7 +175,7 @@ static int before_frame_send(nghttp2_session *ngh2,
 
         h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03343)
-                      "h2_session(%s): sent FRAME[%s]",
+                      "h2_proxy_session(%s): sent FRAME[%s]",
                       session->id, buffer);
     }
     return 0;
@@ -339,9 +328,13 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
     apr_bucket *b;
     apr_status_t status;
     
-    nghttp2_session_consume(ngh2, stream_id, len);
+    /*nghttp2_session_consume(ngh2, stream_id, len);*/
     stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
     if (!stream) {
+        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, 
+                      "h2_proxy_session(%s): recv data chunk for "
+                      "unknown stream %d, ignored", 
+                      session->id, stream_id);
         return 0;
     }
     
@@ -359,10 +352,14 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
         b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
         APR_BRIGADE_INSERT_TAIL(stream->output, b);
     }
+    
+    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, 
+                  "h2_proxy_session(%s): pass response data for "
+                  "stream %d, %d bytes", session->id, stream_id, (int)len);
     status = ap_pass_brigade(stream->r->output_filters, stream->output);
     if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03344)
-                      "h2_session(%s-%d): passing output", 
+                      "h2_proxy_session(%s): passing output on stream %d", 
                       session->id, stream->id);
         nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
                                   stream_id, NGHTTP2_STREAM_CLOSED);
@@ -375,6 +372,9 @@ static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
                            uint32_t error_code, void *user_data) 
 {
     h2_proxy_session *session = user_data;
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
+                  "h2_proxy_session(%s): stream=%d, closed, err=%d", 
+                  session->id, stream_id, error_code);
     dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
     return 0;
 }
@@ -415,6 +415,9 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
     *data_flags = 0;
     stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
     if (!stream) {
+        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, 
+                      "h2_proxy_stream(%s): data_read, stream %d not found", 
+                      stream->session->id, stream_id);
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
@@ -492,6 +495,7 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
         h2_proxy_session *session;
         nghttp2_session_callbacks *cbs;
         nghttp2_option *option;
+        ap_filter_t *f;
         
         session = apr_pcalloc(pool, sizeof(*session));
         apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
@@ -522,6 +526,7 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
         
         nghttp2_option_new(&option);
         nghttp2_option_set_peer_max_concurrent_streams(option, 100);
+        nghttp2_option_set_no_auto_window_update(option, 0);
         
         nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
         
@@ -530,6 +535,14 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
 
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
                       "setup session for %s", p_conn->hostname);
+                      
+        f = session->c->input_filters;
+        while (f) {
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                          "h2_proxy_session(%s): c->input_filter %s", 
+                          session->id, f->frec->name);
+            f = f->next;
+        }
         
     }
     return p_conn->data;
@@ -539,6 +552,12 @@ static apr_status_t session_start(h2_proxy_session *session)
 {
     nghttp2_settings_entry settings[2];
     int rv, add_conn_window;
+    apr_socket_t *s;
+    
+    s = ap_get_conn_socket(session->c);
+    if (s) {
+        ap_sock_disable_nagle(s);
+    }
     
     settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
     settings[0].value = 0;
@@ -557,47 +576,6 @@ static apr_status_t session_start(h2_proxy_session *session)
     return rv? APR_EGENERAL : APR_SUCCESS;
 }
 
-static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
-{
-    apr_status_t status = APR_SUCCESS;
-    apr_size_t readlen = 0;
-    ssize_t n;
-    
-    while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
-        apr_bucket* b = APR_BRIGADE_FIRST(bb);
-        
-        if (!APR_BUCKET_IS_METADATA(b)) {
-            const char *bdata = NULL;
-            apr_size_t blen = 0;
-            
-            status = apr_bucket_read(b, &bdata, &blen, APR_NONBLOCK_READ);
-            if (status == APR_SUCCESS && blen > 0) {
-                n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen);
-                if (n < 0) {
-                    if (nghttp2_is_fatal((int)n)) {
-                        return APR_EGENERAL;
-                    }
-                }
-                else {
-                    readlen += n;
-                    if (n < blen) {
-                        apr_bucket_split(b, n);
-                    }
-                }
-            }
-        }
-        apr_bucket_delete(b);
-    }
-    
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                  "h2_session(%s): fed %ld bytes of input to session", 
-                  session->id, (long)readlen);
-    if (readlen == 0 && status == APR_SUCCESS) {
-        return APR_EAGAIN;
-    }
-    return status;
-}
-
 static apr_status_t open_stream(h2_proxy_session *session, const char *url,
                                 request_rec *r, h2_proxy_stream **pstream)
 {
@@ -668,13 +646,13 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st
         const char *task_id = apr_table_get(stream->r->connection->notes, 
                                             H2_TASK_ID_NOTE);
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                      "h2_session(%s): submit %s%s -> %d (task %s)", 
+                      "h2_proxy_session(%s): submit %s%s -> %d (task %s)", 
                       session->id, stream->req->authority, stream->req->path,
                       rv, task_id);
     }
     else {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                      "h2_session(%s-%d): submit %s%s", 
+                      "h2_proxy_session(%s-%d): submit %s%s", 
                       session->id, rv, stream->req->authority, stream->req->path);
     }
     
@@ -689,50 +667,114 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st
     return APR_EGENERAL;
 }
 
-static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block, 
-                                          apr_interval_time_t timeout)
+static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
 {
-    apr_status_t status;
-    apr_socket_t *socket = NULL;
-    apr_time_t save_timeout = -1;
+    apr_status_t status = APR_SUCCESS;
+    apr_size_t readlen = 0;
+    ssize_t n;
     
-    if (block) {
-        socket = ap_get_conn_socket(session->c);
-        if (socket) {
-            apr_socket_timeout_get(socket, &save_timeout);
-            apr_socket_timeout_set(socket, timeout);
+    while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
+        apr_bucket* b = APR_BRIGADE_FIRST(bb);
+        
+        if (APR_BUCKET_IS_METADATA(b)) {
+            if (APR_BUCKET_IS_EOS(b)) {
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                              "h2_proxy_session(%s): read EOS from conn", 
+                              session->id);
+            }
+            else if (APR_BUCKET_IS_FLUSH(b)) {
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                              "h2_proxy_session(%s): read FLUSH from conn", 
+                              session->id);
+            }
+            else {
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                              "h2_proxy_session(%s): read unkown META from conn", 
+                              session->id);
+            }
         }
         else {
-            /* cannot block on timeout */
-            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, 
-                          "h2_session(%s): unable to get conn socket", 
-                          session->id);
-            return APR_ENOTIMPL;
+            const char *bdata = NULL;
+            apr_size_t blen = 0;
+            
+            status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
+            if (status == APR_SUCCESS && blen > 0) {
+                n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen);
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                              "h2_proxy_session(%s): feeding %ld bytes -> %ld", 
+                              session->id, (long)blen, (long)n);
+                if (n < 0) {
+                    if (nghttp2_is_fatal((int)n)) {
+                        status = APR_EGENERAL;
+                    }
+                }
+                else {
+                    readlen += n;
+                    if (n < blen) {
+                        apr_bucket_split(b, n);
+                    }
+                }
+            }
         }
+        apr_bucket_delete(b);
     }
     
-    status = ap_get_brigade(session->c->input_filters, session->input, 
-                            AP_MODE_READBYTES, 
-                            block? APR_BLOCK_READ : APR_NONBLOCK_READ, 
-                            64 * 1024);
-    if (socket && save_timeout != -1) {
-            apr_socket_timeout_set(socket, save_timeout);
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
+                  "h2_proxy_session(%s): fed %ld bytes of input to session", 
+                  session->id, (long)readlen);
+    if (readlen == 0 && status == APR_SUCCESS) {
+        return APR_EAGAIN;
     }
+    return status;
+}
+
+static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block, 
+                                          apr_interval_time_t timeout)
+{
+    apr_status_t status = APR_SUCCESS;
     
-    if (status == APR_SUCCESS) {
-        if (APR_BRIGADE_EMPTY(session->input)) {
-            status = APR_EAGAIN;
+    if (APR_BRIGADE_EMPTY(session->input)) {
+        apr_socket_t *socket = NULL;
+        apr_time_t save_timeout = -1;
+        
+        if (block) {
+            socket = ap_get_conn_socket(session->c);
+            if (socket) {
+                apr_socket_timeout_get(socket, &save_timeout);
+                apr_socket_timeout_set(socket, timeout);
+            }
+            else {
+                /* cannot block on timeout */
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, 
+                              "h2_proxy_session(%s): unable to get conn socket", 
+                              session->id);
+                return APR_ENOTIMPL;
+            }
         }
-        else {
-            feed_brigade(session, session->input);
+        
+        status = ap_get_brigade(session->c->input_filters, session->input, 
+                                AP_MODE_READBYTES, 
+                                block? APR_BLOCK_READ : APR_NONBLOCK_READ, 
+                                64 * 1024);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
+                      "h2_proxy_session(%s): read from conn", session->id);
+        if (socket && save_timeout != -1) {
+            apr_socket_timeout_set(socket, save_timeout);
         }
     }
+    
+    if (status == APR_SUCCESS) {
+        status = feed_brigade(session, session->input);
+    }
     else if (APR_STATUS_IS_TIMEUP(status)) {
         /* nop */
     }
     else if (!APR_STATUS_IS_EAGAIN(status)) {
-        dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
+                      "h2_proxy_session(%s): read error", session->id);
+        dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
     }
+
     return status;
 }
 
@@ -918,8 +960,8 @@ static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg)
             break;
         
         default:
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                          "h2_session(%s): conn error -> shutdown", session->id);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, arg, session->c,
+                          "h2_proxy_session(%s): conn error -> shutdown", session->id);
             session_shutdown(session, arg, msg);
             break;
     }
@@ -936,7 +978,7 @@ static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg)
         
         default:
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                          "h2_session(%s): proto error -> shutdown", session->id);
+                          "h2_proxy_session(%s): proto error -> shutdown", session->id);
             session_shutdown(session, arg, msg);
             break;
     }
@@ -984,7 +1026,6 @@ static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
                  * task processing in other threads. Do a busy wait with
                  * backoff timer. */
                 transit(session, "no io", H2_PROXYS_ST_WAIT);
-                session->wait_timeout = 25;
             }
             break;
         default:
@@ -1133,7 +1174,7 @@ static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
             break;
         default:
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                          "h2_session(%s): unknown event %d", 
+                          "h2_proxy_session(%s): unknown event %d", 
                           session->id, ev);
             break;
     }
@@ -1145,7 +1186,7 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
     int have_written = 0, have_read = 0;
 
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
-                  "h2_session(%s): process", session->id);
+                  "h2_proxy_session(%s): process", session->id);
            
     switch (session->state) {
         case H2_PROXYS_ST_INIT:
@@ -1154,7 +1195,7 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
                 dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
             }
             else {
-                dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
+                dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
             }
             break;
             
@@ -1165,8 +1206,8 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
                 int rv = nghttp2_session_send(session->ngh2);
                 if (rv < 0 && nghttp2_is_fatal(rv)) {
                     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
-                                  "h2_session(%s): write, rv=%d", session->id, rv);
-                    dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
+                                  "h2_proxy_session(%s): write, rv=%d", session->id, rv);
+                    dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL);
                     break;
                 }
                 have_written = 1;
@@ -1189,14 +1230,27 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
             if (check_suspended(session) == APR_EAGAIN) {
                 /* no stream has become resumed. Do a blocking read with
                  * ever increasing timeouts... */
-                status = h2_proxy_session_read(session, 0, session->wait_timeout);
-                if (status == APR_SUCCESS) {
-                    dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
+                if (session->wait_timeout < 25) {
+                    session->wait_timeout = 25;
                 }
-                else if (APR_STATUS_IS_TIMEUP(status)) {
+                else {
                     session->wait_timeout = H2MIN(apr_time_from_msec(100), 
                                                   2*session->wait_timeout);
                 }
+                
+                status = h2_proxy_session_read(session, 1, session->wait_timeout);
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
+                              "h2_proxy_session(%s): WAIT read, timeout=%fms", 
+                              session->id, (float)session->wait_timeout/1000.0);
+                if (status == APR_SUCCESS) {
+                    have_read = 1;
+                    dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
+                }
+                else if (APR_STATUS_IS_TIMEUP(status)
+                    || APR_STATUS_IS_EAGAIN(status)) {
+                    /* go back to checking all inputs again */
+                    transit(session, "wait cycle", H2_PROXYS_ST_BUSY);
+                }
             }
             break;
             
@@ -1208,13 +1262,17 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
             
         default:
             ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
-                          APLOGNO(03346)"h2_session(%s): unknown state %d", 
+                          APLOGNO(03346)"h2_proxy_session(%s): unknown state %d", 
                           session->id, session->state);
             dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL);
             break;
     }
 
 
+    if (have_read || have_written) {
+        session->wait_timeout = 0;
+    }
+    
     if (!nghttp2_session_want_read(session->ngh2)
         && !nghttp2_session_want_write(session->ngh2)) {
         dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);
index 2767ef538a465c6ce4896749192394714578bece..18509dfc12510765b1919ca3a302ca04b3716d1b 100644 (file)
@@ -451,6 +451,9 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
         /* Request check post hooks failed. An example of this would be a
          * request for a vhost where h2 is disabled --> 421.
          */
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, conn, APLOGNO()
+                      "h2_request(%d): access_status=%d, request_create failed",
+                      req->id, access_status);
         ap_die(access_status, r);
         ap_update_child_status(conn->sbh, SERVER_BUSY_LOG, r);
         ap_run_log_transaction(r);
index 855e8b9c8554d94ce86aefbf77818e82c65ca337..ecd8ae0061bbcf94a1cdf25fbd3b7a9b85870abd 100644 (file)
@@ -597,15 +597,6 @@ static int on_frame_send_cb(nghttp2_session *ngh2,
                      (long)session->frames_sent);
     }
     ++session->frames_sent;
-    switch (frame->hd.type) {
-        case NGHTTP2_HEADERS:
-        case NGHTTP2_DATA:
-            /* no explicit flushing necessary */
-            break;
-        default:
-            session->flush = 1;
-            break;
-    }
     return 0;
 }
 
@@ -2021,6 +2012,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 no_streams = h2_ihash_is_empty(session->streams);
                 update_child_status(session, (no_streams? SERVER_BUSY_KEEPALIVE
                                               : SERVER_BUSY_READ), "idle");
+                /* make certain, the client receives everything before we idle */
+                h2_conn_io_flush(&session->io);
                 if (async && no_streams && !session->r && session->requests_received) {
                     ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
                                   "h2_session(%ld): async idle, nonblock read", session->id);
@@ -2176,6 +2169,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
                                   "h2_session: wait for data, %ld micros", 
                                   (long)session->wait_us);
                 }
+                /* make certain, the client receives everything before we idle */
+                h2_conn_io_flush(&session->io);
                 status = h2_mplx_out_trywait(session->mplx, session->wait_us, 
                                              session->iowait);
                 if (status == APR_SUCCESS) {
@@ -2212,8 +2207,7 @@ apr_status_t h2_session_process(h2_session *session, int async)
     }
     
 out:
-    h2_conn_io_pass(&session->io, session->flush);
-    session->flush = 0;
+    h2_conn_io_flush(&session->io);
     
     ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
                   "h2_session(%ld): [%s] process returns", 
index 06543d670c82ef4d1bf111312eccad168e77946c..0a781f63d4ff5788785cf3314c245a8de433db31 100644 (file)
@@ -93,16 +93,16 @@ static apr_status_t h2_response_freeze_filter(ap_filter_t* f,
     AP_DEBUG_ASSERT(task);
     
     if (task->frozen) {
-        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r,
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r,
                       "h2_response_freeze_filter, saving");
-        return ap_save_brigade(f, &task->frozen_out, &bb, task->c->pool);
+        return ap_save_brigade(f, &task->output->frozen_bb, &bb, task->c->pool);
     }
     
     if (APR_BRIGADE_EMPTY(bb)) {
         return APR_SUCCESS;
     }
 
-    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r,
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r,
                   "h2_response_freeze_filter, passing");
     return ap_pass_brigade(f->next, bb);
 }
@@ -197,12 +197,18 @@ h2_task *h2_task_create(long session_id, const h2_request *req,
     task->request     = req;
     task->input_eos   = !req->body;
     task->ser_headers = req->serialize;
+    task->blocking    = 1;
 
     h2_ctx_create_for(c, task);
 
     return task;
 }
 
+void h2_task_set_io_blocking(h2_task *task, int blocking)
+{
+    task->blocking = blocking;
+}
+
 apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond)
 {
     apr_status_t status;
@@ -212,6 +218,8 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond)
     task->input = h2_task_input_create(task, task->c);
     task->output = h2_task_output_create(task, task->c);
     
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+                  "h2_task(%s): process connection", task->id);
     ap_process_connection(task->c, ap_get_conn_socket(task->c));
     
     if (task->frozen) {
@@ -236,6 +244,8 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)
     conn_state_t *cs = c->cs;
     request_rec *r;
 
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+                  "h2_task(%s): create request_rec", task->id);
     r = h2_request_create_rec(req, c);
     if (r && (r->status == HTTP_OK)) {
         ap_update_child_status(c->sbh, SERVER_BUSY_READ, r);
@@ -264,6 +274,15 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)
             cs->state = CONN_STATE_WRITE_COMPLETION;
         r = NULL;
     }
+    else if (!r) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+                      "h2_task(%s): create request_rec failed, r=NULL", task->id);
+    }
+    else {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+                      "h2_task(%s): create request_rec failed, r->status=%d", 
+                      task->id, r->status);
+    }
     c->sbh = NULL;
 
     return APR_SUCCESS;
@@ -297,7 +316,7 @@ apr_status_t h2_task_freeze(h2_task *task, request_rec *r)
         conn_rec *c = task->c;
         
         task->frozen = 1;
-        task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc);
+        task->output->frozen_bb = apr_brigade_create(c->pool, c->bucket_alloc);
         ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection);
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, 
                       "h2_task(%s), frozen", task->id);
index 24bde946f302460fb0c8b448feedc0c87aac6bae..462b2566a6edd990c7985a1eea1562a7391dfbf4 100644 (file)
@@ -58,12 +58,11 @@ struct h2_task {
     unsigned int input_eos   : 1;
     unsigned int ser_headers : 1;
     unsigned int frozen      : 1;
+    unsigned int blocking    : 1;
     
     struct h2_task_input *input;
     struct h2_task_output *output;
     struct apr_thread_cond_t *io;   /* used to wait for events on */
-
-    apr_bucket_brigade *frozen_out;
 };
 
 h2_task *h2_task_create(long session_id, const struct h2_request *req, 
@@ -83,4 +82,6 @@ extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out
 apr_status_t h2_task_freeze(h2_task *task, request_rec *r);
 apr_status_t h2_task_thaw(h2_task *task);
 
+void h2_task_set_io_blocking(h2_task *task, int blocking);
+
 #endif /* defined(__mod_h2__h2_task__) */
index 1a2bd863315940c96edfa43b6f9bbcb228f0f9dc..f1f52c005b6e17c7e14f4263e6e8ca60ac8b17d4 100644 (file)
@@ -77,14 +77,14 @@ static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f,
             if (f) {
                 /* This happens currently when ap_die(status, r) is invoked
                  * by a read request filter. */
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03204)
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204)
                               "h2_task_output(%s): write without response by %s "
                               "for %s %s %s",
                               output->task->id, caller, 
                               output->task->request->method, 
                               output->task->request->authority, 
                               output->task->request->path);
-                f->c->aborted = 1;
+                output->c->aborted = 1;
             }
             if (output->task->io) {
                 apr_thread_cond_broadcast(output->task->io);
@@ -94,37 +94,48 @@ static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f,
         
         if (h2_task_logio_add_bytes_out) {
             /* counter headers as if we'd do a HTTP/1.1 serialization */
-            /* TODO: counter a virtual status line? */
-            apr_off_t bytes_written;
-            apr_brigade_length(bb, 0, &bytes_written);
-            bytes_written += h2_util_table_bytes(response->headers, 3)+1;
-            h2_task_logio_add_bytes_out(f->c, bytes_written);
+            output->written = h2_util_table_bytes(response->headers, 3)+1;
+            h2_task_logio_add_bytes_out(output->c, output->written);
         }
         get_trailers(output);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03348)
-                      "h2_task_output(%s): open as needed %s %s %s",
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348)
+                      "h2_task(%s): open response to %s %s %s",
                       output->task->id, output->task->request->method, 
                       output->task->request->authority, 
                       output->task->request->path);
         return h2_mplx_out_open(output->task->mplx, output->task->stream_id, 
                                 response, f, bb, output->task->io);
     }
-    return APR_EOF;
+    return APR_SUCCESS;
 }
 
-void h2_task_output_close(h2_task_output *output)
+static apr_status_t write_brigade_raw(h2_task_output *output, 
+                                      ap_filter_t* f, apr_bucket_brigade* bb)
 {
-    open_if_needed(output, NULL, NULL, "close");
-    if (output->state != H2_TASK_OUT_DONE) {
-        if (output->task->frozen_out 
-            && !APR_BRIGADE_EMPTY(output->task->frozen_out)) {
-            h2_mplx_out_write(output->task->mplx, output->task->stream_id, 
-                NULL, output->task->frozen_out, NULL, NULL);
+    apr_off_t written, left;
+    apr_status_t status;
+
+    apr_brigade_length(bb, 0, &written);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c,
+                  "h2_task(%s): write response body (%ld bytes)", 
+                  output->task->id, (long)written);
+    
+    status = h2_mplx_out_write(output->task->mplx, output->task->stream_id, 
+                               f, output->task->blocking, bb, 
+                               get_trailers(output), output->task->io);
+    if (status == APR_INCOMPLETE) {
+        apr_brigade_length(bb, 0, &left);
+        written -= left;
+        status = APR_SUCCESS;
+    }
+
+    if (status == APR_SUCCESS) {
+        output->written += written;
+        if (h2_task_logio_add_bytes_out) {
+            h2_task_logio_add_bytes_out(output->c, written);
         }
-        h2_mplx_out_close(output->task->mplx, output->task->stream_id, 
-                          get_trailers(output));
-        output->state = H2_TASK_OUT_DONE;
     }
+    return status;
 }
 
 /* Bring the data from the brigade (which represents the result of the
@@ -137,34 +148,57 @@ apr_status_t h2_task_output_write(h2_task_output *output,
     apr_status_t status;
     
     if (APR_BRIGADE_EMPTY(bb)) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
-                      "h2_task_output(%s): empty write", output->task->id);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c,
+                      "h2_task(%s): empty write", output->task->id);
         return APR_SUCCESS;
     }
     
     if (output->task->frozen) {
         h2_util_bb_log(output->c, output->task->stream_id, APLOG_TRACE2,
                        "frozen task output write", bb);
-        return ap_save_brigade(f, &output->task->frozen_out, &bb, 
-                               output->c->pool);
+        return ap_save_brigade(f, &output->frozen_bb, &bb, output->c->pool);
     }
     
     status = open_if_needed(output, f, bb, "write");
-    if (status != APR_EOF) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
-                      "h2_task_output(%s): opened and passed brigade", 
+    
+    /* Attempt to write saved brigade first */
+    if (status == APR_SUCCESS && output->bb 
+        && !APR_BRIGADE_EMPTY(output->bb)) {
+        status = write_brigade_raw(output, f, output->bb);
+    }
+    
+    /* If there is nothing saved (anymore), try to write the brigade passed */
+    if (status == APR_SUCCESS
+        && (!output->bb || APR_BRIGADE_EMPTY(output->bb))
+        && !APR_BRIGADE_EMPTY(bb)) {
+        status = write_brigade_raw(output, f, bb);
+    }
+    
+    /* If the passed brigade is not empty, save it before return */
+    if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, output->c,
+                      "h2_task(%s): could not write all, saving brigade", 
                       output->task->id);
-        return status;
+        if (!output->bb) {
+            output->bb = apr_brigade_create(output->c->pool, output->c->bucket_alloc);
+        }
+        return ap_save_brigade(f, &output->bb, &bb, output->c->pool);
     }
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
-                  "h2_task_output(%s): write brigade", output->task->id);
-    if (h2_task_logio_add_bytes_out) {
-        apr_off_t bytes_written;
-        apr_brigade_length(bb, 0, &bytes_written);
-        h2_task_logio_add_bytes_out(f->c, bytes_written);
+    return status;
+}
+
+void h2_task_output_close(h2_task_output *output)
+{
+    open_if_needed(output, NULL, NULL, "close");
+    if (output->state != H2_TASK_OUT_DONE) {
+        if (output->frozen_bb && !APR_BRIGADE_EMPTY(output->frozen_bb)) {
+            h2_mplx_out_write(output->task->mplx, output->task->stream_id, 
+                NULL, 1, output->frozen_bb, NULL, NULL);
+        }
+        h2_mplx_out_close(output->task->mplx, output->task->stream_id, 
+                          get_trailers(output));
+        output->state = H2_TASK_OUT_DONE;
     }
-    return h2_mplx_out_write(output->task->mplx, output->task->stream_id, 
-                             f, bb, get_trailers(output), output->task->io);
 }
 
index aa719cdeea3970e1435f4d2c008c88ad7274d828..26326f0908b33e8f4e4459a4bd783cc7630823db 100644 (file)
@@ -30,16 +30,21 @@ typedef enum {
     H2_TASK_OUT_INIT,
     H2_TASK_OUT_STARTED,
     H2_TASK_OUT_DONE,
-} h2_task_output_state_t;
+} h2_task_out_state_t;
 
 typedef struct h2_task_output h2_task_output;
 
 struct h2_task_output {
     conn_rec *c;
     struct h2_task *task;
-    h2_task_output_state_t state;
+    h2_task_out_state_t state;
     struct h2_from_h1 *from_h1;
+    
     unsigned int trailers_passed : 1;
+
+    apr_off_t written;
+    apr_bucket_brigade *bb;
+    apr_bucket_brigade *frozen_bb;
 };
 
 h2_task_output *h2_task_output_create(struct h2_task *task, conn_rec *c);
index 0cc90f9ed133bdfc18fca1f2f3e2fce80cf7a4c3..7c3605a1279403d4ae7a634fccba32d1ab821422 100644 (file)
@@ -26,7 +26,7 @@
  * @macro
  * Version number of the http2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.3.3-DEV"
+#define MOD_HTTP2_VERSION "1.4.0-DEV"
 
 /**
  * @macro
@@ -34,7 +34,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x010303
+#define MOD_HTTP2_VERSION_NUM 0x010400
 
 
 #endif /* mod_h2_h2_version_h */
index 87cac2f678d63ccf024a333a36abf601badd759e..5216523022eb5930a980fadca0f8dfa962e976df 100644 (file)
@@ -128,28 +128,29 @@ static char *http2_var_lookup(apr_pool_t *, server_rec *,
                          conn_rec *, request_rec *, char *name);
 static int http2_is_h2(conn_rec *);
 
-static apr_status_t http2_req_engine_push(const char *engine_type, 
+static apr_status_t http2_req_engine_push(const char *ngn_type, 
                                           request_rec *r, 
                                           h2_req_engine_init *einit)
 {
-    return h2_mplx_engine_push(engine_type, r, einit);
+    return h2_mplx_req_engine_push(ngn_type, r, einit);
 }
 
-static apr_status_t http2_req_engine_pull(h2_req_engine *engine
+static apr_status_t http2_req_engine_pull(h2_req_engine *ngn
                                           apr_read_type_e block, 
+                                          apr_uint32_t capacity, 
                                           request_rec **pr)
 {
-    return h2_mplx_engine_pull(engine, block, pr);
+    return h2_mplx_req_engine_pull(ngn, block, capacity, pr);
 }
 
-static void http2_req_engine_done(h2_req_engine *engine, conn_rec *r_conn)
+static void http2_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
 {
-    h2_mplx_engine_done(engine, r_conn);
+    h2_mplx_req_engine_done(ngn, r_conn);
 }
 
-static void http2_req_engine_exit(h2_req_engine *engine)
+static void http2_req_engine_exit(h2_req_engine *ngn)
 {
-    h2_mplx_engine_exit(engine);
+    h2_mplx_req_engine_exit(ngn);
 }
 
 
index ae135293101ec029eadd5ecebf03e8a87db8e22f..d5af1d3ce27805e104173fd8c91fd2ad118cb0f1 100644 (file)
@@ -43,27 +43,12 @@ typedef struct h2_req_engine h2_req_engine;
  * @param engine the allocated, partially filled structure
  * @param r      the first request to process, or NULL
  */
-typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, request_rec *r);
-
-/**
- * The public structure of a h2_req_engine. It gets allocated by the http2
- * infrastructure, assigned id, type, pool, io and connection and passed to the
- * h2_req_engine_init() callback to complete initialization.
- * This happens whenever a new request gets "push"ed for an engine type and
- * no instance, or no free instance, for the type is available.
- */
-struct h2_req_engine {
-    const char *id;        /* identifier */
-    apr_pool_t *pool;      /* pool for engine specific allocations */
-    const char *type;      /* name of the engine type */
-    unsigned char window_bits;/* preferred size of overall response data
-                            * mod_http2 is willing to buffer as log2 */
-    unsigned char req_window_bits;/* preferred size of response body data
-                            * mod_http2 is willing to buffer per request,
-                            * as log2 */
-    apr_size_t capacity;   /* maximum concurrent requests */
-    void *user_data;       /* user specific data */
-};
+typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, 
+                                        const char *id, 
+                                        const char *type,
+                                        apr_pool_t *pool, 
+                                        apr_uint32_t req_buffer_size,
+                                        request_rec *r);
 
 /**
  * Push a request to an engine with the specified name for further processing.
@@ -95,6 +80,7 @@ APR_DECLARE_OPTIONAL_FN(apr_status_t,
 APR_DECLARE_OPTIONAL_FN(apr_status_t, 
                         http2_req_engine_pull, (h2_req_engine *engine, 
                                                 apr_read_type_e block,
+                                                apr_uint32_t capacity,
                                                 request_rec **pr));
 APR_DECLARE_OPTIONAL_FN(void, 
                         http2_req_engine_done, (h2_req_engine *engine, 
index 3349c1201258298c4486b9ee9b9c76145cae66d2..c5f827f29af73aa904c929922333e9077a81ccc7 100644 (file)
@@ -44,7 +44,9 @@ static int (*is_h2)(conn_rec *c);
 static apr_status_t (*req_engine_push)(const char *name, request_rec *r, 
                                        h2_req_engine_init *einit);
 static apr_status_t (*req_engine_pull)(h2_req_engine *engine, 
-                                       apr_read_type_e block, request_rec **pr);
+                                       apr_read_type_e block, 
+                                       apr_uint32_t capacity, 
+                                       request_rec **pr);
 static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn);
 static void (*req_engine_exit)(h2_req_engine *engine);
                                        
@@ -59,9 +61,16 @@ typedef struct h2_proxy_ctx {
     proxy_server_conf *conf;
     
     h2_req_engine *engine;
+    const char *engine_id;
+    const char *engine_type;
+    apr_pool_t *engine_pool;    
+    apr_uint32_t req_buffer_size;
+    
     unsigned standalone : 1;
     unsigned is_ssl : 1;
     unsigned flushall : 1;
+    
+    apr_status_t r_status; /* status of our first request work */
 } h2_proxy_ctx;
 
 static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog,
@@ -189,13 +198,21 @@ static int proxy_http2_canon(request_rec *r, char *url)
     return OK;
 }
 
-static apr_status_t proxy_engine_init(h2_req_engine *engine, request_rec *r)
+static apr_status_t proxy_engine_init(h2_req_engine *engine, 
+                                        const char *id, 
+                                        const char *type,
+                                        apr_pool_t *pool, 
+                                        apr_uint32_t req_buffer_size,
+                                        request_rec *r)
 {
     h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config, 
                                              &proxy_http2_module);
     if (ctx) {
-        engine->capacity = 100; /* guess until we know */
         ctx->engine = engine;
+        ctx->engine_id = id;
+        ctx->engine_type = type;
+        ctx->engine_pool = pool;
+        ctx->req_buffer_size = req_buffer_size;
         return APR_SUCCESS;
     }
     ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, 
@@ -225,28 +242,34 @@ static void request_done(h2_proxy_session *session, request_rec *r)
 {   
     h2_proxy_ctx *ctx = session->user_data;
     
-    if (req_engine_done && r != ctx->rbase) {
+    if (r == ctx->rbase) {
+        ctx->r_status = APR_SUCCESS;
+    }
+    else if (req_engine_done && ctx->engine) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, 
                       "h2_proxy_session(%s): request %s",
-                      ctx->engine->id, r->the_request);
+                      ctx->engine_id, r->the_request);
         req_engine_done(ctx->engine, r->connection);
     }
+    
 }
 
-static request_rec *next_request(h2_proxy_ctx *ctx, h2_proxy_session *session, 
-                                 request_rec *r, int before_leave)
+static apr_status_t next_request(h2_proxy_ctx *ctx, h2_proxy_session *session, 
+                                 request_rec *r, int before_leave,
+                                 request_rec **pr)
 {
-    if (!r && !ctx->standalone) {
-        ctx->engine->capacity = session->remote_max_concurrent;
-        if (req_engine_pull(ctx->engine, 
-                            before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ, 
-                            &r) == APR_SUCCESS) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                          "h2_proxy_session(%s): pulled request %s", 
-                          session->id, r->the_request);
-        }
-    }
-    return r; 
+    *pr = r;
+    if (!r && ctx->engine) {
+        apr_status_t status;
+        status = req_engine_pull(ctx->engine, 
+                                 before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ, 
+                                 H2MAX(1, session->remote_max_concurrent), pr);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, 
+                      "h2_proxy_session(%s): pulled request %s", 
+                      session->id, (*pr? (*pr)->the_request : "NULL"));
+        return status; 
+    }
+    return *pr? APR_SUCCESS : APR_EAGAIN;
 }
 
 static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
@@ -255,7 +278,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
     
 setup_backend:    
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner, 
-                  "eng(%s): setup backend", ctx->engine->id);
+                  "eng(%s): setup backend", ctx->engine_id);
     /* Step Two: Make the Connection (or check that an already existing
      * socket is still usable). On success, we have a socket connected to
      * backend->hostname. */
@@ -299,10 +322,9 @@ setup_backend:
      * loop until we got the response or encounter errors.
      */
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner, 
-                  "eng(%s): setup session", ctx->engine->id);
-    session = h2_proxy_session_setup(ctx->engine->id, ctx->p_conn, ctx->conf, 
-                                     ctx->engine->window_bits, 
-                                     ctx->engine->req_window_bits, 
+                  "eng(%s): setup session", ctx->engine_id);
+    session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf, 
+                                     30, h2_log2(ctx->req_buffer_size), 
                                      request_done);
     if (!session) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->p_conn->connection, 
@@ -312,27 +334,32 @@ setup_backend:
     
 run_session:
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner, 
-                  "eng(%s): run session %s", ctx->engine->id, session->id);
+                  "eng(%s): run session %s", ctx->engine_id, session->id);
     session->user_data = ctx;
     status = h2_proxy_session_process(session);
     while (APR_STATUS_IS_EAGAIN(status)) {
-        r = next_request(ctx, session, r, 0);
-        if (r) {
+        status = next_request(ctx, session, r, 0, &r);
+        if (status == APR_SUCCESS) {
             add_request(session, r);
             r = NULL;
         }
+        else if (!APR_STATUS_IS_EAGAIN(status)) {
+            break;
+        }
         
         status = h2_proxy_session_process(session);
     }
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, ctx->owner, 
-                  "eng(%s): end of session run", ctx->engine->id);
+                  "eng(%s): end of session run", ctx->engine_id);
     if (session->state == H2_PROXYS_ST_DONE || status != APR_SUCCESS) {
         ctx->p_conn->close = 1;
     }
     
-    r = next_request(ctx, session, r, 1);
-    if (r) { 
+    if (status == APR_SUCCESS) {
+        status = next_request(ctx, session, r, 1, &r);
+    }
+    if (status == APR_SUCCESS) { 
         if (ctx->p_conn->close) {
             /* the connection is/willbe closed, the session is terminated.
              * Any open stream of that session needs to
@@ -355,7 +382,8 @@ run_session:
     }
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, 
                   ctx->p_conn->connection, "eng(%s): session run done",
-                  ctx->engine->id);
+                  ctx->engine_id);
+                  
     session->user_data = NULL;
     return status;
 }
@@ -412,6 +440,7 @@ static int proxy_http2_handler(request_rec *r,
     ctx->worker     = worker;
     ctx->conf       = conf;
     ctx->flushall   = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0;
+    ctx->r_status   = HTTP_SERVICE_UNAVAILABLE;
     
     ap_set_module_config(c->conn_config, &proxy_http2_module, ctx);
     apr_table_setn(r->notes, H2_PROXY_REQ_URL_NOTE, url);
@@ -460,6 +489,7 @@ static int proxy_http2_handler(request_rec *r,
             ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, 
                           "H2: pushed request %s to engine type %s", 
                           url, engine_type);
+            ctx->r_status = APR_SUCCESS;
             goto cleanup;
         }
     }
@@ -467,14 +497,10 @@ static int proxy_http2_handler(request_rec *r,
     if (!ctx->engine) {
         /* No engine was available or has been initialized, handle this
         * request just by ourself. */
-        h2_req_engine *engine = apr_pcalloc(p, sizeof(*engine));
-        engine->id = apr_psprintf(p, "eng-proxy-%ld", c->id);
-        engine->type = engine_type;
-        engine->pool = p;
-        engine->capacity = 1;
-        engine->window_bits = 30;
-        engine->req_window_bits = 16;
-        ctx->engine = engine;
+        ctx->engine_id = apr_psprintf(p, "eng-proxy-%ld", c->id);
+        ctx->engine_type = engine_type;
+        ctx->engine_pool = p;
+        ctx->req_buffer_size = (32*1024);
         ctx->standalone = 1;
         ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, 
                       "h2_proxy_http2(%ld): setup standalone engine for type %s", 
@@ -482,16 +508,16 @@ static int proxy_http2_handler(request_rec *r,
     }
     else {
         ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, 
-                      "H2: hosting engine %s for request %s", ctx->engine->id, url);
+                      "H2: hosting engine %s for request %s", ctx->engine_id, url);
     }
     
     status = proxy_engine_run(ctx, r);
 
 cleanup:
-    if (!ctx->standalone && ctx->engine && req_engine_exit) {
+    if (ctx->engine && req_engine_exit) {
         req_engine_exit(ctx->engine);
+        ctx->engine = NULL;
     }
-    ctx->engine = NULL;
     
     if (ctx) {
         if (ctx->p_conn) {
@@ -507,7 +533,7 @@ cleanup:
         ctx->p_conn = NULL;
     }
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, "leaving handler");
-    return status;
+    return ctx->r_status;
 }
 
 static void register_hook(apr_pool_t *p)