]> granicus.if.org Git - apache/commitdiff
On the 2.4.x branch:
authorStefan Eissing <icing@apache.org>
Mon, 10 Apr 2017 15:04:55 +0000 (15:04 +0000)
committerStefan Eissing <icing@apache.org>
Mon, 10 Apr 2017 15:04:55 +0000 (15:04 +0000)
Merged /httpd/httpd/trunk:r1789740,1790102,1790113,1790284,1790754,1790826-1790827,1790842

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1790847 13f79535-47bb-0310-9956-ffa450edef68

19 files changed:
CHANGES
modules/http2/h2_bucket_beam.c
modules/http2/h2_filter.c
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_ngn_shed.c
modules/http2/h2_proxy_session.c
modules/http2/h2_proxy_util.c
modules/http2/h2_proxy_util.h
modules/http2/h2_session.c
modules/http2/h2_session.h
modules/http2/h2_stream.c
modules/http2/h2_task.c
modules/http2/h2_util.c
modules/http2/h2_util.h
modules/http2/h2_version.h
modules/http2/h2_workers.c
modules/http2/h2_workers.h
modules/http2/mod_proxy_http2.c

diff --git a/CHANGES b/CHANGES
index 25198ab9174ddcb9b69a5ffea2413f7265f937ae..42d1f0b048756aceb6948c6612feaa208299389b 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,10 @@
 
 Changes with Apache 2.4.26
 
+  *) mod_proxy_http2: Fixed bug in re-attempting proxy requests after 
+     connection error. Reliability of reconnect handling improved. 
+     [Stefan Eissing]
+  
   *) mod_http2: better performance, eliminated need for nested locks and
      thread privates. Moving request setups from the main connection to the
      worker threads. Increase number of spare connections kept.
@@ -22,9 +26,6 @@ Changes with Apache 2.4.26
      format from 2.2 in the Last Modified column. PR60846.
      [Hank Ibell <hwibell gmail.com>]
 
-  *) mod_http2: fixed PR60869 by making h2 workers exit explicitly waking up
-     all threads to exit in a defined way. [Stefan Eissing]
-     
   *) core: Add %{REMOTE_PORT} to the expression parser. PR59938
      [Hank Ibell <hwibell gmail.com>]
 
index 17ad3d95f139b178eb64bbac4424ce5f3d0b6556..872c67544d09dd140a623a508aea91335fe367c8 100644 (file)
@@ -1035,7 +1035,11 @@ transfer:
                 ++transferred;
             }
             else {
+                /* let outside hook determine how bucket is beamed */
+                leave_yellow(beam, &bl);
                 brecv = h2_beam_bucket(beam, bb, bsender);
+                enter_yellow(beam, &bl);
+                
                 while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
                     ++transferred;
                     remain -= brecv->length;
index 3a8a3b1ad10edd3606874e40156d5fea36bcb1ac..c1f1a847d21c8d4c32bdee6974e7cc243b72e9ed 100644 (file)
@@ -428,38 +428,41 @@ static void add_stats(apr_bucket_brigade *bb, h2_session *s,
 
 static apr_status_t h2_status_insert(h2_task *task, apr_bucket *b)
 {
-    h2_mplx *m = task->mplx;
-    h2_stream *stream = h2_mplx_stream_get(m, task->stream_id);
-    h2_session *s;
-    conn_rec *c;
-    
+    conn_rec *c = task->c->master;
+    h2_ctx *h2ctx = h2_ctx_get(c, 0);
+    h2_session *session;
+    h2_stream *stream;
     apr_bucket_brigade *bb;
     apr_bucket *e;
     int32_t connFlowIn, connFlowOut;
     
+    
+    if (!h2ctx || (session = h2_ctx_session_get(h2ctx)) == NULL) {
+        return APR_SUCCESS;
+    }
+    
+    stream = h2_session_stream_get(session, task->stream_id);
     if (!stream) {
         /* stream already done */
         return APR_SUCCESS;
     }
-    s = stream->session;
-    c = s->c;
     
     bb = apr_brigade_create(stream->pool, c->bucket_alloc);
     
-    connFlowIn = nghttp2_session_get_effective_local_window_size(s->ngh2); 
-    connFlowOut = nghttp2_session_get_remote_window_size(s->ngh2);
+    connFlowIn = nghttp2_session_get_effective_local_window_size(session->ngh2); 
+    connFlowOut = nghttp2_session_get_remote_window_size(session->ngh2);
      
     bbout(bb, "{\n");
     bbout(bb, "  \"version\": \"draft-01\",\n");
-    add_settings(bb, s, 0);
-    add_peer_settings(bb, s, 0);
+    add_settings(bb, session, 0);
+    add_peer_settings(bb, session, 0);
     bbout(bb, "  \"connFlowIn\": %d,\n", connFlowIn);
     bbout(bb, "  \"connFlowOut\": %d,\n", connFlowOut);
-    bbout(bb, "  \"sentGoAway\": %d,\n", s->local.shutdown);
+    bbout(bb, "  \"sentGoAway\": %d,\n", session->local.shutdown);
 
-    add_streams(bb, s, 0);
+    add_streams(bb, session, 0);
     
-    add_stats(bb, s, stream, 1);
+    add_stats(bb, session, stream, 1);
     bbout(bb, "}\n");
     
     while ((e = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) {
index 04fbbd05cbce638a631cb21014d65c34eeff169b..357bf5eaadfde12571472ea20c8d77e3776b0af9 100644 (file)
@@ -55,58 +55,29 @@ typedef struct {
     apr_time_t now;
 } stream_iter_ctx;
 
-/* NULL or the mutex hold by this thread, used for recursive calls
- */
-static const int nested_lock = 0;
-
-static apr_threadkey_t *thread_lock;
-
 apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
 {
-    if (nested_lock) {
-        return apr_threadkey_private_create(&thread_lock, NULL, pool);
-    }
     return APR_SUCCESS;
 }
 
-static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
-{
-    apr_status_t status;
-    
-    if (nested_lock) {
-        void *mutex = NULL;
-        /* Enter the mutex if this thread already holds the lock or
-         * if we can acquire it. Only on the later case do we unlock
-         * onleaving the mutex.
-         * This allow recursive entering of the mutex from the saem thread,
-         * which is what we need in certain situations involving callbacks
-         */
-        apr_threadkey_private_get(&mutex, thread_lock);
-        if (mutex == m->lock) {
-            *pacquired = 0;
-            ap_assert(NULL); /* nested, why? */
-            return APR_SUCCESS;
-        }
-    }
-    status = apr_thread_mutex_lock(m->lock);
-    *pacquired = (status == APR_SUCCESS);
-    if (nested_lock && *pacquired) {
-        apr_threadkey_private_set(m->lock, thread_lock);
-    }
-    return status;
-}
+#define H2_MPLX_ENTER(m)    \
+    do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
+        return rv;\
+    } } while(0)
 
-static void leave_mutex(h2_mplx *m, int acquired)
-{
-    if (acquired) {
-        if (nested_lock) {
-            apr_threadkey_private_set(NULL, thread_lock);
-        }
-        apr_thread_mutex_unlock(m->lock);
-    }
-}
+#define H2_MPLX_LEAVE(m)    \
+    apr_thread_mutex_unlock(m->lock)
+#define H2_MPLX_ENTER_ALWAYS(m)    \
+    apr_thread_mutex_lock(m->lock)
 
-static void check_data_for(h2_mplx *m, int stream_id);
+#define H2_MPLX_ENTER_MAYBE(m, lock)    \
+    if (lock) apr_thread_mutex_lock(m->lock)
+
+#define H2_MPLX_LEAVE_MAYBE(m, lock)    \
+    if (lock) apr_thread_mutex_unlock(m->lock)
+
+static void check_data_for(h2_mplx *m, h2_stream *stream, int lock);
 
 static void stream_output_consumed(void *ctx, 
                                    h2_bucket_beam *beam, apr_off_t length)
@@ -155,6 +126,7 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream)
     h2_stream_cleanup(stream);
 
     h2_iq_remove(m->q, stream->id);
+    h2_fifo_remove(m->readyq, stream);
     h2_ihash_remove(m->streams, stream->id);
     h2_ihash_add(m->shold, stream);
     
@@ -240,7 +212,12 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
         m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->q = h2_iq_create(m->pool, m->max_streams);
-        m->readyq = h2_iq_create(m->pool, m->max_streams);
+
+        status = h2_fifo_set_create(&m->readyq, m->pool, m->max_streams);
+        if (status != APR_SUCCESS) {
+            apr_pool_destroy(m->pool);
+            return NULL;
+        }
 
         m->workers = workers;
         m->max_active = workers->max_workers;
@@ -259,14 +236,15 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
 
 int h2_mplx_shutdown(h2_mplx *m)
 {
-    int acquired, max_stream_started = 0;
+    int max_stream_started = 0;
     
-    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        max_stream_started = m->max_stream_started;
-        /* Clear schedule queue, disabling existing streams from starting */ 
-        h2_iq_clear(m->q);
-        leave_mutex(m, acquired);
-    }
+    H2_MPLX_ENTER(m);
+
+    max_stream_started = m->max_stream_started;
+    /* Clear schedule queue, disabling existing streams from starting */ 
+    h2_iq_clear(m->q);
+
+    H2_MPLX_LEAVE(m);
     return max_stream_started;
 }
 
@@ -341,12 +319,14 @@ static int stream_destroy_iter(void *ctx, void *val)
     return 0;
 }
 
-static void purge_streams(h2_mplx *m)
+static void purge_streams(h2_mplx *m, int lock)
 {
     if (!h2_ihash_empty(m->spurge)) {
+        H2_MPLX_ENTER_MAYBE(m, lock);
         while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) {
             /* repeat until empty */
         }
+        H2_MPLX_LEAVE_MAYBE(m, lock);
     }
 }
 
@@ -363,18 +343,16 @@ static int stream_iter_wrap(void *ctx, void *stream)
 
 apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
 {
-    apr_status_t status;
-    int acquired;
+    stream_iter_ctx_t x;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        stream_iter_ctx_t x;
-        x.cb = cb;
-        x.ctx = ctx;
-        h2_ihash_iter(m->streams, stream_iter_wrap, &x);
+    H2_MPLX_ENTER(m);
+
+    x.cb = cb;
+    x.ctx = ctx;
+    h2_ihash_iter(m->streams, stream_iter_wrap, &x);
         
-        leave_mutex(m, acquired);
-    }
-    return status;
+    H2_MPLX_LEAVE(m);
+    return APR_SUCCESS;
 }
 
 static int report_stream_iter(void *ctx, void *val) {
@@ -430,14 +408,13 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 {
     apr_status_t status;
     int i, wait_secs = 60;
-    int acquired;
 
     /* How to shut down a h2 connection:
      * 0. abort and tell the workers that no more tasks will come from us */
     m->aborted = 1;
     h2_workers_unregister(m->workers, m);
     
-    enter_mutex(m, &acquired);
+    H2_MPLX_ENTER_ALWAYS(m);
 
     /* How to shut down a h2 connection:
      * 1. cancel all streams still active */
@@ -482,7 +459,7 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
         h2_ihash_iter(m->shold, unexpected_stream_iter, m);
     }
     
-    leave_mutex(m, acquired);
+    H2_MPLX_LEAVE(m);
 
     /* 5. unregister again, now that our workers are done */
     h2_workers_unregister(m->workers, m);
@@ -493,41 +470,34 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 
 apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
 {
-    apr_status_t status = APR_SUCCESS;
-    int acquired;
+    H2_MPLX_ENTER(m);
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                      H2_STRM_MSG(stream, "cleanup"));
-        stream_cleanup(m, stream);        
-        leave_mutex(m, acquired);
-    }
-    return status;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+                  H2_STRM_MSG(stream, "cleanup"));
+    stream_cleanup(m, stream);        
+    
+    H2_MPLX_LEAVE(m);
+    return APR_SUCCESS;
 }
 
 h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
 {
     h2_stream *s = NULL;
-    int acquired;
     
-    if ((enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        s = h2_ihash_get(m->streams, id);
-        leave_mutex(m, acquired);
-    }
+    H2_MPLX_ENTER_ALWAYS(m);
+
+    s = h2_ihash_get(m->streams, id);
+
+    H2_MPLX_LEAVE(m);
     return s;
 }
 
 static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
 {
-    h2_mplx *m = ctx;
-    int acquired;
+    h2_stream *stream = ctx;
+    h2_mplx *m = stream->session->mplx;
     
-    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                      "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id);
-        check_data_for(m, beam->id);
-        leave_mutex(m, acquired);
-    }
+    check_data_for(m, stream, 1);
 }
 
 static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
@@ -551,7 +521,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
     }
     
     h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream);
-    h2_beam_on_produced(stream->output, output_produced, m);
+    h2_beam_on_produced(stream->output, output_produced, stream);
     if (stream->task->output.copy_files) {
         h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL);
     }
@@ -561,24 +531,24 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
     
     /* we might see some file buckets in the output, see
      * if we have enough handles reserved. */
-    check_data_for(m, stream->id);
+    check_data_for(m, stream, 0);
     return status;
 }
 
 apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
 {
     apr_status_t status;
-    int acquired;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (m->aborted) {
-            status = APR_ECONNABORTED;
-        }
-        else {
-            status = out_open(m, stream_id, beam);
-        }
-        leave_mutex(m, acquired);
+    H2_MPLX_ENTER(m);
+
+    if (m->aborted) {
+        status = APR_ECONNABORTED;
     }
+    else {
+        status = out_open(m, stream_id, beam);
+    }
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
 
@@ -601,7 +571,7 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task)
     status = h2_beam_close(task->output.beam);
     h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close");
     output_consumed_signal(m, task);
-    check_data_for(m, task->stream_id);
+    check_data_for(m, stream, 0);
     return status;
 }
 
@@ -609,58 +579,61 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                  apr_thread_cond_t *iowait)
 {
     apr_status_t status;
-    int acquired;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (m->aborted) {
-            status = APR_ECONNABORTED;
-        }
-        else if (apr_atomic_read32(&m->event_pending) > 0) {
-            status = APR_SUCCESS;
-        }
-        else {
-            purge_streams(m);
-            h2_ihash_iter(m->streams, report_consumption_iter, m);
-            m->added_output = iowait;
-            status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
-            if (APLOGctrace2(m->c)) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                              "h2_mplx(%ld): trywait on data for %f ms)",
-                              m->id, timeout/1000.0);
-            }
-            m->added_output = NULL;
+    H2_MPLX_ENTER(m);
+
+    if (m->aborted) {
+        status = APR_ECONNABORTED;
+    }
+    else if (h2_mplx_has_master_events(m)) {
+        status = APR_SUCCESS;
+    }
+    else {
+        purge_streams(m, 0);
+        h2_ihash_iter(m->streams, report_consumption_iter, m);
+        m->added_output = iowait;
+        status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
+        if (APLOGctrace2(m->c)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): trywait on data for %f ms)",
+                          m->id, timeout/1000.0);
         }
-        leave_mutex(m, acquired);
+        m->added_output = NULL;
     }
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
 
-static void check_data_for(h2_mplx *m, int stream_id)
+static void check_data_for(h2_mplx *m, h2_stream *stream, int lock)
 {
-    ap_assert(m);
-    h2_iq_append(m->readyq, stream_id);
-    apr_atomic_set32(&m->event_pending, 1);
-    if (m->added_output) {
-        apr_thread_cond_signal(m->added_output);
+    if (h2_fifo_push(m->readyq, stream) == APR_SUCCESS) {
+        apr_atomic_set32(&m->event_pending, 1);
+        H2_MPLX_ENTER_MAYBE(m, lock);
+        if (m->added_output) {
+            apr_thread_cond_signal(m->added_output);
+        }
+        H2_MPLX_LEAVE_MAYBE(m, lock);
     }
 }
 
 apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
 {
     apr_status_t status;
-    int acquired;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (m->aborted) {
-            status = APR_ECONNABORTED;
-        }
-        else {
-            h2_iq_sort(m->q, cmp, ctx);
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): reprioritize tasks", m->id);
-        }
-        leave_mutex(m, acquired);
+    H2_MPLX_ENTER(m);
+
+    if (m->aborted) {
+        status = APR_ECONNABORTED;
+    }
+    else {
+        h2_iq_sort(m->q, cmp, ctx);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                      "h2_mplx(%ld): reprioritize tasks", m->id);
+        status = APR_SUCCESS;
     }
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
 
@@ -682,29 +655,30 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
                              h2_stream_pri_cmp *cmp, void *ctx)
 {
     apr_status_t status;
-    int acquired;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (m->aborted) {
-            status = APR_ECONNABORTED;
+    H2_MPLX_ENTER(m);
+
+    if (m->aborted) {
+        status = APR_ECONNABORTED;
+    }
+    else {
+        status = APR_SUCCESS;
+        h2_ihash_add(m->streams, stream);
+        if (h2_stream_is_ready(stream)) {
+            /* already have a response */
+            check_data_for(m, stream, 0);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          H2_STRM_MSG(stream, "process, add to readyq")); 
         }
         else {
-            h2_ihash_add(m->streams, stream);
-            if (h2_stream_is_ready(stream)) {
-                /* already have a response */
-                check_data_for(m, stream->id);
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                              H2_STRM_MSG(stream, "process, add to readyq")); 
-            }
-            else {
-                h2_iq_add(m->q, stream->id, cmp, ctx);
-                register_if_needed(m);                
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                              H2_STRM_MSG(stream, "process, added to q")); 
-            }
+            h2_iq_add(m->q, stream->id, cmp, ctx);
+            register_if_needed(m);                
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          H2_STRM_MSG(stream, "process, added to q")); 
         }
-        leave_mutex(m, acquired);
     }
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
 
@@ -762,22 +736,21 @@ static h2_task *next_stream_task(h2_mplx *m)
 h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
 {
     h2_task *task = NULL;
-    apr_status_t status;
-    int acquired;
     
+    H2_MPLX_ENTER_ALWAYS(m);
+
     *has_more = 0;
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (!m->aborted) {
-            task = next_stream_task(m);
-            if (task != NULL && !h2_iq_empty(m->q)) {
-                *has_more = 1;
-            }
-            else {
-                m->is_registered = 0; /* h2_workers will discard this mplx */
-            }
+    if (!m->aborted) {
+        task = next_stream_task(m);
+        if (task != NULL && !h2_iq_empty(m->q)) {
+            *has_more = 1;
+        }
+        else {
+            m->is_registered = 0; /* h2_workers will discard this mplx */
         }
-        leave_mutex(m, acquired);
     }
+
+    H2_MPLX_LEAVE(m);
     return task;
 }
 
@@ -814,7 +787,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
     if (task->engine) {
         if (!m->aborted && !task->c->aborted 
             && !h2_req_engine_is_shutdown(task->engine)) {
-            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(10022)
                           "h2_mplx(%ld): task(%s) has not-shutdown "
                           "engine(%s)", m->id, task->id, 
                           h2_req_engine_get_id(task->engine));
@@ -845,35 +818,37 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
     }
     
     stream = h2_ihash_get(m->streams, task->stream_id);
-    if (stream && !m->aborted && h2_ihash_get(m->sredo, stream->id)) {
-        /* reset and schedule again */
-        h2_task_redo(task);
-        h2_ihash_remove(m->sredo, stream->id);
-        h2_iq_add(m->q, stream->id, NULL, NULL);
-        return;
-    }
-    
     if (stream) {
-        /* stream not cleaned up, stay around */
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                      H2_STRM_MSG(stream, "task_done, stream open")); 
-        /* more data will not arrive, resume the stream */
-        if (stream->input) {
-            h2_beam_mutex_disable(stream->input);
-            h2_beam_leave(stream->input);
+        /* stream not done yet. */
+        if (!m->aborted && h2_ihash_get(m->sredo, stream->id)) {
+            /* reset and schedule again */
+            h2_task_redo(task);
+            h2_ihash_remove(m->sredo, stream->id);
+            h2_iq_add(m->q, stream->id, NULL, NULL);
         }
-        if (stream->output) {
-            h2_beam_mutex_disable(stream->output);
+        else {
+            /* stream not cleaned up, stay around */
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          H2_STRM_MSG(stream, "task_done, stream open")); 
+            /* more data will not arrive, resume the stream */
+            check_data_for(m, stream, 0);
+            
+            if (stream->input) {
+                h2_beam_leave(stream->input);
+                h2_beam_mutex_disable(stream->input);
+            }
+            if (stream->output) {
+                h2_beam_mutex_disable(stream->output);
+            }
         }
-        check_data_for(m, stream->id);
     }
     else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
+        /* stream is done, was just waiting for this. */
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                       H2_STRM_MSG(stream, "task_done, in hold"));
-        /* stream was just waiting for us. */
         if (stream->input) {
-            h2_beam_mutex_disable(stream->input);
             h2_beam_leave(stream->input);
+            h2_beam_mutex_disable(stream->input);
         }
         if (stream->output) {
             h2_beam_mutex_disable(stream->output);
@@ -895,21 +870,21 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
 
 void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
 {
-    int acquired;
+    H2_MPLX_ENTER_ALWAYS(m);
+
+    task_done(m, task, NULL);
+    --m->tasks_active;
     
-    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        task_done(m, task, NULL);
-        --m->tasks_active;
-        if (m->join_wait) {
-            apr_thread_cond_signal(m->join_wait);
-        }
-        if (ptask) {
-            /* caller wants another task */
-            *ptask = next_stream_task(m);
-        }
-        register_if_needed(m);
-        leave_mutex(m, acquired);
+    if (m->join_wait) {
+        apr_thread_cond_signal(m->join_wait);
+    }
+    if (ptask) {
+        /* caller wants another task */
+        *ptask = next_stream_task(m);
     }
+    register_if_needed(m);
+
+    H2_MPLX_LEAVE(m);
 }
 
 /*******************************************************************************
@@ -1001,52 +976,53 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
 {
     apr_status_t status = APR_SUCCESS;
     apr_time_t now;            
-    int acquired;
+    apr_size_t scount;
     
-    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        apr_size_t scount = h2_ihash_count(m->streams);
-        if (scount > 0 && m->tasks_active) {
-            /* If we have streams in connection state 'IDLE', meaning
-             * all streams are ready to sent data out, but lack
-             * WINDOW_UPDATEs. 
-             * 
-             * This is ok, unless we have streams that still occupy
-             * h2 workers. As worker threads are a scarce resource, 
-             * we need to take measures that we do not get DoSed.
-             * 
-             * This is what we call an 'idle block'. Limit the amount 
-             * of busy workers we allow for this connection until it
-             * well behaves.
-             */
-            now = apr_time_now();
-            m->last_idle_block = now;
-            if (m->limit_active > 2 
-                && now - m->last_limit_change >= m->limit_change_interval) {
-                if (m->limit_active > 16) {
-                    m->limit_active = 16;
-                }
-                else if (m->limit_active > 8) {
-                    m->limit_active = 8;
-                }
-                else if (m->limit_active > 4) {
-                    m->limit_active = 4;
-                }
-                else if (m->limit_active > 2) {
-                    m->limit_active = 2;
-                }
-                m->last_limit_change = now;
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                              "h2_mplx(%ld): decrease worker limit to %d",
-                              m->id, m->limit_active);
+    H2_MPLX_ENTER(m);
+
+    scount = h2_ihash_count(m->streams);
+    if (scount > 0 && m->tasks_active) {
+        /* If we have streams in connection state 'IDLE', meaning
+         * all streams are ready to sent data out, but lack
+         * WINDOW_UPDATEs. 
+         * 
+         * This is ok, unless we have streams that still occupy
+         * h2 workers. As worker threads are a scarce resource, 
+         * we need to take measures that we do not get DoSed.
+         * 
+         * This is what we call an 'idle block'. Limit the amount 
+         * of busy workers we allow for this connection until it
+         * well behaves.
+         */
+        now = apr_time_now();
+        m->last_idle_block = now;
+        if (m->limit_active > 2 
+            && now - m->last_limit_change >= m->limit_change_interval) {
+            if (m->limit_active > 16) {
+                m->limit_active = 16;
             }
-            
-            if (m->tasks_active > m->limit_active) {
-                status = unschedule_slow_tasks(m);
+            else if (m->limit_active > 8) {
+                m->limit_active = 8;
+            }
+            else if (m->limit_active > 4) {
+                m->limit_active = 4;
+            }
+            else if (m->limit_active > 2) {
+                m->limit_active = 2;
             }
+            m->last_limit_change = now;
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): decrease worker limit to %d",
+                          m->id, m->limit_active);
+        }
+        
+        if (m->tasks_active > m->limit_active) {
+            status = unschedule_slow_tasks(m);
         }
-        register_if_needed(m);
-        leave_mutex(m, acquired);
     }
+    register_if_needed(m);
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
 
@@ -1090,7 +1066,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
     apr_status_t status;
     h2_mplx *m;
     h2_task *task;
-    int acquired;
+    h2_stream *stream;
     
     task = h2_ctx_rget_task(r);
     if (!task) {
@@ -1098,17 +1074,17 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
     }
     m = task->mplx;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
-        
-        if (stream) {
-            status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
-        }
-        else {
-            status = APR_ECONNABORTED;
-        }
-        leave_mutex(m, acquired);
+    H2_MPLX_ENTER(m);
+
+    stream = h2_ihash_get(m->streams, task->stream_id);
+    if (stream) {
+        status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
+    }
+    else {
+        status = APR_ECONNABORTED;
     }
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
 
@@ -1120,35 +1096,36 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
     h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
     h2_mplx *m = h2_ngn_shed_get_ctx(shed);
     apr_status_t status;
-    int acquired;
+    int want_shutdown;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        int want_shutdown = (block == APR_BLOCK_READ);
+    H2_MPLX_ENTER(m);
 
-        /* Take this opportunity to update output consummation 
-         * for this engine */
-        ngn_out_update_windows(m, ngn);
-        
-        if (want_shutdown && !h2_iq_empty(m->q)) {
-            /* For a blocking read, check first if requests are to be
-             * had and, if not, wait a short while before doing the
-             * blocking, and if unsuccessful, terminating read.
-             */
+    want_shutdown = (block == APR_BLOCK_READ);
+
+    /* Take this opportunity to update output consummation 
+     * for this engine */
+    ngn_out_update_windows(m, ngn);
+    
+    if (want_shutdown && !h2_iq_empty(m->q)) {
+        /* For a blocking read, check first if requests are to be
+         * had and, if not, wait a short while before doing the
+         * blocking, and if unsuccessful, terminating read.
+         */
+        status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
+        if (APR_STATUS_IS_EAGAIN(status)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): start block engine pull", m->id);
+            apr_thread_cond_timedwait(m->task_thawed, m->lock, 
+                                      apr_time_from_msec(20));
             status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
-            if (APR_STATUS_IS_EAGAIN(status)) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                              "h2_mplx(%ld): start block engine pull", m->id);
-                apr_thread_cond_timedwait(m->task_thawed, m->lock, 
-                                          apr_time_from_msec(20));
-                status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
-            }
-        }
-        else {
-            status = h2_ngn_shed_pull_request(shed, ngn, capacity,
-                                              want_shutdown, pr);
         }
-        leave_mutex(m, acquired);
     }
+    else {
+        status = h2_ngn_shed_pull_request(shed, ngn, capacity,
+                                          want_shutdown, pr);
+    }
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
  
@@ -1159,29 +1136,29 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
     
     if (task) {
         h2_mplx *m = task->mplx;
-        int acquired;
+        h2_stream *stream;
 
-        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-            h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
-            
-            ngn_out_update_windows(m, ngn);
-            h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
-            
-            if (status != APR_SUCCESS && stream 
-                && h2_task_can_redo(task) 
-                && !h2_ihash_get(m->sredo, stream->id)) {
-                h2_ihash_add(m->sredo, stream);
-            }
-            if (task->engine) { 
-                /* cannot report that as done until engine returns */
-            }
-            else {
-                task_done(m, task, ngn);
-            }
-            /* Take this opportunity to update output consummation 
-             * for this engine */
-            leave_mutex(m, acquired);
+        H2_MPLX_ENTER_ALWAYS(m);
+
+        stream = h2_ihash_get(m->streams, task->stream_id);
+        
+        ngn_out_update_windows(m, ngn);
+        h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
+        
+        if (status != APR_SUCCESS && stream 
+            && h2_task_can_redo(task) 
+            && !h2_ihash_get(m->sredo, stream->id)) {
+            h2_ihash_add(m->sredo, stream);
+        }
+
+        if (task->engine) { 
+            /* cannot report that as done until engine returns */
         }
+        else {
+            task_done(m, task, ngn);
+        }
+
+        H2_MPLX_LEAVE(m);
     }
 }
 
@@ -1198,65 +1175,47 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
                                             stream_ev_callback *on_resume, 
                                             void *on_ctx)
 {
-    apr_status_t status;
-    int acquired;
-    int ids[100];
     h2_stream *stream;
-    size_t i, n;
+    int n;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                      "h2_mplx(%ld): dispatch events", m->id);        
-        apr_atomic_set32(&m->event_pending, 0);
-        purge_streams(m);
-        
-        /* update input windows for streams */
-        h2_ihash_iter(m->streams, report_consumption_iter, m);
-        
-        if (!h2_iq_empty(m->readyq)) {
-            n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
-            for (i = 0; i < n; ++i) {
-                stream = h2_ihash_get(m->streams, ids[i]);
-                if (stream) {
-                    leave_mutex(m, acquired);
-                    on_resume(on_ctx, stream);
-                    enter_mutex(m, &acquired);
-                }
-            }
-        }
-        if (!h2_iq_empty(m->readyq)) {
-            apr_atomic_set32(&m->event_pending, 1);
-        } 
-        leave_mutex(m, acquired);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+                  "h2_mplx(%ld): dispatch events", m->id);        
+    apr_atomic_set32(&m->event_pending, 0);
+
+    /* update input windows for streams */
+    h2_ihash_iter(m->streams, report_consumption_iter, m);    
+    purge_streams(m, 1);
+    
+    n = h2_fifo_count(m->readyq);
+    while (n > 0 
+           && (h2_fifo_try_pull(m->readyq, (void**)&stream) == APR_SUCCESS)) {
+        --n;
+        on_resume(on_ctx, stream);
     }
-    return status;
+    
+    return APR_SUCCESS;
 }
 
-apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id)
+apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream)
 {
-    apr_status_t status;
-    int acquired;
-    
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        check_data_for(m, stream_id);
-        leave_mutex(m, acquired);
-    }
-    return status;
+    check_data_for(m, stream, 1);
+    return APR_SUCCESS;
 }
 
 int h2_mplx_awaits_data(h2_mplx *m)
 {
-    apr_status_t status;
-    int acquired, waiting = 1;
+    int waiting = 1;
      
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (h2_ihash_empty(m->streams)) {
-            waiting = 0;
-        }
-        if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
-            waiting = 0;
-        }
-        leave_mutex(m, acquired);
+    H2_MPLX_ENTER_ALWAYS(m);
+
+    if (h2_ihash_empty(m->streams)) {
+        waiting = 0;
     }
+    if ((h2_fifo_count(m->readyq) == 0) 
+        && h2_iq_empty(m->q) && !m->tasks_active) {
+        waiting = 0;
+    }
+
+    H2_MPLX_LEAVE(m);
     return waiting;
 }
index 82a98fce0a83ea3942d53a3adc7ca3ae26971a30..ed332c8bc3d16efb55d99c9b71404df4ce9a353a 100644 (file)
@@ -68,7 +68,7 @@ struct h2_mplx {
     struct h2_ihash_t *spurge;      /* all streams done, ready for destroy */
     
     struct h2_iqueue *q;            /* all stream ids that need to be started */
-    struct h2_iqueue *readyq;       /* all stream ids ready for output */
+    struct h2_fifo *readyq;         /* all streams ready for output */
         
     struct h2_ihash_t *redo_tasks;  /* all tasks that need to be redone */
     
@@ -158,7 +158,7 @@ apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, struct h2_stream *stream);
 apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                  struct apr_thread_cond_t *iowait);
 
-apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id);
+apr_status_t h2_mplx_keep_active(h2_mplx *m, struct h2_stream *stream);
 
 /*******************************************************************************
  * Stream processing.
index e0c40cfb233cbfa32f4e44be9d2733411c892ac0..27474ba22dee3990628d05f700fa3350cd526198 100644 (file)
@@ -151,6 +151,7 @@ static void ngn_add_task(h2_req_engine *ngn, h2_task *task, request_rec *r)
     entry->task = task;
     entry->r = r;
     H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
+    ngn->no_assigned++;
 }
 
 
@@ -176,6 +177,17 @@ apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type,
         task->assigned = NULL;
     }
     
+    if (task->engine) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, 
+                      "h2_ngn_shed(%ld): push task(%s) hosting engine %s " 
+                      "already with %d tasks", 
+                      shed->c->id, task->id, task->engine->id,
+                      task->engine->no_assigned);
+        task->assigned = task->engine;
+        ngn_add_task(task->engine, task, r);
+        return APR_SUCCESS;
+    }
+    
     ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING);
     if (ngn && !ngn->shutdown) {
         /* this task will be processed in another thread,
@@ -187,7 +199,6 @@ apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type,
             h2_task_freeze(task);
         }
         ngn_add_task(ngn, task, r);
-        ngn->no_assigned++;
         return APR_SUCCESS;
     }
     
@@ -211,11 +222,11 @@ apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type,
         status = einit(newngn, newngn->id, newngn->type, newngn->pool,
                        shed->req_buffer_size, r,
                        &newngn->out_consumed, &newngn->out_consumed_ctx);
+        
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03395)
                       "h2_ngn_shed(%ld): create engine %s (%s)", 
                       shed->c->id, newngn->id, newngn->type);
         if (status == APR_SUCCESS) {
-            ap_assert(task->engine == NULL);
             newngn->task = task;
             task->engine = newngn;
             task->assigned = newngn;
index 49476e965b75bb0ea8b4f4b7634e20c73123c362..f2fed906b7123bd3d80ff18f41d1dfcd7577b585 100644 (file)
@@ -242,7 +242,6 @@ static int add_header(void *table, const char *n, const char *v)
 
 static void process_proxy_header(h2_proxy_stream *stream, const char *n, const char *v)
 {
-    request_rec *r = stream->r;
     static const struct {
         const char *name;
         ap_proxy_header_reverse_map_fn func;
@@ -254,23 +253,26 @@ static void process_proxy_header(h2_proxy_stream *stream, const char *n, const c
         { "Set-Cookie", ap_proxy_cookie_reverse_map },
         { NULL, NULL }
     };
+    request_rec *r = stream->r;
     proxy_dir_conf *dconf;
     int i;
     
-    for (i = 0; transform_hdrs[i].name; ++i) {
-        if (!ap_cstr_casecmp(transform_hdrs[i].name, n)) {
+    dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
+    if (!dconf->preserve_host) {
+        for (i = 0; transform_hdrs[i].name; ++i) {
+            if (!ap_cstr_casecmp(transform_hdrs[i].name, n)) {
+                apr_table_add(r->headers_out, n,
+                              (*transform_hdrs[i].func)(r, dconf, v));
+                return;
+            }
+        }
+        if (!ap_cstr_casecmp("Link", n)) {
             dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
             apr_table_add(r->headers_out, n,
-                          (*transform_hdrs[i].func)(r, dconf, v));
+                          h2_proxy_link_reverse_map(r, dconf, 
+                                                    stream->real_server_uri, stream->p_server_uri, v));
             return;
-       }
-    }
-    if (!ap_cstr_casecmp("Link", n)) {
-        dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
-        apr_table_add(r->headers_out, n,
-                      h2_proxy_link_reverse_map(r, dconf, 
-                      stream->real_server_uri, stream->p_server_uri, v));
-        return;
+        }
     }
     apr_table_add(r->headers_out, n, v);
 }
index b92a876f42ba035cd03073a0640fa75320dfeaa4..206020fd8733c9d1b28e705f2f814696b0d689cc 100644 (file)
@@ -16,6 +16,8 @@
 #include <assert.h>
 #include <apr_lib.h>
 #include <apr_strings.h>
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
 
 #include <httpd.h>
 #include <http_core.h>
@@ -1053,3 +1055,282 @@ const char *h2_proxy_link_reverse_map(request_rec *r,
                   "link_reverse_map %s --> %s", s, ctx.s);
     return ctx.s;
 }
+
+/*******************************************************************************
+ * FIFO queue
+ ******************************************************************************/
+
+struct h2_proxy_fifo {
+    void **elems;
+    int nelems;
+    int set;
+    int head;
+    int count;
+    int aborted;
+    apr_thread_mutex_t *lock;
+    apr_thread_cond_t  *not_empty;
+    apr_thread_cond_t  *not_full;
+};
+
+static int nth_index(h2_proxy_fifo *fifo, int n) 
+{
+    return (fifo->head + n) % fifo->nelems;
+}
+
+static apr_status_t fifo_destroy(void *data) 
+{
+    h2_proxy_fifo *fifo = data;
+
+    apr_thread_cond_destroy(fifo->not_empty);
+    apr_thread_cond_destroy(fifo->not_full);
+    apr_thread_mutex_destroy(fifo->lock);
+
+    return APR_SUCCESS;
+}
+
+static int index_of(h2_proxy_fifo *fifo, void *elem)
+{
+    int i;
+    
+    for (i = 0; i < fifo->count; ++i) {
+        if (elem == fifo->elems[nth_index(fifo, i)]) {
+            return i;
+        }
+    }
+    return -1;
+}
+
+static apr_status_t create_int(h2_proxy_fifo **pfifo, apr_pool_t *pool, 
+                               int capacity, int as_set)
+{
+    apr_status_t rv;
+    h2_proxy_fifo *fifo;
+    
+    fifo = apr_pcalloc(pool, sizeof(*fifo));
+    if (fifo == NULL) {
+        return APR_ENOMEM;
+    }
+
+    rv = apr_thread_mutex_create(&fifo->lock,
+                                 APR_THREAD_MUTEX_UNNESTED, pool);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    rv = apr_thread_cond_create(&fifo->not_empty, pool);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    rv = apr_thread_cond_create(&fifo->not_full, pool);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
+    fifo->elems = apr_pcalloc(pool, capacity * sizeof(void*));
+    if (fifo->elems == NULL) {
+        return APR_ENOMEM;
+    }
+    fifo->nelems = capacity;
+    fifo->set = as_set;
+    
+    *pfifo = fifo;
+    apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null);
+
+    return APR_SUCCESS;
+}
+
+apr_status_t h2_proxy_fifo_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+    return create_int(pfifo, pool, capacity, 0);
+}
+
+apr_status_t h2_proxy_fifo_set_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+    return create_int(pfifo, pool, capacity, 1);
+}
+
+apr_status_t h2_proxy_fifo_term(h2_proxy_fifo *fifo)
+{
+    apr_status_t rv;
+    if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+        fifo->aborted = 1;
+        apr_thread_mutex_unlock(fifo->lock);
+    }
+    return rv;
+}
+
+apr_status_t h2_proxy_fifo_interrupt(h2_proxy_fifo *fifo)
+{
+    apr_status_t rv;
+    if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+        apr_thread_cond_broadcast(fifo->not_empty);
+        apr_thread_cond_broadcast(fifo->not_full);
+        apr_thread_mutex_unlock(fifo->lock);
+    }
+    return rv;
+}
+
+int h2_proxy_fifo_count(h2_proxy_fifo *fifo)
+{
+    return fifo->count;
+}
+
+int h2_proxy_fifo_capacity(h2_proxy_fifo *fifo)
+{
+    return fifo->nelems;
+}
+
+static apr_status_t check_not_empty(h2_proxy_fifo *fifo, int block)
+{
+    if (fifo->count == 0) {
+        if (!block) {
+            return APR_EAGAIN;
+        }
+        while (fifo->count == 0) {
+            if (fifo->aborted) {
+                return APR_EOF;
+            }
+            apr_thread_cond_wait(fifo->not_empty, fifo->lock);
+        }
+    }
+    return APR_SUCCESS;
+}
+
+static apr_status_t fifo_push(h2_proxy_fifo *fifo, void *elem, int block)
+{
+    apr_status_t rv;
+    
+    if (fifo->aborted) {
+        return APR_EOF;
+    }
+
+    if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+        if (fifo->set && index_of(fifo, elem) >= 0) {
+            /* set mode, elem already member */
+            apr_thread_mutex_unlock(fifo->lock);
+            return APR_EEXIST;
+        }
+        else if (fifo->count == fifo->nelems) {
+            if (block) {
+                while (fifo->count == fifo->nelems) {
+                    if (fifo->aborted) {
+                        apr_thread_mutex_unlock(fifo->lock);
+                        return APR_EOF;
+                    }
+                    apr_thread_cond_wait(fifo->not_full, fifo->lock);
+                }
+            }
+            else {
+                apr_thread_mutex_unlock(fifo->lock);
+                return APR_EAGAIN;
+            }
+        }
+        
+        ap_assert(fifo->count < fifo->nelems);
+        fifo->elems[nth_index(fifo, fifo->count)] = elem;
+        ++fifo->count;
+        if (fifo->count == 1) {
+            apr_thread_cond_broadcast(fifo->not_empty);
+        }
+        apr_thread_mutex_unlock(fifo->lock);
+    }
+    return rv;
+}
+
+apr_status_t h2_proxy_fifo_push(h2_proxy_fifo *fifo, void *elem)
+{
+    return fifo_push(fifo, elem, 1);
+}
+
+apr_status_t h2_proxy_fifo_try_push(h2_proxy_fifo *fifo, void *elem)
+{
+    return fifo_push(fifo, elem, 0);
+}
+
+static void *pull_head(h2_proxy_fifo *fifo)
+{
+    void *elem;
+    
+    ap_assert(fifo->count > 0);
+    elem = fifo->elems[fifo->head];
+    --fifo->count;
+    if (fifo->count > 0) {
+        fifo->head = nth_index(fifo, 1);
+        if (fifo->count+1 == fifo->nelems) {
+            apr_thread_cond_broadcast(fifo->not_full);
+        }
+    }
+    return elem;
+}
+
+static apr_status_t fifo_pull(h2_proxy_fifo *fifo, void **pelem, int block)
+{
+    apr_status_t rv;
+    
+    if (fifo->aborted) {
+        return APR_EOF;
+    }
+    
+    if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+        if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) {
+            apr_thread_mutex_unlock(fifo->lock);
+            *pelem = NULL;
+            return rv;
+        }
+
+        ap_assert(fifo->count > 0);
+        *pelem = pull_head(fifo);
+
+        apr_thread_mutex_unlock(fifo->lock);
+    }
+    return rv;
+}
+
+apr_status_t h2_proxy_fifo_pull(h2_proxy_fifo *fifo, void **pelem)
+{
+    return fifo_pull(fifo, pelem, 1);
+}
+
+apr_status_t h2_proxy_fifo_try_pull(h2_proxy_fifo *fifo, void **pelem)
+{
+    return fifo_pull(fifo, pelem, 0);
+}
+
+apr_status_t h2_proxy_fifo_remove(h2_proxy_fifo *fifo, void *elem)
+{
+    apr_status_t rv;
+    
+    if (fifo->aborted) {
+        return APR_EOF;
+    }
+
+    if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+        int i, rc;
+        void *e;
+        
+        rc = 0;
+        for (i = 0; i < fifo->count; ++i) {
+            e = fifo->elems[nth_index(fifo, i)];
+            if (e == elem) {
+                ++rc;
+            }
+            else if (rc) {
+                fifo->elems[nth_index(fifo, i-rc)] = e;
+            }
+        }
+        if (rc) {
+            fifo->count -= rc;
+            if (fifo->count + rc == fifo->nelems) {
+                apr_thread_cond_broadcast(fifo->not_full);
+            }
+            rv = APR_SUCCESS;
+        }
+        else {
+            rv = APR_EAGAIN;
+        }
+        
+        apr_thread_mutex_unlock(fifo->lock);
+    }
+    return rv;
+}
index f90d14951bc3b10455c138c7c56513c4688e516b..ea4418425676d9f42968427022af8d4a5e48ae41 100644 (file)
@@ -201,4 +201,55 @@ const char *h2_proxy_link_reverse_map(request_rec *r,
                                       const char *proxy_server_uri,
                                       const char *s);
 
+/*******************************************************************************
+ * FIFO queue
+ ******************************************************************************/
+
+/**
+ * A thread-safe FIFO queue with some extra bells and whistles, if you
+ * do not need anything special, better use 'apr_queue'.
+ */
+typedef struct h2_proxy_fifo h2_proxy_fifo;
+
+/**
+ * Create a FIFO queue that can hold up to capacity elements. Elements can
+ * appear several times.
+ */
+apr_status_t h2_proxy_fifo_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity);
+
+/**
+ * Create a FIFO set that can hold up to capacity elements. Elements only
+ * appear once. Pushing an element already present does not change the
+ * queue and is successful.
+ */
+apr_status_t h2_proxy_fifo_set_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity);
+
+apr_status_t h2_proxy_fifo_term(h2_proxy_fifo *fifo);
+apr_status_t h2_proxy_fifo_interrupt(h2_proxy_fifo *fifo);
+
+int h2_proxy_fifo_capacity(h2_proxy_fifo *fifo);
+int h2_proxy_fifo_count(h2_proxy_fifo *fifo);
+
+/**
+ * Push en element into the queue. Blocks if there is no capacity left.
+ * 
+ * @param fifo the FIFO queue
+ * @param elem the element to push
+ * @return APR_SUCCESS on push, APR_EAGAIN on try_push on a full queue,
+ *         APR_EEXIST when in set mode and elem already there.
+ */
+apr_status_t h2_proxy_fifo_push(h2_proxy_fifo *fifo, void *elem);
+apr_status_t h2_proxy_fifo_try_push(h2_proxy_fifo *fifo, void *elem);
+
+apr_status_t h2_proxy_fifo_pull(h2_proxy_fifo *fifo, void **pelem);
+apr_status_t h2_proxy_fifo_try_pull(h2_proxy_fifo *fifo, void **pelem);
+
+/**
+ * Remove the elem from the queue, will remove multiple appearances.
+ * @param elem  the element to remove
+ * @return APR_SUCCESS iff > 0 elems were removed, APR_EAGAIN otherwise.
+ */
+apr_status_t h2_proxy_fifo_remove(h2_proxy_fifo *fifo, void *elem);
+
+
 #endif /* defined(__mod_h2__h2_proxy_util__) */
index f37741b61fc8d9ab8cbab45ad7e7bc0885dd65a9..e23cb8d54b4ee6f216bd4f393d5a35130767ee7c 100644 (file)
@@ -72,7 +72,7 @@ static int h2_session_status_from_apr_status(apr_status_t rv)
     return NGHTTP2_ERR_PROTO;
 }
 
-static h2_stream *get_stream(h2_session *session, int stream_id)
+h2_stream *h2_session_stream_get(h2_session *session, int stream_id)
 {
     return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
 }
@@ -231,7 +231,7 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
     h2_stream * stream;
     int rv = 0;
     
-    stream = get_stream(session, stream_id);
+    stream = h2_session_stream_get(session, stream_id);
     if (stream) {
         status = h2_stream_recv_DATA(stream, flags, data, len);
     }
@@ -256,7 +256,7 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
     h2_stream *stream;
     
     (void)ngh2;
-    stream = get_stream(session, stream_id);
+    stream = h2_session_stream_get(session, stream_id);
     if (stream) {
         if (error_code) {
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
@@ -278,7 +278,7 @@ static int on_begin_headers_cb(nghttp2_session *ngh2,
     /* We may see HEADERs at the start of a stream or after all DATA
      * streams to carry trailers. */
     (void)ngh2;
-    s = get_stream(session, frame->hd.stream_id);
+    s = h2_session_stream_get(session, frame->hd.stream_id);
     if (s) {
         /* nop */
     }
@@ -299,7 +299,7 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
     apr_status_t status;
     
     (void)flags;
-    stream = get_stream(session, frame->hd.stream_id);
+    stream = h2_session_stream_get(session, frame->hd.stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(02920) 
                       "h2_stream(%ld-%d): on_header unknown stream",
@@ -344,13 +344,13 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
             /* This can be HEADERS for a new stream, defining the request,
              * or HEADER may come after DATA at the end of a stream as in
              * trailers */
-            stream = get_stream(session, frame->hd.stream_id);
+            stream = h2_session_stream_get(session, frame->hd.stream_id);
             if (stream) {
                 h2_stream_recv_frame(stream, NGHTTP2_HEADERS, frame->hd.flags);
             }
             break;
         case NGHTTP2_DATA:
-            stream = get_stream(session, frame->hd.stream_id);
+            stream = h2_session_stream_get(session, frame->hd.stream_id);
             if (stream) {
                 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,  
                               H2_STRM_LOG(APLOGNO(02923), stream, 
@@ -380,7 +380,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
                           "h2_stream(%ld-%d): RST_STREAM by client, errror=%d",
                           session->id, (int)frame->hd.stream_id,
                           (int)frame->rst_stream.error_code);
-            stream = get_stream(session, frame->hd.stream_id);
+            stream = h2_session_stream_get(session, frame->hd.stream_id);
             if (stream && stream->initiated_on) {
                 ++session->pushes_reset;
             }
@@ -453,7 +453,7 @@ static int on_send_data_cb(nghttp2_session *ngh2,
     }
     padlen = (unsigned char)frame->data.padlen;
     
-    stream = get_stream(session, stream_id);
+    stream = h2_session_stream_get(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
                       APLOGNO(02924) 
@@ -542,7 +542,7 @@ static int on_frame_send_cb(nghttp2_session *ngh2,
                      (long)session->frames_sent);
     }
     
-    stream = get_stream(session, stream_id);
+    stream = h2_session_stream_get(session, stream_id);
     if (stream) {
         h2_stream_send_frame(stream, frame->hd.type, frame->hd.flags);
     }
@@ -566,7 +566,7 @@ static int on_invalid_header_cb(nghttp2_session *ngh2,
                       apr_pstrndup(session->pool, (const char *)name, namelen),
                       apr_pstrndup(session->pool, (const char *)value, valuelen));
     }
-    stream = get_stream(session, frame->hd.stream_id);
+    stream = h2_session_stream_get(session, frame->hd.stream_id);
     if (stream) {
         h2_stream_rst(stream, NGHTTP2_PROTOCOL_ERROR);
     }
@@ -1028,7 +1028,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
     (void)ng2s;
     (void)buf;
     (void)source;
-    stream = get_stream(session, stream_id);
+    stream = h2_session_stream_get(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
                       APLOGNO(02937) 
@@ -1449,7 +1449,7 @@ static void h2_session_in_flush(h2_session *session)
     int id;
     
     while ((id = h2_iq_shift(session->in_process)) > 0) {
-        h2_stream *stream = get_stream(session, id);
+        h2_stream *stream = h2_session_stream_get(session, id);
         if (stream) {
             ap_assert(!stream->scheduled);
             if (h2_stream_prep_processing(stream) == APR_SUCCESS) {
@@ -1462,7 +1462,7 @@ static void h2_session_in_flush(h2_session *session)
     }
 
     while ((id = h2_iq_shift(session->in_pending)) > 0) {
-        h2_stream *stream = get_stream(session, id);
+        h2_stream *stream = h2_session_stream_get(session, id);
         if (stream) {
             h2_stream_flush_input(stream);
         }
index 5751aed7bd618abde48fecea909550163a50aaa4..7a3ca3ca3810a92617d7bbd000f862224e76a64f 100644 (file)
@@ -194,6 +194,11 @@ void h2_session_close(h2_session *session);
  */
 int h2_session_push_enabled(h2_session *session);
 
+/**
+ * Look up the stream in this session with the given id.
+ */
+struct h2_stream *h2_session_stream_get(h2_session *session, int stream_id);
+
 /**
  * Submit a push promise on the stream and schedule the new steam for
  * processing..
index 7bf35aa3b27247ef2c32a098b0cab5e312a66459..9784b4ec28d9feee5fefdc88e8c6351baa92b95b 100644 (file)
@@ -764,18 +764,77 @@ static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb)
     return NULL;
 }
 
+static apr_status_t add_data(h2_stream *stream, apr_off_t requested,
+                             apr_off_t *plen, int *peos, int *complete, 
+                             h2_headers **pheaders)
+{
+    apr_bucket *b, *e;
+    
+    *peos = 0;
+    *plen = 0;
+    *complete = 0;
+    if (pheaders) {
+        *pheaders = NULL;
+    }
+
+    H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "add_data");
+    b = APR_BRIGADE_FIRST(stream->out_buffer);
+    while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
+        e = APR_BUCKET_NEXT(b);
+        if (APR_BUCKET_IS_METADATA(b)) {
+            if (APR_BUCKET_IS_FLUSH(b)) {
+                APR_BUCKET_REMOVE(b);
+                apr_bucket_destroy(b);
+            }
+            else if (APR_BUCKET_IS_EOS(b)) {
+                *peos = 1;
+                return APR_SUCCESS;
+            }
+            else if (H2_BUCKET_IS_HEADERS(b)) {
+                if (*plen > 0) {
+                    /* data before the response, can only return up to here */
+                    return APR_SUCCESS;
+                }
+                else if (pheaders) {
+                    *pheaders = h2_bucket_headers_get(b);
+                    APR_BUCKET_REMOVE(b);
+                    apr_bucket_destroy(b);
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+                                  H2_STRM_MSG(stream, "prep, -> response %d"), 
+                                  (*pheaders)->status);
+                    return APR_SUCCESS;
+                }
+                else {
+                    return APR_EAGAIN;
+                }
+            }
+        }
+        else if (b->length == 0) {
+            APR_BUCKET_REMOVE(b);
+            apr_bucket_destroy(b);
+        }
+        else {
+            ap_assert(b->length != (apr_size_t)-1);
+            *plen += b->length;
+            if (*plen >= requested) {
+                *plen = requested;
+                return APR_SUCCESS;
+            }
+        }
+        b = e;
+    }
+    *complete = 1;
+    return APR_SUCCESS;
+}
+
 apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, 
-                                   int *peos, h2_headers **presponse)
+                                   int *peos, h2_headers **pheaders)
 {
     apr_status_t status = APR_SUCCESS;
-    apr_off_t requested, max_chunk = H2_DATA_CHUNK_SIZE;
-    apr_bucket *b, *e;
+    apr_off_t requested, missing, max_chunk = H2_DATA_CHUNK_SIZE;
     conn_rec *c;
+    int complete;
 
-    if (presponse) {
-        *presponse = NULL;
-    }
-    
     ap_assert(stream);
     
     if (stream->rst_error) {
@@ -793,15 +852,34 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
     if (stream->session->io.write_size > 0) {
         max_chunk = stream->session->io.write_size - 9; /* header bits */ 
     }
-    *plen = requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
+    requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
+    
+    /* count the buffered data until eos or a headers bucket */
+    status = add_data(stream, requested, plen, peos, &complete, pheaders);
+    
+    if (status == APR_EAGAIN) {
+        /* TODO: ugly, someone needs to retrieve the response first */
+        h2_mplx_keep_active(stream->session->mplx, stream);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+                      H2_STRM_MSG(stream, "prep, response eagain"));
+        return status;
+    }
+    else if (status != APR_SUCCESS) {
+        return status;
+    }
     
-    h2_util_bb_avail(stream->out_buffer, plen, peos);
-    if (!*peos && *plen < requested && *plen < stream->max_mem) {
-        H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
+    if (pheaders && *pheaders) {
+        return APR_SUCCESS;
+    }
+    
+    missing = H2MIN(requested, stream->max_mem) - *plen;
+    if (complete && !*peos && missing > 0) {
         if (stream->output) {
+            H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
             status = h2_beam_receive(stream->output, stream->out_buffer, 
                                      APR_NONBLOCK_READ, 
                                      stream->max_mem - *plen);
+            H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
         }
         else {
             status = APR_EOF;
@@ -810,79 +888,24 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
         if (APR_STATUS_IS_EOF(status)) {
             apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
             APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
+            *peos = 1;
             status = APR_SUCCESS;
         }
-        else if (status == APR_EAGAIN) {
-            status = APR_SUCCESS;
-        }
-        *plen = requested;
-        h2_util_bb_avail(stream->out_buffer, plen, peos);
-        H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
-    }
-    else {
-        H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "ok");
-    }
-
-    b = APR_BRIGADE_FIRST(stream->out_buffer);
-    while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
-        e = APR_BUCKET_NEXT(b);
-        if (APR_BUCKET_IS_FLUSH(b)
-            || (!APR_BUCKET_IS_METADATA(b) && b->length == 0)) {
-            APR_BUCKET_REMOVE(b);
-            apr_bucket_destroy(b);
-        }
-        else {
-            break;
+        else if (status == APR_SUCCESS) {
+            /* do it again, now that we have gotten more */
+            status = add_data(stream, requested, plen, peos, &complete, pheaders);
         }
-        b = e;
     }
-
-    b = get_first_headers_bucket(stream->out_buffer);
-    if (b) {
-        /* there are HEADERS to submit */
-        *peos = 0;
-        *plen = 0;
-        if (b == APR_BRIGADE_FIRST(stream->out_buffer)) {
-            if (presponse) {
-                *presponse = h2_bucket_headers_get(b);
-                APR_BUCKET_REMOVE(b);
-                apr_bucket_destroy(b);
-                status = APR_SUCCESS;
-            }
-            else {
-                /* someone needs to retrieve the response first */
-                h2_mplx_keep_active(stream->session->mplx, stream->id);
-                status = APR_EAGAIN;
-            }
-        }
-        else {
-            apr_bucket *e = APR_BRIGADE_FIRST(stream->out_buffer);
-            while (e != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
-                if (e == b) {
-                    break;
-                }
-                else if (e->length != (apr_size_t)-1) {
-                    *plen += e->length;
-                }
-                e = APR_BUCKET_NEXT(e);
-            }
-        }
-    }
-
+    
     if (status == APR_SUCCESS) {
-        if (presponse && *presponse) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                          H2_STRM_MSG(stream, "prepare, response %d"), 
-                          (*presponse)->status);
-        }
-        else if (*peos || *plen) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+        if (*peos || *plen) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                           H2_STRM_MSG(stream, "prepare, len=%ld eos=%d"),
                           (long)*plen, *peos);
         }
         else {
             status = APR_EAGAIN;
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                           H2_STRM_MSG(stream, "prepare, no data"));
         }
     }
index 5ab485faab3c0afe07aa2d31d4e69a414212874e..1ef0d9a887a49d6962a873622e4ae4985ba353e6 100644 (file)
@@ -383,7 +383,7 @@ static apr_status_t h2_filter_parse_h1(ap_filter_t* f, apr_bucket_brigade* bb)
     /* There are cases where we need to parse a serialized http/1.1 
      * response. One example is a 100-continue answer in serialized mode
      * or via a mod_proxy setup */
-    while (!task->output.sent_response) {
+    while (bb && !task->output.sent_response) {
         status = h2_from_h1_parse_response(task, f, bb);
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c,
                       "h2_task(%s): parsed response", task->id);
index 0389193e88f9752fe9c773d838113e90f50efffc..0ac65ccf656b1a6a7f446174fc5df32348ad0311 100644 (file)
@@ -438,12 +438,12 @@ int h2_iq_count(h2_iqueue *q)
 }
 
 
-void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
+int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
 {
     int i;
     
     if (h2_iq_contains(q, sid)) {
-        return;
+        return 0;
     }
     if (q->nelts >= q->nalloc) {
         iq_grow(q, q->nalloc * 2);
@@ -456,11 +456,12 @@ void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
         /* bubble it to the front of the queue */
         iq_bubble_up(q, i, q->head, cmp, ctx);
     }
+    return 1;
 }
 
-void h2_iq_append(h2_iqueue *q, int sid)
+int h2_iq_append(h2_iqueue *q, int sid)
 {
-    h2_iq_add(q, sid, NULL, NULL);
+    return h2_iq_add(q, sid, NULL, NULL);
 }
 
 int h2_iq_remove(h2_iqueue *q, int sid)
@@ -612,6 +613,7 @@ int h2_iq_contains(h2_iqueue *q, int sid)
 struct h2_fifo {
     void **elems;
     int nelems;
+    int set;
     int head;
     int count;
     int aborted;
@@ -636,7 +638,20 @@ static apr_status_t fifo_destroy(void *data)
     return APR_SUCCESS;
 }
 
-apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+static int index_of(h2_fifo *fifo, void *elem)
+{
+    int i;
+    
+    for (i = 0; i < fifo->count; ++i) {
+        if (elem == fifo->elems[nth_index(fifo, i)]) {
+            return i;
+        }
+    }
+    return -1;
+}
+
+static apr_status_t create_int(h2_fifo **pfifo, apr_pool_t *pool, 
+                               int capacity, int as_set)
 {
     apr_status_t rv;
     h2_fifo *fifo;
@@ -667,6 +682,7 @@ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
         return APR_ENOMEM;
     }
     fifo->nelems = capacity;
+    fifo->set = as_set;
     
     *pfifo = fifo;
     apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null);
@@ -674,6 +690,16 @@ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
     return APR_SUCCESS;
 }
 
+apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+    return create_int(pfifo, pool, capacity, 0);
+}
+
+apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+    return create_int(pfifo, pool, capacity, 1);
+}
+
 apr_status_t h2_fifo_term(h2_fifo *fifo)
 {
     apr_status_t rv;
@@ -725,7 +751,12 @@ static apr_status_t fifo_push(h2_fifo *fifo, void *elem, int block)
     }
 
     if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
-        if (fifo->count == fifo->nelems) {
+        if (fifo->set && index_of(fifo, elem) >= 0) {
+            /* set mode, elem already member */
+            apr_thread_mutex_unlock(fifo->lock);
+            return APR_EEXIST;
+        }
+        else if (fifo->count == fifo->nelems) {
             if (block) {
                 while (fifo->count == fifo->nelems) {
                     if (fifo->aborted) {
@@ -762,6 +793,22 @@ apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem)
     return fifo_push(fifo, elem, 0);
 }
 
+static void *pull_head(h2_fifo *fifo)
+{
+    void *elem;
+    
+    ap_assert(fifo->count > 0);
+    elem = fifo->elems[fifo->head];
+    --fifo->count;
+    if (fifo->count > 0) {
+        fifo->head = nth_index(fifo, 1);
+        if (fifo->count+1 == fifo->nelems) {
+            apr_thread_cond_broadcast(fifo->not_full);
+        }
+    }
+    return elem;
+}
+
 static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block)
 {
     apr_status_t rv;
@@ -778,14 +825,8 @@ static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block)
         }
 
         ap_assert(fifo->count > 0);
-        *pelem = fifo->elems[fifo->head];
-        --fifo->count;
-        if (fifo->count > 0) {
-            fifo->head = nth_index(fifo, 1);
-            if (fifo->count+1 == fifo->nelems) {
-                apr_thread_cond_broadcast(fifo->not_full);
-            }
-        }
+        *pelem = pull_head(fifo);
+
         apr_thread_mutex_unlock(fifo->lock);
     }
     return rv;
@@ -817,29 +858,18 @@ static apr_status_t fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx, int
         }
 
         ap_assert(fifo->count > 0);
-        elem = fifo->elems[fifo->head];
+        elem = pull_head(fifo);
         
+        apr_thread_mutex_unlock(fifo->lock);
+
         switch (fn(elem, ctx)) {
             case H2_FIFO_OP_PULL:
-                --fifo->count;
-                if (fifo->count > 0) {
-                    fifo->head = nth_index(fifo, 1);
-                    if (fifo->count+1 == fifo->nelems) {
-                        apr_thread_cond_broadcast(fifo->not_full);
-                    }
-                }
                 break;
             case H2_FIFO_OP_REPUSH:
-                if (fifo->count > 1) {
-                    fifo->head = nth_index(fifo, 1);
-                    if (fifo->count < fifo->nelems) {
-                        fifo->elems[nth_index(fifo, fifo->count-1)] = elem;
-                    }
-                }
+                return h2_fifo_push(fifo, elem);
                 break;
         }
         
-        apr_thread_mutex_unlock(fifo->lock);
     }
     return rv;
 }
index f6a4b9a43dfed5f261c46811fa9055f0ea3cd4a3..9b408fad3df47fb8f406d4ce3dd57ed48b4f1e36 100644 (file)
@@ -119,17 +119,19 @@ int h2_iq_count(h2_iqueue *q);
  * @param q the queue to append the id to
  * @param sid the stream id to add
  * @param cmp the comparator for sorting
- * @param ctx user data for comparator 
+ * @param ctx user data for comparator
+ * @return != 0 iff id was not already there 
  */
-void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx);
+int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx);
 
 /**
  * Append the id to the queue if not already present. 
  *
  * @param q the queue to append the id to
  * @param sid the id to append
+ * @return != 0 iff id was not already there 
  */
-void h2_iq_append(h2_iqueue *q, int sid);
+int h2_iq_append(h2_iqueue *q, int sid);
 
 /**
  * Remove the stream id from the queue. Return != 0 iff task
@@ -193,12 +195,32 @@ int h2_iq_contains(h2_iqueue *q, int sid);
  */
 typedef struct h2_fifo h2_fifo;
 
+/**
+ * Create a FIFO queue that can hold up to capacity elements. Elements can
+ * appear several times.
+ */
 apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity);
+
+/**
+ * Create a FIFO set that can hold up to capacity elements. Elements only
+ * appear once. Pushing an element already present does not change the
+ * queue and is successful.
+ */
+apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity);
+
 apr_status_t h2_fifo_term(h2_fifo *fifo);
 apr_status_t h2_fifo_interrupt(h2_fifo *fifo);
 
 int h2_fifo_count(h2_fifo *fifo);
 
+/**
+ * Push en element into the queue. Blocks if there is no capacity left.
+ * 
+ * @param fifo the FIFO queue
+ * @param elem the element to push
+ * @return APR_SUCCESS on push, APR_EAGAIN on try_push on a full queue,
+ *         APR_EEXIST when in set mode and elem already there.
+ */
 apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem);
 apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem);
 
index e6765e5a03cb60da82676ee94995b90c8e4686e4..528d21aed7d30c84813ddb07598f19c11278b381 100644 (file)
@@ -26,7 +26,7 @@
  * @macro
  * Version number of the http2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.10.0"
+#define MOD_HTTP2_VERSION "1.10.1"
 
 /**
  * @macro
@@ -34,7 +34,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x010a00
+#define MOD_HTTP2_VERSION_NUM 0x010a01
 
 
 #endif /* mod_h2_h2_version_h */
index fa395255e9f7a695b557af91d52a5021e14b0d43..9c7afc64e6c0750882810e1f0a81c7fcfae3d10d 100644 (file)
@@ -39,6 +39,7 @@ struct h2_slot {
     int sticks;
     h2_task *task;
     apr_thread_t *thread;
+    apr_thread_mutex_t *lock;
     apr_thread_cond_t *not_idle;
 };
 
@@ -78,6 +79,17 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
     slot->workers = workers;
     slot->aborted = 0;
     slot->task = NULL;
+
+    if (!slot->lock) {
+        status = apr_thread_mutex_create(&slot->lock,
+                                         APR_THREAD_MUTEX_DEFAULT,
+                                         workers->pool);
+        if (status != APR_SUCCESS) {
+            push_slot(&workers->free, slot);
+            return status;
+        }
+    }
+
     if (!slot->not_idle) {
         status = apr_thread_cond_create(&slot->not_idle, workers->pool);
         if (status != APR_SUCCESS) {
@@ -95,7 +107,7 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
         return APR_ENOMEM;
     }
     
-    ++workers->worker_count;
+    apr_atomic_inc32(&workers->worker_count);
     return APR_SUCCESS;
 }
 
@@ -112,9 +124,9 @@ static void wake_idle_worker(h2_workers *workers)
 {
     h2_slot *slot = pop_slot(&workers->idle);
     if (slot) {
-        apr_thread_mutex_lock(workers->lock);
+        apr_thread_mutex_lock(slot->lock);
         apr_thread_cond_signal(slot->not_idle);
-        apr_thread_mutex_unlock(workers->lock);
+        apr_thread_mutex_unlock(slot->lock);
     }
     else if (workers->dynamic) {
         add_worker(workers);
@@ -130,7 +142,7 @@ static void cleanup_zombies(h2_workers *workers)
             apr_thread_join(&status, slot->thread);
             slot->thread = NULL;
         }
-        --workers->worker_count;
+        apr_atomic_dec32(&workers->worker_count);
         push_slot(&workers->free, slot);
     }
 }
@@ -185,15 +197,12 @@ static apr_status_t get_next(h2_slot *slot)
             return APR_SUCCESS;
         }
         
-        apr_thread_mutex_lock(workers->lock);
         cleanup_zombies(workers);
 
-        ++workers->idle_workers;
+        apr_thread_mutex_lock(slot->lock);
         push_slot(&workers->idle, slot);
-        apr_thread_cond_wait(slot->not_idle, workers->lock);
-        --workers->idle_workers;
-
-        apr_thread_mutex_unlock(workers->lock);
+        apr_thread_cond_wait(slot->not_idle, slot->lock);
+        apr_thread_mutex_unlock(slot->lock);
     }
     return APR_EOF;
 }
@@ -239,24 +248,25 @@ static apr_status_t workers_pool_cleanup(void *data)
     h2_slot *slot;
     
     if (!workers->aborted) {
-        apr_thread_mutex_lock(workers->lock);
         workers->aborted = 1;
-        /* before we go, cleanup any zombies and abort the rest */
-        cleanup_zombies(workers);
+        /* abort all idle slots */
         for (;;) {
             slot = pop_slot(&workers->idle);
             if (slot) {
+                apr_thread_mutex_lock(slot->lock);
                 slot->aborted = 1;
                 apr_thread_cond_signal(slot->not_idle);
+                apr_thread_mutex_unlock(slot->lock);
             }
             else {
                 break;
             }
         }
-        apr_thread_mutex_unlock(workers->lock);
 
         h2_fifo_term(workers->mplxs);
         h2_fifo_interrupt(workers->mplxs);
+
+        cleanup_zombies(workers);
     }
     return APR_SUCCESS;
 }
index 30a7514cd0cdf2028dc420b0902140f09a60ddfe..7964b3c3aa6a130f168c5fd5f9304a42136a24c1 100644 (file)
@@ -40,8 +40,6 @@ struct h2_workers {
     int next_worker_id;
     int min_workers;
     int max_workers;
-    int worker_count;
-    int idle_workers;
     int max_idle_secs;
     
     int aborted;
@@ -51,6 +49,8 @@ struct h2_workers {
     int nslots;
     struct h2_slot *slots;
     
+    volatile apr_uint32_t worker_count;
+    
     struct h2_slot *free;
     struct h2_slot *idle;
     struct h2_slot *zombies;
index ef23d0c428faf566846728c506df1835b5018230..5b2a798996e701ed05c09f960d5a4ea289b17f73 100644 (file)
@@ -26,6 +26,8 @@
 #include "h2_version.h"
 #include "h2_proxy_session.h"
 
+#define H2MIN(x,y) ((x) < (y) ? (x) : (y))
+
 static void register_hook(apr_pool_t *p);
 
 AP_DECLARE_MODULE(proxy_http2) = {
@@ -65,7 +67,7 @@ typedef struct h2_proxy_ctx {
     const char *engine_type;
     apr_pool_t *engine_pool;    
     apr_size_t req_buffer_size;
-    request_rec *next;
+    h2_proxy_fifo *requests;
     int capacity;
     
     unsigned standalone : 1;
@@ -218,36 +220,23 @@ static apr_status_t proxy_engine_init(h2_req_engine *engine,
 {
     h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config, 
                                              &proxy_http2_module);
-    if (ctx) {
-        conn_rec *c = ctx->owner;
-        h2_proxy_ctx *nctx;
-        
-        /* we need another lifetime for this. If we do not host
-         * an engine, the context lives in r->pool. Since we expect
-         * to server more than r, we need to live longer */
-        nctx = apr_pcalloc(pool, sizeof(*nctx));
-        if (nctx == NULL) {
-            return APR_ENOMEM;
-        }
-        memcpy(nctx, ctx, sizeof(*nctx));
-        ctx = nctx;
-        ctx->pool = pool;
-        ctx->engine = engine;
-        ctx->engine_id = id;
-        ctx->engine_type = type;
-        ctx->engine_pool = pool;
-        ctx->req_buffer_size = req_buffer_size;
-        ctx->capacity = 100;
-
-        ap_set_module_config(c->conn_config, &proxy_http2_module, ctx);
-
-        *pconsumed = out_consumed;
-        *pctx = ctx;
-        return APR_SUCCESS;
+    if (!ctx) {
+        ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, APLOGNO(03368)
+                      "h2_proxy_session, engine init, no ctx found");
+        return APR_ENOTIMPL;
     }
-    ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, APLOGNO(03368)
-                  "h2_proxy_session, engine init, no ctx found");
-    return APR_ENOTIMPL;
+    
+    ctx->pool = pool;
+    ctx->engine = engine;
+    ctx->engine_id = id;
+    ctx->engine_type = type;
+    ctx->engine_pool = pool;
+    ctx->req_buffer_size = req_buffer_size;
+    ctx->capacity = H2MIN(100, h2_proxy_fifo_capacity(ctx->requests));
+    
+    *pconsumed = out_consumed;
+    *pctx = ctx;
+    return APR_SUCCESS;
 }
 
 static apr_status_t add_request(h2_proxy_session *session, request_rec *r)
@@ -270,10 +259,9 @@ static apr_status_t add_request(h2_proxy_session *session, request_rec *r)
     return status;
 }
 
-static void request_done(h2_proxy_session *session, request_rec *r,
+static void request_done(h2_proxy_ctx *ctx, request_rec *r,
                          apr_status_t status, int touched)
 {   
-    h2_proxy_ctx *ctx = session->user_data;
     const char *task_id = apr_table_get(r->connection->notes, H2_TASK_ID_NOTE);
 
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection, 
@@ -282,35 +270,26 @@ static void request_done(h2_proxy_session *session, request_rec *r,
     if (status != APR_SUCCESS) {
         if (!touched) {
             /* untouched request, need rescheduling */
-            if (req_engine_push && is_h2 && is_h2(ctx->owner)) {
-                if (req_engine_push(ctx->engine_type, r, NULL) == APR_SUCCESS) {
-                    /* push to engine */
-                    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, 
-                                  APLOGNO(03369)
-                                  "h2_proxy_session(%s): rescheduled request %s",
-                                  ctx->engine_id, task_id);
-                    return;
-                }
-            }
-            else if (!ctx->next) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection, 
-                              "h2_proxy_session(%s): retry untouched request",
-                              ctx->engine_id);
-                ctx->next = r;
-            }
+            status = h2_proxy_fifo_push(ctx->requests, r);
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection, 
+                          APLOGNO(03369)
+                          "h2_proxy_session(%s): rescheduled request %s",
+                          ctx->engine_id, task_id);
+            return;
         }
         else {
             const char *uri;
             uri = apr_uri_unparse(r->pool, &r->parsed_uri, 0);
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection, 
                           APLOGNO(03471) "h2_proxy_session(%s): request %s -> %s "
-                          "not complete, was touched",
+                          "not complete, cannot repeat", 
                           ctx->engine_id, task_id, uri);
         }
     }
     
     if (r == ctx->rbase) {
-        ctx->r_status = (status == APR_SUCCESS)? APR_SUCCESS : HTTP_SERVICE_UNAVAILABLE;
+        ctx->r_status = ((status == APR_SUCCESS)? APR_SUCCESS
+                         : HTTP_SERVICE_UNAVAILABLE);
     }
     
     if (req_engine_done && ctx->engine) {
@@ -322,21 +301,32 @@ static void request_done(h2_proxy_session *session, request_rec *r,
     }
 }    
 
+static void session_req_done(h2_proxy_session *session, request_rec *r,
+                             apr_status_t status, int touched)
+{
+    request_done(session->user_data, r, status, touched);
+}
+
 static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave)
 {
-    if (ctx->next) {
+    if (h2_proxy_fifo_count(ctx->requests) > 0) {
         return APR_SUCCESS;
     }
     else if (req_engine_pull && ctx->engine) {
         apr_status_t status;
+        request_rec *r = NULL;
+        
         status = req_engine_pull(ctx->engine, before_leave? 
                                  APR_BLOCK_READ: APR_NONBLOCK_READ, 
-                                 ctx->capacity, &ctx->next);
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner, 
-                      "h2_proxy_engine(%s): pulled request (%s) %s", 
-                      ctx->engine_id, 
-                      before_leave? "before leave" : "regular", 
-                      (ctx->next? ctx->next->the_request : "NULL"));
+                                 ctx->capacity, &r);
+        if (status == APR_SUCCESS && r) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner, 
+                          "h2_proxy_engine(%s): pulled request (%s) %s", 
+                          ctx->engine_id, 
+                          before_leave? "before leave" : "regular", 
+                          r->the_request);
+            h2_proxy_fifo_push(ctx->requests, r);
+        }
         return APR_STATUS_IS_EAGAIN(status)? APR_SUCCESS : status;
     }
     return APR_EOF;
@@ -345,6 +335,7 @@ static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave)
 static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
     apr_status_t status = OK;
     int h2_front;
+    request_rec *r;
     
     /* Step Four: Send the Request in a new HTTP/2 stream and
      * loop until we got the response or encounter errors.
@@ -355,7 +346,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
     ctx->session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf,
                                           h2_front, 30, 
                                           h2_proxy_log2((int)ctx->req_buffer_size), 
-                                          request_done);
+                                          session_req_done);
     if (!ctx->session) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, 
                       APLOGNO(03372) "session unavailable");
@@ -366,10 +357,9 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
                   "eng(%s): run session %s", ctx->engine_id, ctx->session->id);
     ctx->session->user_data = ctx;
     
-    while (1) {
-        if (ctx->next) {
-            add_request(ctx->session, ctx->next);
-            ctx->next = NULL;
+    while (!ctx->owner->aborted) {
+        if (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) {
+            add_request(ctx->session, r);
         }
         
         status = h2_proxy_session_process(ctx->session);
@@ -379,7 +369,8 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
             /* ongoing processing, call again */
             if (ctx->session->remote_max_concurrent > 0
                 && ctx->session->remote_max_concurrent != ctx->capacity) {
-                ctx->capacity = (int)ctx->session->remote_max_concurrent;
+                ctx->capacity = H2MIN((int)ctx->session->remote_max_concurrent, 
+                                      h2_proxy_fifo_capacity(ctx->requests));
             }
             s2 = next_request(ctx, 0);
             if (s2 == APR_ECONNABORTED) {
@@ -395,7 +386,8 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
                 status = ctx->r_status = APR_SUCCESS;
                 break;
             }
-            if (!ctx->next && h2_proxy_ihash_empty(ctx->session->streams)) {
+            if ((h2_proxy_fifo_count(ctx->requests) == 0) 
+                && h2_proxy_ihash_empty(ctx->session->streams)) {
                 break;
             }
         }
@@ -409,7 +401,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
              * a) be reopened on the new session iff safe to do so
              * b) reported as done (failed) otherwise
              */
-            h2_proxy_session_cleanup(ctx->session, request_done);
+            h2_proxy_session_cleanup(ctx->session, session_req_done);
             break;
         }
     }
@@ -420,7 +412,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
     return status;
 }
 
-static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx)
+static apr_status_t push_request_somewhere(h2_proxy_ctx *ctx, request_rec *r)
 {
     conn_rec *c = ctx->owner;
     const char *engine_type, *hostname;
@@ -430,21 +422,15 @@ static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx)
     engine_type = apr_psprintf(ctx->pool, "proxy_http2 %s%s", hostname, 
                                ctx->server_portstr);
     
-    if (c->master && req_engine_push && ctx->next && is_h2 && is_h2(c)) {
+    if (c->master && req_engine_push && r && is_h2 && is_h2(c)) {
         /* If we are have req_engine capabilities, push the handling of this
          * request (e.g. slave connection) to a proxy_http2 engine which 
          * uses the same backend. We may be called to create an engine 
          * ourself. */
-        if (req_engine_push(engine_type, ctx->next, proxy_engine_init)
-            == APR_SUCCESS) {
-            /* to renew the lifetime, we might have set a new ctx */
-            ctx = ap_get_module_config(c->conn_config, &proxy_http2_module);
+        if (req_engine_push(engine_type, r, proxy_engine_init) == APR_SUCCESS) {
             if (ctx->engine == NULL) {
-                /* Another engine instance has taken over processing of this
-                 * request. */
-                ctx->r_status = SUSPENDED;
-                ctx->next = NULL;
-                return ctx;
+                /* request has been assigned to an engine in another thread */
+                return SUSPENDED;
             }
         }
     }
@@ -465,7 +451,8 @@ static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx)
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, 
                       "H2: hosting engine %s", ctx->engine_id);
     }
-    return ctx;
+
+    return h2_proxy_fifo_push(ctx->requests, r);
 }
 
 static int proxy_http2_handler(request_rec *r, 
@@ -482,7 +469,7 @@ static int proxy_http2_handler(request_rec *r,
     apr_status_t status;
     h2_proxy_ctx *ctx;
     apr_uri_t uri;
-    int reconnected = 0;
+    int reconnects = 0;
     
     /* find the scheme */
     if ((url[0] != 'h' && url[0] != 'H') || url[1] != '2') {
@@ -507,6 +494,7 @@ static int proxy_http2_handler(request_rec *r,
         default:
             return DECLINED;
     }
+    
     ctx = apr_pcalloc(r->pool, sizeof(*ctx));
     ctx->owner      = r->connection;
     ctx->pool       = r->pool;
@@ -518,8 +506,9 @@ static int proxy_http2_handler(request_rec *r,
     ctx->conf       = conf;
     ctx->flushall   = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0;
     ctx->r_status   = HTTP_SERVICE_UNAVAILABLE;
-    ctx->next       = r;
-    r = NULL;
+    
+    h2_proxy_fifo_set_create(&ctx->requests, ctx->pool, 100);
+    
     ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, ctx);
 
     /* scheme says, this is for us. */
@@ -565,10 +554,11 @@ run_connect:
     
     /* If we are not already hosting an engine, try to push the request 
      * to an already existing engine or host a new engine here. */
-    if (!ctx->engine) {
-        ctx = push_request_somewhere(ctx);
+    if (r && !ctx->engine) {
+        ctx->r_status = push_request_somewhere(ctx, r);
+        r = NULL;
         if (ctx->r_status == SUSPENDED) {
-            /* request was pushed to another engine */
+            /* request was pushed to another thread, leave processing here */
             goto cleanup;
         }
     }
@@ -581,7 +571,7 @@ run_connect:
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(03352)
                       "H2: failed to make connection to backend: %s",
                       ctx->p_conn->hostname);
-        goto cleanup;
+        goto reconnect;
     }
     
     /* Step Three: Create conn_rec for the socket we have open now. */
@@ -593,7 +583,7 @@ run_connect:
                           "setup new connection: is_ssl=%d %s %s %s", 
                           ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname, 
                           locurl, ctx->p_conn->hostname);
-            goto cleanup;
+            goto reconnect;
         }
         
         if (!ctx->p_conn->data) {
@@ -628,8 +618,8 @@ run_session:
         ctx->engine = NULL;
     }
 
-cleanup:
-    if (!reconnected && next_request(ctx, 1) == APR_SUCCESS) {
+reconnect:
+    if (next_request(ctx, 1) == APR_SUCCESS) {
         /* Still more to do, tear down old conn and start over */
         if (ctx->p_conn) {
             ctx->p_conn->close = 1;
@@ -638,10 +628,16 @@ cleanup:
             ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
             ctx->p_conn = NULL;
         }
-        reconnected = 1; /* we do this only once, then fail */
-        goto run_connect;
+        ++reconnects;
+        if (reconnects < 5 && !ctx->owner->aborted) {
+            goto run_connect;
+        } 
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(10023)
+                      "giving up after %d reconnects, %d requests todo",
+                      reconnects, h2_proxy_fifo_count(ctx->requests));
     }
     
+cleanup:
     if (ctx->p_conn) {
         if (status != APR_SUCCESS) {
             /* close socket when errors happened or session shut down (EOF) */
@@ -653,6 +649,11 @@ cleanup:
         ctx->p_conn = NULL;
     }
 
+    /* Any requests will still have need to fail */
+    while (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) {
+        request_done(ctx, r, HTTP_SERVICE_UNAVAILABLE, 1);
+    }
+    
     ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, NULL);
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, 
                   APLOGNO(03377) "leaving handler");