]> granicus.if.org Git - apache/commitdiff
mod_http2: backport of v1.5.6 plus mod_proxy_http2 dsp support
authorStefan Eissing <icing@apache.org>
Mon, 23 May 2016 10:55:29 +0000 (10:55 +0000)
committerStefan Eissing <icing@apache.org>
Mon, 23 May 2016 10:55:29 +0000 (10:55 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1745137 13f79535-47bb-0310-9956-ffa450edef68

15 files changed:
CHANGES
modules/http2/h2_bucket_beam.c
modules/http2/h2_bucket_beam.h
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_session.c
modules/http2/h2_session.h
modules/http2/h2_stream.c
modules/http2/h2_stream.h
modules/http2/h2_task.c
modules/http2/h2_task.h
modules/http2/h2_util.c
modules/http2/h2_util.h
modules/http2/h2_version.h
modules/http2/mod_proxy_http2.dsp [new file with mode: 0644]

diff --git a/CHANGES b/CHANGES
index 3e73c1f1cf89cfa3b8891b2bfe71e6faccc63fbb..6bcbaf13dd579877c12a469e00505fc4013305c2 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,9 @@
 
 Changes with Apache 2.4.21
 
+  *) mod_http2: improved event handling for suspended streams, responses
+     and window updates. [Stefan Eissing] 
+     
   *) mod_proxy_hcheck: Provide for dynamic background health
      checks on reverse proxies associated with BalancerMember
      workers. [Jim Jagielski]
index 65f9906a10bace775d6a073cd4ce4c2590789f3e..d648b1d159db02b3ab93aec5b7b777cf7a2040a9 100644 (file)
@@ -356,12 +356,25 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
     }
 }
 
-static void report_consumption(h2_bucket_beam *beam)
+static void report_consumption(h2_bucket_beam *beam, int force)
 {
-    if (beam->consumed_fn && (beam->received_bytes != beam->reported_bytes)) {
-        beam->consumed_fn(beam->consumed_ctx, beam, 
-                          beam->received_bytes - beam->reported_bytes);
-        beam->reported_bytes = beam->received_bytes;
+    if (force || beam->received_bytes != beam->reported_consumed_bytes) {
+        if (beam->consumed_fn) { 
+            beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
+                              - beam->reported_consumed_bytes);
+        }
+        beam->reported_consumed_bytes = beam->received_bytes;
+    }
+}
+
+static void report_production(h2_bucket_beam *beam, int force)
+{
+    if (force || beam->sent_bytes != beam->reported_produced_bytes) {
+        if (beam->produced_fn) { 
+            beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
+                              - beam->reported_produced_bytes);
+        }
+        beam->reported_produced_bytes = beam->sent_bytes;
     }
 }
 
@@ -393,7 +406,7 @@ static apr_status_t beam_cleanup(void *data)
     beam_close(beam);
     r_purge_reds(beam);
     h2_blist_cleanup(&beam->red);
-    report_consumption(beam);
+    report_consumption(beam, 0);
     h2_blist_cleanup(&beam->purge);
     h2_blist_cleanup(&beam->hold);
     
@@ -500,7 +513,7 @@ void h2_beam_abort(h2_bucket_beam *beam)
         r_purge_reds(beam);
         h2_blist_cleanup(&beam->red);
         beam->aborted = 1;
-        report_consumption(beam);
+        report_consumption(beam, 0);
         if (beam->m_cond) {
             apr_thread_cond_broadcast(beam->m_cond);
         }
@@ -515,7 +528,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam)
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         r_purge_reds(beam);
         beam_close(beam);
-        report_consumption(beam);
+        report_consumption(beam, 0);
         leave_yellow(beam, &bl);
     }
     return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
@@ -530,7 +543,7 @@ apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block)
         r_purge_reds(beam);
         h2_blist_cleanup(&beam->red);
         beam_close(beam);
-        report_consumption(beam);
+        report_consumption(beam, 0);
         
         while (status == APR_SUCCESS 
                && (!H2_BPROXY_LIST_EMPTY(&beam->proxies)
@@ -693,16 +706,18 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
             status = APR_ECONNABORTED;
         }
         else if (red_brigade) {
+            int force_report = !APR_BRIGADE_EMPTY(red_brigade); 
             while (!APR_BRIGADE_EMPTY(red_brigade)
                    && status == APR_SUCCESS) {
                 bred = APR_BRIGADE_FIRST(red_brigade);
                 status = append_bucket(beam, bred, block, beam->red_pool, &bl);
             }
+            report_production(beam, force_report);
             if (beam->m_cond) {
                 apr_thread_cond_broadcast(beam->m_cond);
             }
         }
-        report_consumption(beam);
+        report_consumption(beam, 0);
         leave_yellow(beam, &bl);
     }
     return status;
@@ -756,8 +771,8 @@ transfer:
                         
             if (APR_BUCKET_IS_METADATA(bred)) {
                 if (APR_BUCKET_IS_EOS(bred)) {
-                    beam->close_sent = 1;
                     bgreen = apr_bucket_eos_create(bb->bucket_alloc);
+                    beam->close_sent = 1;
                 }
                 else if (APR_BUCKET_IS_FLUSH(bred)) {
                     bgreen = apr_bucket_flush_create(bb->bucket_alloc);
@@ -834,20 +849,25 @@ transfer:
                  }
             }
         }
-                
-        if (transferred) {
-            status = APR_SUCCESS;
-        }
-        else if (beam->closed) {
+
+        if (beam->closed 
+            && (!beam->green || APR_BRIGADE_EMPTY(beam->green))
+            && H2_BLIST_EMPTY(&beam->red)) {
+            /* beam is closed and we have nothing more to receive */ 
             if (!beam->close_sent) {
                 apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
                 APR_BRIGADE_INSERT_TAIL(bb, b);
                 beam->close_sent = 1;
+                ++transferred;
                 status = APR_SUCCESS;
             }
-            else {
-                status = APR_EOF;
-            }
+        }
+        
+        if (transferred) {
+            status = APR_SUCCESS;
+        }
+        else if (beam->closed) {
+            status = APR_EOF;
         }
         else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
             status = wait_cond(beam, bl.mutex);
@@ -866,7 +886,7 @@ leave:
 }
 
 void h2_beam_on_consumed(h2_bucket_beam *beam, 
-                         h2_beam_consumed_callback *cb, void *ctx)
+                         h2_beam_io_callback *cb, void *ctx)
 {
     h2_beam_lock bl;
     
@@ -877,6 +897,18 @@ void h2_beam_on_consumed(h2_bucket_beam *beam,
     }
 }
 
+void h2_beam_on_produced(h2_bucket_beam *beam, 
+                         h2_beam_io_callback *cb, void *ctx)
+{
+    h2_beam_lock bl;
+    
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+        beam->produced_fn = cb;
+        beam->produced_ctx = ctx;
+        leave_yellow(beam, &bl);
+    }
+}
+
 void h2_beam_on_file_beam(h2_bucket_beam *beam, 
                           h2_beam_can_beam_callback *cb, void *ctx)
 {
index 1fc6656a09016e70b586e07c110deb55e6897236..a5c7458a1151c74f5f9d282e23e545527055ae85 100644 (file)
@@ -152,8 +152,8 @@ typedef struct h2_bucket_beam h2_bucket_beam;
 
 typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl);
 
-typedef void h2_beam_consumed_callback(void *ctx, h2_bucket_beam *beam,
-                                       apr_off_t bytes);
+typedef void h2_beam_io_callback(void *ctx, h2_bucket_beam *beam,
+                                 apr_off_t bytes);
 
 typedef struct h2_beam_proxy h2_beam_proxy;
 typedef struct {
@@ -174,12 +174,14 @@ struct h2_bucket_beam {
     apr_pool_t *red_pool;
     
     apr_size_t max_buf_size;
-    apr_size_t files_beamed;  /* how many file handles have been set aside */
-    apr_file_t *last_beamed;  /* last file beamed */
+    apr_interval_time_t timeout;
+
     apr_off_t sent_bytes;     /* amount of bytes send */
     apr_off_t received_bytes; /* amount of bytes received */
-    apr_off_t reported_bytes; /* amount of bytes reported as consumed */
-    apr_size_t buckets_sent;
+
+    apr_size_t buckets_sent;  /* # of beam buckets sent */
+    apr_size_t files_beamed;  /* how many file handles have been set aside */
+    apr_file_t *last_beamed;  /* last file beamed */
     
     unsigned int aborted : 1;
     unsigned int closed : 1;
@@ -188,10 +190,13 @@ struct h2_bucket_beam {
     void *m_ctx;
     h2_beam_mutex_enter *m_enter;
     struct apr_thread_cond_t *m_cond;
-    apr_interval_time_t timeout;
     
-    h2_beam_consumed_callback *consumed_fn;
+    apr_off_t reported_consumed_bytes; /* amount of bytes reported as consumed */
+    h2_beam_io_callback *consumed_fn;
     void *consumed_ctx;
+    apr_off_t reported_produced_bytes; /* amount of bytes reported as produced */
+    h2_beam_io_callback *produced_fn;
+    void *produced_ctx;
     h2_beam_can_beam_callback *can_beam_fn;
     void *can_beam_ctx;
 };
@@ -319,7 +324,20 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam);
  * Call from the red side, callbacks invoked on red side.
  */
 void h2_beam_on_consumed(h2_bucket_beam *beam, 
-                         h2_beam_consumed_callback *cb, void *ctx);
+                         h2_beam_io_callback *cb, void *ctx);
+
+/**
+ * Register a callback to be invoked on the red side with the
+ * amount of bytes that have been consumed by the red side, since the
+ * last callback invocation or reset.
+ * @param beam the beam to set the callback on
+ * @param cb   the callback or NULL
+ * @param ctx  the context to use in callback invocation
+ * 
+ * Call from the red side, callbacks invoked on red side.
+ */
+void h2_beam_on_produced(h2_bucket_beam *beam, 
+                         h2_beam_io_callback *cb, void *ctx);
 
 void h2_beam_on_file_beam(h2_bucket_beam *beam, 
                           h2_beam_can_beam_callback *cb, void *ctx);
index a88b071855a587db5f2989f998c1debaebe97699..f7b30fffad51e201c74275b353095e071f15d628 100644 (file)
@@ -96,7 +96,8 @@ static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
         *pacquired = 0;
         return APR_SUCCESS;
     }
-        
+
+    AP_DEBUG_ASSERT(m->lock);
     status = apr_thread_mutex_lock(m->lock);
     *pacquired = (status == APR_SUCCESS);
     if (*pacquired) {
@@ -282,10 +283,11 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
 
         m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
-        m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->q = h2_iq_create(m->pool, m->max_streams);
+        m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+        m->sresume = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
 
         m->stream_timeout = stream_timeout;
@@ -323,7 +325,7 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m)
 
 static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
 {
-    if (stream->input) {
+    if (stream->input && stream->started) {
         h2_beam_send(stream->input, NULL, 0); /* trigger updates */
     }
 }
@@ -331,7 +333,8 @@ static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
 static int output_consumed_signal(h2_mplx *m, h2_task *task)
 {
     if (task->output.beam && task->worker_started && task->assigned) {
-        h2_beam_send(task->output.beam, NULL, 0); /* trigger updates */
+        /* trigger updates */
+        h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
     }
     return 0;
 }
@@ -366,6 +369,7 @@ static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
     if (task->output.beam) {
         m->tx_handles_reserved += 
         h2_beam_get_files_beamed(task->output.beam);
+        h2_beam_on_produced(task->output.beam, NULL, NULL);
     }
     
     slave = task->c;
@@ -428,6 +432,7 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
      */
     h2_iq_remove(m->q, stream->id);
     h2_ihash_remove(m->sready, stream->id);
+    h2_ihash_remove(m->sresume, stream->id);
     h2_ihash_remove(m->streams, stream->id);
     if (stream->input) {
         m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
@@ -498,6 +503,17 @@ static int task_abort_connection(void *ctx, void *val)
     return 1;
 }
 
+static int report_stream_iter(void *ctx, void *val) {
+    h2_mplx *m = ctx;
+    h2_stream *stream = val;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                  "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
+                  "submitted=%d, suspended=%d", 
+                  m->id, stream->id, stream->started, stream->scheduled,
+                  stream->submitted, stream->suspended);
+    return 1;
+}
+
 apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 {
     apr_status_t status;
@@ -507,6 +523,17 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         int i, wait_secs = 5;
+
+        if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): release_join with %d streams open, "
+                          "%d streams resume, %d streams ready, %d tasks", 
+                          m->id, (int)h2_ihash_count(m->streams),
+                          (int)h2_ihash_count(m->sresume), 
+                          (int)h2_ihash_count(m->sready), 
+                          (int)h2_ihash_count(m->tasks));
+            h2_ihash_iter(m->streams, report_stream_iter, m);
+        }
         
         /* disable WINDOW_UPDATE callbacks */
         h2_mplx_set_consumed_cb(m, NULL, NULL);
@@ -581,10 +608,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
             purge_streams(m);
         }
         AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
-        AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
         
         if (!h2_ihash_empty(m->tasks)) {
-            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056)
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
                           "h2_mplx(%ld): release_join -> destroy, "
                           "%d tasks still present", 
                           m->id, (int)h2_ihash_count(m->tasks));
@@ -630,74 +656,6 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
     m->input_consumed_ctx = ctx;
 }
 
-static int update_window(void *ctx, void *val)
-{
-    input_consumed_signal(ctx, val);
-    return 1;
-}
-
-apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
-{
-    apr_status_t status;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_ihash_iter(m->streams, update_window, m);
-        
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                      "h2_session(%ld): windows updated", m->id);
-        status = APR_SUCCESS;
-        leave_mutex(m, acquired);
-    }
-    return status;
-}
-
-static int stream_iter_first(void *ctx, void *val)
-{
-    h2_stream **pstream = ctx;
-    *pstream = val;
-    return 0;
-}
-
-h2_stream *h2_mplx_next_submit(h2_mplx *m)
-{
-    apr_status_t status;
-    h2_stream *stream = NULL;
-    int acquired;
-
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_ihash_iter(m->sready, stream_iter_first, &stream);
-        if (stream) {
-            h2_task *task = h2_ihash_get(m->tasks, stream->id);
-            h2_ihash_remove(m->sready, stream->id);
-            if (task) {
-                task->submitted = 1;
-                if (task->rst_error) {
-                    h2_stream_rst(stream, task->rst_error);
-                }
-                else {
-                    AP_DEBUG_ASSERT(task->response);
-                    h2_stream_set_response(stream, task->response, 
-                                           task->output.beam);
-                }
-            }
-            else {
-                /* We have the stream ready without a task. This happens
-                 * when we fail streams early. A response should already
-                 * be present.  */
-                AP_DEBUG_ASSERT(stream->response || stream->rst_error);
-            }
-        }
-        leave_mutex(m, acquired);
-    }
-    return stream;
-}
-
 static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
 {
     apr_status_t status = APR_SUCCESS;
@@ -798,6 +756,9 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
         if (m->aborted) {
             status = APR_ECONNABORTED;
         }
+        else if (!h2_ihash_empty(m->sready) || !h2_ihash_empty(m->sresume)) {
+            status = APR_SUCCESS;
+        }
         else {
             m->added_output = iowait;
             status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
@@ -854,26 +815,27 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
         if (m->aborted) {
             status = APR_ECONNABORTED;
         }
-        else if (stream->response) {
-            /* already have a respone, schedule for submit */
-            h2_ihash_add(m->sready, stream);
-        }
         else {
-            h2_beam_create(&stream->input, stream->pool, stream->id, 
-                           "input", 0);
             h2_ihash_add(m->streams, stream);
-            
-            if (!m->need_registration) {
-                m->need_registration = h2_iq_empty(m->q);
+            if (stream->response) {
+                /* already have a respone, schedule for submit */
+                h2_ihash_add(m->sready, stream);
             }
-            if (m->workers_busy < m->workers_max) {
-                do_registration = m->need_registration;
+            else {
+                h2_beam_create(&stream->input, stream->pool, stream->id, 
+                               "input", 0);
+                if (!m->need_registration) {
+                    m->need_registration = h2_iq_empty(m->q);
+                }
+                if (m->workers_busy < m->workers_max) {
+                    do_registration = m->need_registration;
+                }
+                h2_iq_add(m->q, stream->id, cmp, ctx);
+                
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                              "h2_mplx(%ld-%d): process, body=%d", 
+                              m->c->id, stream->id, stream->request->body);
             }
-            h2_iq_add(m->q, stream->id, cmp, ctx);
-
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                          "h2_mplx(%ld-%d): process, body=%d", 
-                          m->c->id, stream->id, stream->request->body);
         }
         leave_mutex(m, acquired);
     }
@@ -916,6 +878,7 @@ static h2_task *pop_task(h2_mplx *m)
             if (new_conn) {
                 h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
             }
+            stream->started = 1;
             task->worker_started = 1;
             task->started_at = apr_time_now();
             if (sid > m->max_stream_started) {
@@ -1045,6 +1008,11 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                           "h2_mplx(%s): task_done, stream still open", 
                           task->id);
+            if (h2_stream_is_suspended(stream)) {
+                /* more data will not arrive, resume the stream */
+                h2_ihash_add(m->sresume, stream);
+                have_out_data_for(m, stream->id);
+            }
         }
         else {
             /* stream done, was it placed in hold? */
@@ -1353,4 +1321,129 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
         }
     }
 }
-                                
+
+/*******************************************************************************
+ * mplx master events dispatching
+ ******************************************************************************/
+
+static int update_window(void *ctx, void *val)
+{
+    input_consumed_signal(ctx, val);
+    return 1;
+}
+
+apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, 
+                                            stream_ev_callback *on_resume, 
+                                            stream_ev_callback *on_response, 
+                                            void *on_ctx)
+{
+    apr_status_t status;
+    int acquired;
+    int streams[32];
+    h2_stream *stream;
+    h2_task *task;
+    size_t i, n;
+    
+    AP_DEBUG_ASSERT(m);
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
+                      "h2_mplx(%ld): dispatch events", m->id);
+                      
+        /* update input windows for streams */
+        h2_ihash_iter(m->streams, update_window, m);
+
+        if (on_response && !h2_ihash_empty(m->sready)) {
+            n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams));
+            for (i = 0; i < n; ++i) {
+                stream = h2_ihash_get(m->streams, streams[i]);
+                if (!stream) {
+                    continue;
+                }
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
+                              "h2_mplx(%ld-%d): on_response", 
+                              m->id, stream->id);
+                task = h2_ihash_get(m->tasks, stream->id);
+                if (task) {
+                    task->submitted = 1;
+                    if (task->rst_error) {
+                        h2_stream_rst(stream, task->rst_error);
+                    }
+                    else {
+                        AP_DEBUG_ASSERT(task->response);
+                        h2_stream_set_response(stream, task->response, task->output.beam);
+                    }
+                }
+                else {
+                    /* We have the stream ready without a task. This happens
+                     * when we fail streams early. A response should already
+                     * be present.  */
+                    AP_DEBUG_ASSERT(stream->response || stream->rst_error);
+                }
+                status = on_response(on_ctx, stream->id);
+            }
+        }
+
+        if (on_resume && !h2_ihash_empty(m->sresume)) {
+            n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams));
+            for (i = 0; i < n; ++i) {
+                stream = h2_ihash_get(m->streams, streams[i]);
+                if (!stream) {
+                    continue;
+                }
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
+                              "h2_mplx(%ld-%d): on_resume", 
+                              m->id, stream->id);
+                h2_stream_set_suspended(stream, 0);
+                status = on_resume(on_ctx, stream->id);
+            }
+        }
+        
+        leave_mutex(m, acquired);
+    }
+    return status;
+}
+
+static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
+{
+    h2_mplx *m = ctx;
+    apr_status_t status;
+    h2_stream *stream;
+    int acquired;
+    
+    AP_DEBUG_ASSERT(m);
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        stream = h2_ihash_get(m->streams, beam->id);
+        if (stream && h2_stream_is_suspended(stream)) {
+            h2_ihash_add(m->sresume, stream);
+            h2_beam_on_produced(beam, NULL, NULL);
+            have_out_data_for(m, beam->id);
+        }
+        leave_mutex(m, acquired);
+    }
+}
+
+apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
+{
+    apr_status_t status;
+    h2_stream *stream;
+    h2_task *task;
+    int acquired;
+    
+    AP_DEBUG_ASSERT(m);
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        stream = h2_ihash_get(m->streams, stream_id);
+        if (stream) {
+            h2_stream_set_suspended(stream, 1);
+            task = h2_ihash_get(m->tasks, stream->id);
+            if (stream->started && (!task || task->worker_done)) {
+                h2_ihash_add(m->sresume, stream);
+            }
+            else {
+                /* register callback so that we can resume on new output */
+                h2_beam_on_produced(task->output.beam, output_produced, m);
+            }
+        }
+        leave_mutex(m, acquired);
+    }
+    return status;
+}
index 17cc75f1ee089b03e7df5f5a21db8cf9a35e5fab..e92a5ea3447878a7353b5ecd3f08140e6ae6a968 100644 (file)
@@ -73,10 +73,12 @@ struct h2_mplx {
     unsigned int need_registration : 1;
 
     struct h2_ihash_t *streams;     /* all streams currently processing */
-    struct h2_ihash_t *sready;      /* all streams ready for response */
     struct h2_ihash_t *shold;       /* all streams done with task ongoing */
     struct h2_ihash_t *spurge;      /* all streams done, ready for destroy */
+
     struct h2_iqueue *q;            /* all stream ids that need to be started */
+    struct h2_ihash_t *sready;      /* all streams ready for response */
+    struct h2_ihash_t *sresume;     /* all streams that can be resumed */
     
     struct h2_ihash_t *tasks;       /* all tasks started and not destroyed */
     struct h2_ihash_t *redo_tasks;  /* all tasks that need to be redone */
@@ -214,31 +216,25 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
  */
 void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx);
 
-/*******************************************************************************
- * Input handling of streams.
- ******************************************************************************/
+
+typedef apr_status_t stream_ev_callback(void *ctx, int stream_id);
 
 /**
- * Invoke the consumed callback for all streams that had bytes read since the 
- * last call to this function. If no stream had input data consumed, the 
- * callback is not invoked.
- * The consumed callback may also be invoked at other times whenever
- * the need arises.
+ * Dispatch events for the master connection, such as
+ * - resume: new output data has arrived for a suspended stream
+ * - response: the response for a stream is ready
  */
-apr_status_t h2_mplx_in_update_windows(h2_mplx *m);
+apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, 
+                                            stream_ev_callback *on_resume, 
+                                            stream_ev_callback *on_response, 
+                                            void *ctx);
+
+apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id);
 
 /*******************************************************************************
  * Output handling of streams.
  ******************************************************************************/
 
-/**
- * Get a stream whose response is ready for submit. Will set response and
- * any out data available in stream. 
- * @param m the mplxer to get a response from
- * @param bb the brigade to place any existing repsonse body data into
- */
-struct h2_stream *h2_mplx_next_submit(h2_mplx *m);
-
 /**
  * Opens the output for the given stream with the specified response.
  */
index cdee085fa2915dadc283223ae6466b09027703a3..f2698c48b3af9e4ea3a9b914534b810149244f8b 100644 (file)
@@ -79,6 +79,18 @@ static int is_accepting_streams(h2_session *session);
 static void dispatch_event(h2_session *session, h2_session_event_t ev, 
                              int err, const char *msg);
 
+apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
+{
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                  "h2_stream(%ld-%d): EOS bucket cleanup -> done", 
+                  session->id, stream->id);
+    h2_ihash_remove(session->streams, stream->id);
+    h2_mplx_stream_done(session->mplx, stream);
+    
+    dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
+    return APR_SUCCESS;
+}
+
 typedef struct stream_sel_ctx {
     h2_session *session;
     h2_stream *candidate;
@@ -133,7 +145,6 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
     
     stream = h2_stream_open(stream_id, stream_pool, session, 
                             initiated_on, req);
-    ++session->unanswered_streams;
     nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
     h2_ihash_add(session->streams, stream);
     
@@ -1064,58 +1075,6 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
     return status;
 }
 
-typedef struct {
-    h2_session *session;
-    int resume_count;
-} resume_ctx;
-
-static int resume_on_data(void *ctx, void *val)
-{
-    h2_stream *stream = val;
-    resume_ctx *rctx = (resume_ctx*)ctx;
-    h2_session *session = rctx->session;
-    AP_DEBUG_ASSERT(session);
-    AP_DEBUG_ASSERT(stream);
-    
-    if (h2_stream_is_suspended(stream)) {
-        apr_status_t status;
-        apr_off_t len = -1;
-        int eos;
-        
-        status = h2_stream_out_prepare(stream, &len, &eos);
-        if (status == APR_SUCCESS) {
-            int rv;
-            h2_stream_set_suspended(stream, 0);
-            ++rctx->resume_count;
-            
-            rv = nghttp2_session_resume_data(session->ngh2, stream->id);
-            ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
-                          APLOG_ERR : APLOG_DEBUG, 0, session->c,
-                          APLOGNO(02936) 
-                          "h2_stream(%ld-%d): resuming %s, len=%ld, eos=%d",
-                          session->id, stream->id, 
-                          rv? nghttp2_strerror(rv) : "", (long)len, eos);
-        }
-    }
-    return 1;
-}
-
-static int h2_session_resume_streams_with_data(h2_session *session)
-{
-    AP_DEBUG_ASSERT(session);
-    if (session->open_streams && !session->mplx->aborted) {
-        resume_ctx ctx;
-        ctx.session      = session;
-        ctx.resume_count = 0;
-
-        /* Resume all streams where we have data in the out queue and
-         * which had been suspended before. */
-        h2_ihash_iter(session->streams, resume_on_data, &ctx);
-        return ctx.resume_count;
-    }
-    return 0;
-}
-
 static ssize_t stream_data_cb(nghttp2_session *ng2s,
                               int32_t stream_id,
                               uint8_t *buf,
@@ -1171,7 +1130,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
              * it. Remember at our h2_stream that we need to do this.
              */
             nread = 0;
-            h2_stream_set_suspended(stream, 1);
+            h2_mplx_suspend_stream(session->mplx, stream->id);
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071)
                           "h2_stream(%ld-%d): suspending",
                           session->id, (int)stream_id);
@@ -1214,104 +1173,6 @@ typedef struct {
     size_t offset;
 } nvctx_t;
 
-/**
- * Start submitting the response to a stream request. This is possible
- * once we have all the response headers. The response body will be
- * read by the session using the callback we supply.
- */
-static apr_status_t submit_response(h2_session *session, h2_stream *stream)
-{
-    apr_status_t status = APR_SUCCESS;
-    h2_response *response = h2_stream_get_response(stream);
-    int rv = 0;
-    AP_DEBUG_ASSERT(session);
-    AP_DEBUG_ASSERT(stream);
-    AP_DEBUG_ASSERT(response || stream->rst_error);
-    
-    if (stream->submitted) {
-        rv = NGHTTP2_PROTOCOL_ERROR;
-    }
-    else if (response && response->headers) {
-        nghttp2_data_provider provider, *pprovider = NULL;
-        h2_ngheader *ngh;
-        const h2_priority *prio;
-        
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
-                      "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
-                      session->id, stream->id, response->http_status,
-                      (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
-        
-        if (response->content_length != 0) {
-            memset(&provider, 0, sizeof(provider));
-            provider.source.fd = stream->id;
-            provider.read_callback = stream_data_cb;
-            pprovider = &provider;
-        }
-        
-        /* If this stream is not a pushed one itself,
-         * and HTTP/2 server push is enabled here,
-         * and the response is in the range 200-299 *),
-         * and the remote side has pushing enabled,
-         * -> find and perform any pushes on this stream
-         *    *before* we submit the stream response itself.
-         *    This helps clients avoid opening new streams on Link
-         *    headers that get pushed right afterwards.
-         * 
-         * *) the response code is relevant, as we do not want to 
-         *    make pushes on 401 or 403 codes, neiterh on 301/302
-         *    and friends. And if we see a 304, we do not push either
-         *    as the client, having this resource in its cache, might
-         *    also have the pushed ones as well.
-         */
-        if (stream->request && !stream->request->initiated_on
-            && H2_HTTP_2XX(response->http_status)
-            && h2_session_push_enabled(session)) {
-            
-            h2_stream_submit_pushes(stream);
-        }
-        
-        prio = h2_stream_get_priority(stream);
-        if (prio) {
-            h2_session_set_prio(session, stream, prio);
-            /* no showstopper if that fails for some reason */
-        }
-        
-        ngh = h2_util_ngheader_make_res(stream->pool, response->http_status, 
-                                        response->headers);
-        rv = nghttp2_submit_response(session->ngh2, response->stream_id,
-                                     ngh->nv, ngh->nvlen, pprovider);
-    }
-    else {
-        int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
-        
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
-                      "h2_stream(%ld-%d): RST_STREAM, err=%d",
-                      session->id, stream->id, err);
-
-        rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
-                                       stream->id, err);
-    }
-    
-    stream->submitted = 1;
-    --session->unanswered_streams;
-    if (stream->request && stream->request->initiated_on) {
-        ++session->pushes_submitted;
-    }
-    else {
-        ++session->responses_submitted;
-    }
-    
-    if (nghttp2_is_fatal(rv)) {
-        status = APR_EGENERAL;
-        dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
-                      APLOGNO(02940) "submit_response: %s", 
-                      nghttp2_strerror(rv));
-    }
-    
-    return status;
-}
-
 struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
                                   h2_push *push)
 {
@@ -1465,19 +1326,6 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
     return status;
 }
 
-apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
-{
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                  "h2_stream(%ld-%d): EOS bucket cleanup -> done", 
-                  session->id, stream->id);
-    h2_ihash_remove(session->streams, stream->id);
-    --session->unanswered_streams;
-    h2_mplx_stream_done(session->mplx, stream);
-    
-    dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
-    return APR_SUCCESS;
-}
-
 int h2_session_push_enabled(h2_session *session)
 {
     /* iff we can and they can and want */
@@ -1504,6 +1352,7 @@ static apr_status_t h2_session_send(h2_session *session)
     if (socket) {
         apr_socket_timeout_set(socket, saved_timeout);
     }
+    session->have_written = 1;
     if (rv != 0) {
         if (nghttp2_is_fatal(rv)) {
             dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
@@ -1517,6 +1366,148 @@ static apr_status_t h2_session_send(h2_session *session)
     return APR_SUCCESS;
 }
 
+/**
+ * A stream was resumed as new output data arrived.
+ */
+static apr_status_t on_stream_resume(void *ctx, int stream_id)
+{
+    h2_session *session = ctx;
+    h2_stream *stream = get_stream(session, stream_id);
+    apr_status_t status = APR_SUCCESS;
+    
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                  "h2_stream(%ld-%d): on_resume", session->id, stream_id);
+    if (stream) {
+        int rv = nghttp2_session_resume_data(session->ngh2, stream_id);
+        session->have_written = 1;
+        ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
+                      APLOG_ERR : APLOG_DEBUG, 0, session->c,
+                      APLOGNO(02936) 
+                      "h2_stream(%ld-%d): resuming %s",
+                      session->id, stream->id, rv? nghttp2_strerror(rv) : "");
+    }
+    return status;
+}
+
+/**
+ * A response for the stream is ready.
+ */
+static apr_status_t on_stream_response(void *ctx, int stream_id)
+{
+    h2_session *session = ctx;
+    h2_stream *stream = get_stream(session, stream_id);
+    apr_status_t status = APR_SUCCESS;
+    h2_response *response;
+    int rv = 0;
+
+    AP_DEBUG_ASSERT(session);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                  "h2_stream(%ld-%d): on_response", session->id, stream_id);
+    if (!stream) {
+        return APR_NOTFOUND;
+    }
+    
+    response = h2_stream_get_response(stream);
+    AP_DEBUG_ASSERT(response || stream->rst_error);
+    
+    if (stream->submitted) {
+        rv = NGHTTP2_PROTOCOL_ERROR;
+    }
+    else if (response && response->headers) {
+        nghttp2_data_provider provider, *pprovider = NULL;
+        h2_ngheader *ngh;
+        const h2_priority *prio;
+        
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
+                      "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
+                      session->id, stream->id, response->http_status,
+                      (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
+        
+        if (response->content_length != 0) {
+            memset(&provider, 0, sizeof(provider));
+            provider.source.fd = stream->id;
+            provider.read_callback = stream_data_cb;
+            pprovider = &provider;
+        }
+        
+        /* If this stream is not a pushed one itself,
+         * and HTTP/2 server push is enabled here,
+         * and the response is in the range 200-299 *),
+         * and the remote side has pushing enabled,
+         * -> find and perform any pushes on this stream
+         *    *before* we submit the stream response itself.
+         *    This helps clients avoid opening new streams on Link
+         *    headers that get pushed right afterwards.
+         * 
+         * *) the response code is relevant, as we do not want to 
+         *    make pushes on 401 or 403 codes, neiterh on 301/302
+         *    and friends. And if we see a 304, we do not push either
+         *    as the client, having this resource in its cache, might
+         *    also have the pushed ones as well.
+         */
+        if (stream->request && !stream->request->initiated_on
+            && H2_HTTP_2XX(response->http_status)
+            && h2_session_push_enabled(session)) {
+            
+            h2_stream_submit_pushes(stream);
+        }
+        
+        prio = h2_stream_get_priority(stream);
+        if (prio) {
+            h2_session_set_prio(session, stream, prio);
+            /* no showstopper if that fails for some reason */
+        }
+        
+        ngh = h2_util_ngheader_make_res(stream->pool, response->http_status, 
+                                        response->headers);
+        rv = nghttp2_submit_response(session->ngh2, response->stream_id,
+                                     ngh->nv, ngh->nvlen, pprovider);
+    }
+    else {
+        int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
+        
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
+                      "h2_stream(%ld-%d): RST_STREAM, err=%d",
+                      session->id, stream->id, err);
+
+        rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
+                                       stream->id, err);
+    }
+    
+    stream->submitted = 1;
+    session->have_written = 1;
+    
+    if (stream->request && stream->request->initiated_on) {
+        ++session->pushes_submitted;
+    }
+    else {
+        ++session->responses_submitted;
+    }
+    
+    if (nghttp2_is_fatal(rv)) {
+        status = APR_EGENERAL;
+        dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
+                      APLOGNO(02940) "submit_response: %s", 
+                      nghttp2_strerror(rv));
+    }
+    
+    ++session->unsent_submits;
+    
+    /* Unsent push promises are written immediately, as nghttp2
+     * 1.5.0 realizes internal stream data structures only on 
+     * send and we might need them for other submits. 
+     * Also, to conserve memory, we send at least every 10 submits
+     * so that nghttp2 does not buffer all outbound items too 
+     * long.
+     */
+    if (status == APR_SUCCESS 
+        && (session->unsent_promises || session->unsent_submits > 10)) {
+        status = h2_session_send(session);
+    }
+    return status;
+}
+
 static apr_status_t h2_session_receive(void *ctx, const char *data, 
                                        apr_size_t len, apr_size_t *readlen)
 {
@@ -1644,36 +1635,6 @@ static int has_suspended_streams(h2_session *session)
     return has_suspended;
 }
 
-static apr_status_t h2_session_submit(h2_session *session)
-{
-    apr_status_t status = APR_EAGAIN;
-    h2_stream *stream;
-    
-    if (has_unsubmitted_streams(session)) {
-        /* If we have responses ready, submit them now. */
-        while ((stream = h2_mplx_next_submit(session->mplx))) {
-            status = submit_response(session, stream);
-            ++session->unsent_submits;
-            
-            /* Unsent push promises are written immediately, as nghttp2
-             * 1.5.0 realizes internal stream data structures only on 
-             * send and we might need them for other submits. 
-             * Also, to conserve memory, we send at least every 10 submits
-             * so that nghttp2 does not buffer all outbound items too 
-             * long.
-             */
-            if (status == APR_SUCCESS 
-                && (session->unsent_promises || session->unsent_submits > 10)) {
-                status = h2_session_send(session);
-                if (status != APR_SUCCESS) {
-                    break;
-                }
-            }
-        }
-    }
-    return status;
-}
-
 static const char *StateNames[] = {
     "INIT",      /* H2_SESSION_ST_INIT */
     "DONE",      /* H2_SESSION_ST_DONE */
@@ -1855,51 +1816,52 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
         case H2_SESSION_ST_BUSY:
         case H2_SESSION_ST_LOCAL_SHUTDOWN:
         case H2_SESSION_ST_REMOTE_SHUTDOWN:
-            /* nothing for input and output to do. If we remain
-             * in this state, we go into a tight loop and suck up
-             * CPU cycles. Ideally, we'd like to do a blocking read, but that
-             * is not possible if we have scheduled tasks and wait
-             * for them to produce something. */
+            /* Nothing to READ, nothing to WRITE on the master connection.
+             * Possible causes:
+             * - we wait for the client to send us sth
+             * - we wait for started tasks to produce output
+             * - we have finished all streams and the client has sent GO_AWAY
+             */
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                           "h2_session(%ld): NO_IO event, %d streams open", 
                           session->id, session->open_streams);
-            if (!session->open_streams) {
-                if (!is_accepting_streams(session)) {
-                    /* We are no longer accepting new streams and have
-                     * finished processing existing ones. Time to leave. */
-                    h2_session_shutdown(session, arg, msg, 0);
-                    transit(session, "no io", H2_SESSION_ST_DONE);
+            if (session->open_streams > 0) {
+                if (has_unsubmitted_streams(session) 
+                    || has_suspended_streams(session)) {
+                    /* waiting for at least one stream to produce data */
+                    transit(session, "no io", H2_SESSION_ST_WAIT);
                 }
                 else {
-                    apr_time_t now = apr_time_now();
-                    /* When we have no streams, no task event are possible,
-                     * switch to blocking reads */
-                    transit(session, "no io (keepalive)", H2_SESSION_ST_IDLE);
-                    session->idle_until = (session->remote.emitted_count? 
-                                           session->s->keep_alive_timeout : 
-                                           session->s->timeout) + now;
-                    session->keep_sync_until = now + apr_time_from_sec(1);
+                    /* we have streams open, and all are submitted and none
+                     * is suspended. The only thing keeping us from WRITEing
+                     * more must be the flow control.
+                     * This means we only wait for WINDOW_UPDATE from the 
+                     * client and can block on READ. */
+                    transit(session, "no io (flow wait)", H2_SESSION_ST_IDLE);
+                    session->idle_until = apr_time_now() + session->s->timeout;
+                    session->keep_sync_until = session->idle_until;
+                    /* Make sure we have flushed all previously written output
+                     * so that the client will react. */
+                    if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+                        return;
+                    }
                 }
             }
-            else if (!has_unsubmitted_streams(session)
-                     && !has_suspended_streams(session)) {
-                transit(session, "no io (flow wait)", H2_SESSION_ST_IDLE);
-                session->idle_until = apr_time_now() + session->s->timeout;
-                session->keep_sync_until = session->idle_until;
-                /* none of our streams is waiting for a response or
-                 * new output data from task processing, 
-                 * switch to blocking reads. We are probably waiting on
-                 * window updates. */
-                if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
-                    dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
-                    return;
-                }
+            else if (is_accepting_streams(session)) {
+                /* When we have no streams, but accept new, switch to idle */
+                apr_time_t now = apr_time_now();
+                transit(session, "no io (keepalive)", H2_SESSION_ST_IDLE);
+                session->idle_until = (session->remote.emitted_count? 
+                                       session->s->keep_alive_timeout : 
+                                       session->s->timeout) + now;
+                session->keep_sync_until = now + apr_time_from_sec(1);
             }
             else {
-                /* Unable to do blocking reads, as we wait on events from
-                 * task processing in other threads. Do a busy wait with
-                 * backoff timer. */
-                transit(session, "no io", H2_SESSION_ST_WAIT);
+                /* We are no longer accepting new streams and there are
+                 * none left. Time to leave. */
+                h2_session_shutdown(session, arg, msg, 0);
+                transit(session, "no io", H2_SESSION_ST_DONE);
             }
             break;
         default:
@@ -1989,8 +1951,6 @@ static void h2_session_ev_stream_open(h2_session *session, int arg, const char *
 static void h2_session_ev_stream_done(h2_session *session, int arg, const char *msg)
 {
     --session->open_streams;
-    if (session->open_streams <= 0) {
-    }
     switch (session->state) {
         case H2_SESSION_ST_IDLE:
             if (session->open_streams == 0) {
@@ -2068,17 +2028,21 @@ apr_status_t h2_session_process(h2_session *session, int async)
 {
     apr_status_t status = APR_SUCCESS;
     conn_rec *c = session->c;
-    int rv, have_written, have_read, mpm_state;
+    int rv, mpm_state, trace = APLOGctrace3(c);
 
-    ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
-                  "h2_session(%ld): process start, async=%d", session->id, async);
+    if (trace) {
+        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                      "h2_session(%ld): process start, async=%d", 
+                      session->id, async);
+    }
                   
     if (c->cs) {
         c->cs->state = CONN_STATE_WRITE_COMPLETION;
     }
     
     while (1) {
-        have_read = have_written = 0;
+        trace = APLOGctrace3(c);
+        session->have_read = session->have_written = 0;
 
         if (!ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state)) {
             if (mpm_state == AP_MPMQ_STOPPING) {
@@ -2114,10 +2078,12 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 /* make certain, we send everything before we idle */
                 if (!session->keep_sync_until && async && !session->open_streams
                     && !session->r && session->remote.emitted_count) {
-                    ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                                  "h2_session(%ld): async idle, nonblock read, "
-                                  "%d streams open", session->id, 
-                                  session->open_streams);
+                    if (trace) {
+                        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                                      "h2_session(%ld): async idle, nonblock read, "
+                                      "%d streams open", session->id, 
+                                      session->open_streams);
+                    }
                     /* We do not return to the async mpm immediately, since under
                      * load, mpms show the tendency to throw keep_alive connections
                      * away very rapidly.
@@ -2130,7 +2096,7 @@ apr_status_t h2_session_process(h2_session *session, int async)
                     status = h2_session_read(session, 0);
                     
                     if (status == APR_SUCCESS) {
-                        have_read = 1;
+                        session->have_read = 1;
                         dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                     }
                     else if (APR_STATUS_IS_EAGAIN(status) || APR_STATUS_IS_TIMEUP(status)) {
@@ -2150,10 +2116,12 @@ apr_status_t h2_session_process(h2_session *session, int async)
                     }
                 }
                 else {
-                    ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                                  "h2_session(%ld): sync idle, stutter 1-sec, "
-                                  "%d streams open", session->id,
-                                  session->open_streams);
+                    if (trace) {
+                        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                                      "h2_session(%ld): sync idle, stutter 1-sec, "
+                                      "%d streams open", session->id,
+                                      session->open_streams);
+                    }
                     /* We wait in smaller increments, using a 1 second timeout.
                      * That gives us the chance to check for MPMQ_STOPPING often. 
                      */
@@ -2165,7 +2133,7 @@ apr_status_t h2_session_process(h2_session *session, int async)
                     h2_filter_cin_timeout_set(session->cin, apr_time_from_sec(1));
                     status = h2_session_read(session, 1);
                     if (status == APR_SUCCESS) {
-                        have_read = 1;
+                        session->have_read = 1;
                         dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                     }
                     else if (status == APR_EAGAIN) {
@@ -2179,12 +2147,14 @@ apr_status_t h2_session_process(h2_session *session, int async)
                             session->keep_sync_until = 0;
                         }
                         if (now > session->idle_until) {
-                            ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                                          "h2_session(%ld): keepalive timeout",
-                                          session->id);
+                            if (trace) {
+                                ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                                              "h2_session(%ld): keepalive timeout",
+                                              session->id);
+                            }
                             dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
                         }
-                        else {
+                        else if (trace) {                        
                             ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
                                           "h2_session(%ld): keepalive, %f sec left",
                                           session->id, (session->idle_until - now) / 1000000.0f);
@@ -2192,9 +2162,11 @@ apr_status_t h2_session_process(h2_session *session, int async)
                         /* continue reading handling */
                     }
                     else {
-                        ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
-                                      "h2_session(%ld): idle(1 sec timeout) "
-                                      "read failed", session->id);
+                        if (trace) {
+                            ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                                          "h2_session(%ld): idle(1 sec timeout) "
+                                          "read failed", session->id);
+                        }
                         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "error");
                     }
                 }
@@ -2209,7 +2181,7 @@ apr_status_t h2_session_process(h2_session *session, int async)
                     h2_filter_cin_timeout_set(session->cin, session->s->timeout);
                     status = h2_session_read(session, 0);
                     if (status == APR_SUCCESS) {
-                        have_read = 1;
+                        session->have_read = 1;
                         dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                     }
                     else if (status == APR_EAGAIN) {
@@ -2224,43 +2196,32 @@ apr_status_t h2_session_process(h2_session *session, int async)
                     }
                 }
                 
-                if (session->open_streams) {
-                    /* resume any streams with output data */
-                    h2_session_resume_streams_with_data(session);
-                    /* Submit any responses/push_promises that are ready */
-                    status = h2_session_submit(session);
-                    if (status == APR_SUCCESS) {
-                        have_written = 1;
-                    }
-                    else if (status != APR_EAGAIN) {
-                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
-                                         H2_ERR_INTERNAL_ERROR, "submit error");
-                        break;
-                    }
-                    /* send out window updates for our inputs */
-                    status = h2_mplx_in_update_windows(session->mplx);
-                    if (status != APR_SUCCESS) {
-                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
-                                       H2_ERR_INTERNAL_ERROR, 
-                                       "window update error");
-                        break;
-                    }
+                /* trigger window updates, stream resumes and submits */
+                status = h2_mplx_dispatch_master_events(session->mplx, 
+                                                        on_stream_resume,
+                                                        on_stream_response, 
+                                                        session);
+                if (status != APR_SUCCESS) {
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c,
+                                  "h2_session(%ld): dispatch error", 
+                                  session->id);
+                    dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
+                                   H2_ERR_INTERNAL_ERROR, 
+                                   "dispatch error");
+                    break;
                 }
                 
                 if (nghttp2_session_want_write(session->ngh2)) {
                     ap_update_child_status(session->c->sbh, SERVER_BUSY_WRITE, NULL);
                     status = h2_session_send(session);
-                    if (status == APR_SUCCESS) {
-                        have_written = 1;
-                    }
-                    else {
+                    if (status != APR_SUCCESS) {
                         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
-                                         H2_ERR_INTERNAL_ERROR, "writing");
+                                       H2_ERR_INTERNAL_ERROR, "writing");
                         break;
                     }
                 }
                 
-                if (have_read || have_written) {
+                if (session->have_read || session->have_written) {
                     if (session->wait_us) {
                         session->wait_us = 0;
                     }
@@ -2281,8 +2242,10 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 }
                 else if ((apr_time_now() - session->start_wait) >= session->s->timeout) {
                     /* waited long enough */
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_TIMEUP, c,
-                                  "h2_session: wait for data");
+                    if (trace) {
+                        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, APR_TIMEUP, c,
+                                      "h2_session: wait for data");
+                    }
                     dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
                     break;
                 }
@@ -2291,8 +2254,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
                     session->wait_us = H2MIN(session->wait_us*2, MAX_WAIT_MICROS);
                 }
 
-                if (APLOGctrace1(c)) {
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+                if (trace) {
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, c,
                                   "h2_session: wait for data, %ld micros", 
                                   (long)session->wait_us);
                 }
@@ -2343,9 +2306,11 @@ apr_status_t h2_session_process(h2_session *session, int async)
     }
     
 out:
-    ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
-                  "h2_session(%ld): [%s] process returns", 
-                  session->id, state_name(session->state));
+    if (trace) {
+        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                      "h2_session(%ld): [%s] process returns", 
+                      session->id, state_name(session->state));
+    }
     
     if ((session->state != H2_SESSION_ST_DONE)
         && (APR_STATUS_IS_EOF(status)
index 06142f5ae7fbf3046074eb98e5414daa5abb0dfb..c5c5b7aecf4b5bcb1cab204bae921a38362e2005 100644 (file)
@@ -98,12 +98,13 @@ typedef struct h2_session {
     unsigned int reprioritize  : 1; /* scheduled streams priority changed */
     unsigned int eoc_written   : 1; /* h2 eoc bucket written */
     unsigned int flush         : 1; /* flushing output necessary */
+    unsigned int have_read     : 1; /* session has read client data */
+    unsigned int have_written  : 1; /* session did write data to client */
     apr_interval_time_t  wait_us;   /* timout during BUSY_WAIT state, micro secs */
     
     struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
     
     int open_streams;               /* number of streams open */
-    int unanswered_streams;         /* number of streams waiting for response */
     int unsent_submits;             /* number of submitted, but not yet written responses. */
     int unsent_promises;            /* number of submitted, but not yet written push promised */
                                          
index dc29d7060c0922027da338805a3eacfe6e30feca..acd5072c639b12b56baadc14fdc10fdda7254237 100644 (file)
@@ -594,6 +594,11 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream,
             APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
             status = APR_SUCCESS;
         }
+        else if (status == APR_EAGAIN) {
+            /* did not receive more, it's ok */
+            status = APR_SUCCESS;
+        }
+        *plen = requested;
         h2_util_bb_avail(stream->buffer, plen, peos);
     }
     H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_post");
index 84f9717140926f22b5bf63a40c1d26bbae049163..dd2c193358ba8b9c6ce37193f9e7a90c1d01ea8e 100644 (file)
@@ -61,6 +61,7 @@ struct h2_stream {
     unsigned int aborted   : 1; /* was aborted */
     unsigned int suspended : 1; /* DATA sending has been suspended */
     unsigned int scheduled : 1; /* stream has been scheduled */
+    unsigned int started   : 1; /* stream has started processing */
     unsigned int submitted : 1; /* response HEADER has been sent */
     
     apr_off_t input_remaining;  /* remaining bytes on input as advertised via content-length */
index 36073f772cba4ab18556619665d83e0c9c51a7db..f505e85927f05429f052efff50882c0e305e3211 100644 (file)
@@ -59,13 +59,39 @@ static int input_ser_header(void *ctx, const char *name, const char *value)
     return 1;
 }
 
-static apr_status_t input_append_eos(h2_task *task, request_rec *r)
+static void make_chunk(h2_task *task, apr_bucket_brigade *bb, 
+                       apr_bucket *first, apr_uint64_t chunk_len, 
+                       apr_bucket *tail)
+{
+    /* Surround the buckets [first, tail[ with new buckets carrying the
+     * HTTP/1.1 chunked encoding format. If tail is NULL, the chunk extends
+     * to the end of the brigade. */
+    char buffer[128];
+    apr_bucket *c;
+    int len;
+    
+    len = apr_snprintf(buffer, H2_ALEN(buffer), 
+                       "%"APR_UINT64_T_HEX_FMT"\r\n", chunk_len);
+    c = apr_bucket_heap_create(buffer, len, NULL, bb->bucket_alloc);
+    APR_BUCKET_INSERT_BEFORE(first, c);
+    c = apr_bucket_heap_create("\r\n", 2, NULL, bb->bucket_alloc);
+    if (tail) {
+        APR_BUCKET_INSERT_BEFORE(tail, c);
+    }
+    else {
+        APR_BRIGADE_INSERT_TAIL(bb, c);
+    }
+}
+
+static apr_status_t input_handle_eos(h2_task *task, request_rec *r, 
+                                     apr_bucket *b)
 {
     apr_status_t status = APR_SUCCESS;
     apr_bucket_brigade *bb = task->input.bb;
     apr_table_t *t = task->request->trailers;
 
     if (task->input.chunked) {
+        task->input.tmp = apr_brigade_split_ex(bb, b, task->input.tmp);
         if (t && !apr_is_empty_table(t)) {
             status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
             apr_table_do(input_ser_header, task, t, NULL);
@@ -74,13 +100,38 @@ static apr_status_t input_append_eos(h2_task *task, request_rec *r)
         else {
             status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
         }
+        APR_BRIGADE_CONCAT(bb, task->input.tmp);
     }
     else if (r && t && !apr_is_empty_table(t)){
         /* trailers passed in directly. */
         apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET);
     }
     task->input.eos_written = 1;
+    return status;
+}
+
+static apr_status_t input_append_eos(h2_task *task, request_rec *r)
+{
+    apr_status_t status = APR_SUCCESS;
+    apr_bucket_brigade *bb = task->input.bb;
+    apr_table_t *t = task->request->trailers;
+
+    if (task->input.chunked) {
+        if (t && !apr_is_empty_table(t)) {
+            status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
+            apr_table_do(input_ser_header, task, t, NULL);
+            status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
+        }
+        else {
+            status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
+        }
+    }
+    else if (r && t && !apr_is_empty_table(t)){
+        /* trailers passed in directly. */
+        apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET);
+    }
     APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(bb->bucket_alloc));
+    task->input.eos_written = 1;
     return status;
 }
 
@@ -89,7 +140,7 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f,
                                apr_read_type_e block, apr_off_t readbytes)
 {
     apr_status_t status = APR_SUCCESS;
-    apr_bucket *b, *next;
+    apr_bucket *b, *next, *first_data;
     apr_off_t bblen = 0;
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
@@ -104,31 +155,28 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f,
         return APR_ECONNABORTED;
     }
     
-    if (task->input.bb) {
-        /* Cleanup brigades from those nasty 0 length non-meta buckets
-         * that apr_brigade_split_line() sometimes produces. */
-        for (b = APR_BRIGADE_FIRST(task->input.bb); 
-             b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
-            next = APR_BUCKET_NEXT(b);
-            if (b->length == 0 && !APR_BUCKET_IS_METADATA(b)) {
-                apr_bucket_delete(b);
-            } 
+    if (!task->input.bb) {
+        if (!task->input.eos_written) {
+            input_append_eos(task, f->r);
         }
-        apr_brigade_length(task->input.bb, 0, &bblen);
+        return APR_EOF;
     }
     
-    if (bblen == 0) {
-        if (task->input.eos_written) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_EOF, f->c,
-                          "h2_task(%s): read no data", task->id); 
-            return APR_EOF;
-        }
-        else if (task->input.eos) {
-            input_append_eos(task, f->r);
-        }
+    /* Cleanup brigades from those nasty 0 length non-meta buckets
+     * that apr_brigade_split_line() sometimes produces. */
+    for (b = APR_BRIGADE_FIRST(task->input.bb);
+         b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
+        next = APR_BUCKET_NEXT(b);
+        if (b->length == 0 && !APR_BUCKET_IS_METADATA(b)) {
+            apr_bucket_delete(b);
+        } 
     }
     
     while (APR_BRIGADE_EMPTY(task->input.bb)) {
+        if (task->input.eos_written) {
+            return APR_EOF;
+        }
+        
         /* Get more input data for our request. */
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
                       "h2_task(%s): get more data from mplx, block=%d, "
@@ -161,34 +209,52 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f,
             return status;
         }
         
-        apr_brigade_length(task->input.bb, 0, &bblen);
-        if (bblen > 0 && task->input.chunked) {
-            /* need to add chunks since request processing expects it */
-            char buffer[128];
-            apr_bucket *b;
-            int len;
-            
-            len = apr_snprintf(buffer, H2_ALEN(buffer), "%lx\r\n", 
-                               (unsigned long)bblen);
-            b = apr_bucket_heap_create(buffer, len, NULL, 
-                                       task->input.bb->bucket_alloc);
-            APR_BRIGADE_INSERT_HEAD(task->input.bb, b);
-            status = apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
-        }
-        
-        if (h2_util_has_eos(task->input.bb, -1)) {
-            task->input.eos = 1;
-        }
-        
-        if (task->input.eos && !task->input.eos_written) {
-            input_append_eos(task, f->r);
+        /* Inspect the buckets received, detect EOS and apply
+         * chunked encoding if necessary */
+        h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, 
+                       "input.beam recv raw", task->input.bb);
+        first_data = NULL;
+        bblen = 0;
+        for (b = APR_BRIGADE_FIRST(task->input.bb); 
+             b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
+            next = APR_BUCKET_NEXT(b);
+            if (APR_BUCKET_IS_METADATA(b)) {
+                if (first_data && task->input.chunked) {
+                    make_chunk(task, task->input.bb, first_data, bblen, b);
+                    first_data = NULL;
+                    bblen = 0;
+                }
+                if (APR_BUCKET_IS_EOS(b)) {
+                    task->input.eos = 1;
+                    input_handle_eos(task, f->r, b);
+                    h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, 
+                                   "input.bb after handle eos", 
+                                   task->input.bb);
+                }
+            }
+            else if (b->length == 0) {
+                apr_bucket_delete(b);
+            } 
+            else {
+                if (!first_data) {
+                    first_data = b;
+                }
+                bblen += b->length;
+            }    
         }
+        if (first_data && task->input.chunked) {
+            make_chunk(task, task->input.bb, first_data, bblen, NULL);
+        }            
         
         if (h2_task_logio_add_bytes_in) {
             h2_task_logio_add_bytes_in(f->c, bblen);
         }
     }
     
+    if (!task->input.eos_written && task->input.eos) {
+        input_append_eos(task, f->r);
+    }
+
     h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, 
                    "task_input.bb", task->input.bb);
            
index c9280388332404276cd42f92f04b8ff1a9d967e3..dda8a562bb412610debf4f35e0f6488866a33b00 100644 (file)
@@ -61,6 +61,7 @@ struct h2_task {
     struct {
         struct h2_bucket_beam *beam;
         apr_bucket_brigade *bb;
+        apr_bucket_brigade *tmp;
         apr_read_type_e block;
         unsigned int chunked : 1;
         unsigned int eos : 1;
index 4cfa1649e4018ac442c96df6b48285d222dcd435..f8575fa7e10a4b31c5ac7fc295a6ad8adc7a7f2f 100644 (file)
@@ -325,11 +325,84 @@ void h2_ihash_remove(h2_ihash_t *ih, int id)
     apr_hash_set(ih->hash, &id, sizeof(id), NULL);
 }
 
+void h2_ihash_remove_val(h2_ihash_t *ih, void *val)
+{
+    int id = *((int*)((char *)val + ih->ioff));
+    apr_hash_set(ih->hash, &id, sizeof(id), NULL);
+}
+
+
 void h2_ihash_clear(h2_ihash_t *ih)
 {
     apr_hash_clear(ih->hash);
 }
 
+typedef struct {
+    h2_ihash_t *ih;
+    void **buffer;
+    size_t max;
+    size_t len;
+} collect_ctx;
+
+static int collect_iter(void *x, void *val)
+{
+    collect_ctx *ctx = x;
+    if (ctx->len < ctx->max) {
+        ctx->buffer[ctx->len++] = val;
+        return 1;
+    }
+    return 0;
+}
+
+size_t h2_ihash_shift(h2_ihash_t *ih, void **buffer, size_t max)
+{
+    collect_ctx ctx;
+    size_t i;
+    
+    ctx.ih = ih;
+    ctx.buffer = buffer;
+    ctx.max = max;
+    ctx.len = 0;
+    h2_ihash_iter(ih, collect_iter, &ctx);
+    for (i = 0; i < ctx.len; ++i) {
+        h2_ihash_remove_val(ih, buffer[i]);
+    }
+    return ctx.len;
+}
+
+typedef struct {
+    h2_ihash_t *ih;
+    int *buffer;
+    size_t max;
+    size_t len;
+} icollect_ctx;
+
+static int icollect_iter(void *x, void *val)
+{
+    icollect_ctx *ctx = x;
+    if (ctx->len < ctx->max) {
+        ctx->buffer[ctx->len++] = *((int*)((char *)val + ctx->ih->ioff));
+        return 1;
+    }
+    return 0;
+}
+
+size_t h2_ihash_ishift(h2_ihash_t *ih, int *buffer, size_t max)
+{
+    icollect_ctx ctx;
+    size_t i;
+    
+    ctx.ih = ih;
+    ctx.buffer = buffer;
+    ctx.max = max;
+    ctx.len = 0;
+    h2_ihash_iter(ih, icollect_iter, &ctx);
+    for (i = 0; i < ctx.len; ++i) {
+        h2_ihash_remove(ih, buffer[i]);
+    }
+    return ctx.len;
+}
+
 /*******************************************************************************
  * ilist - sorted list for structs with int identifier
  ******************************************************************************/
@@ -682,11 +755,6 @@ static apr_status_t last_not_included(apr_bucket_brigade *bb,
                 /* included */
             }
             else {
-                if (maxlen == 0) {
-                    *pend = b;
-                    return status;
-                }
-                
                 if (b->length == ((apr_size_t)-1)) {
                     const char *ign;
                     apr_size_t ilen;
@@ -696,6 +764,11 @@ static apr_status_t last_not_included(apr_bucket_brigade *bb,
                     }
                 }
                 
+                if (maxlen == 0 && b->length > 0) {
+                    *pend = b;
+                    return status;
+                }
+                
                 if (same_alloc && APR_BUCKET_IS_FILE(b)) {
                     /* we like it move it, always */
                 }
@@ -832,20 +905,6 @@ int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len)
     return 0;
 }
 
-int h2_util_bb_has_data(apr_bucket_brigade *bb)
-{
-    apr_bucket *b;
-    for (b = APR_BRIGADE_FIRST(bb);
-         b != APR_BRIGADE_SENTINEL(bb);
-         b = APR_BUCKET_NEXT(b))
-    {
-        if (!AP_BUCKET_IS_EOR(b)) {
-            return 1;
-        }
-    }
-    return 0;
-}
-
 apr_status_t h2_util_bb_avail(apr_bucket_brigade *bb, 
                               apr_off_t *plen, int *peos)
 {
index 56614766c387f839a8b062a386c0e3b70fdf9d0d..99724d7a5d75ce25213456fc12ad64b699b0b0e4 100644 (file)
@@ -64,8 +64,12 @@ int h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx);
 
 void h2_ihash_add(h2_ihash_t *ih, void *val);
 void h2_ihash_remove(h2_ihash_t *ih, int id);
+void h2_ihash_remove_val(h2_ihash_t *ih, void *val);
 void h2_ihash_clear(h2_ihash_t *ih);
 
+size_t h2_ihash_shift(h2_ihash_t *ih, void **buffer, size_t max);
+size_t h2_ihash_ishift(h2_ihash_t *ih, int *buffer, size_t max);
+
 /*******************************************************************************
  * ilist - sorted list for structs with int identifier as first member
  ******************************************************************************/
@@ -321,7 +325,6 @@ apr_status_t h2_brigade_copy_length(apr_bucket_brigade *dest,
  * @return != 0 iff brigade holds FLUSH or EOS bucket (or both)
  */
 int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len);
-int h2_util_bb_has_data(apr_bucket_brigade *bb);
 
 /**
  * Check how many bytes of the desired amount are available and if the
index 971b0766d42b968d97e933ad229c472eaa885057..020827ccc330b1dfed3c93acf533fe7034d64575 100644 (file)
@@ -26,7 +26,7 @@
  * @macro
  * Version number of the http2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.5.5"
+#define MOD_HTTP2_VERSION "1.5.6"
 
 /**
  * @macro
@@ -34,7 +34,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x010505
+#define MOD_HTTP2_VERSION_NUM 0x010506
 
 
 #endif /* mod_h2_h2_version_h */
diff --git a/modules/http2/mod_proxy_http2.dsp b/modules/http2/mod_proxy_http2.dsp
new file mode 100644 (file)
index 0000000..31b91b5
--- /dev/null
@@ -0,0 +1,119 @@
+# Microsoft Developer Studio Project File - Name="mod_proxy_http2" - Package Owner=<4>
+# Microsoft Developer Studio Generated Build File, Format Version 6.00
+# ** DO NOT EDIT **
+
+# TARGTYPE "Win32 (x86) Dynamic-Link Library" 0x0102
+
+CFG=mod_proxy_http2 - Win32 Release
+!MESSAGE This is not a valid makefile. To build this project using NMAKE,
+!MESSAGE use the Export Makefile command and run
+!MESSAGE 
+!MESSAGE NMAKE /f "mod_proxy_http2.mak".
+!MESSAGE 
+!MESSAGE You can specify a configuration when running NMAKE
+!MESSAGE by defining the macro CFG on the command line. For example:
+!MESSAGE 
+!MESSAGE NMAKE /f "mod_proxy_http2.mak" CFG="mod_proxy_http2 - Win32 Release"
+!MESSAGE 
+!MESSAGE Possible choices for configuration are:
+!MESSAGE 
+!MESSAGE "mod_proxy_http2 - Win32 Release" (based on "Win32 (x86) Dynamic-Link Library")
+!MESSAGE "mod_proxy_http2 - Win32 Debug" (based on "Win32 (x86) Dynamic-Link Library")
+!MESSAGE 
+
+# Begin Project
+# PROP AllowPerConfigDependencies 0
+# PROP Scc_ProjName ""
+# PROP Scc_LocalPath ""
+CPP=cl.exe
+MTL=midl.exe
+RSC=rc.exe
+
+!IF  "$(CFG)" == "mod_proxy_http2 - Win32 Release"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 0
+# PROP BASE Output_Dir "Release"
+# PROP BASE Intermediate_Dir "Release"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 0
+# PROP Output_Dir "Release"
+# PROP Intermediate_Dir "Release"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /MD /W3 /O2 /D "WIN32" /D "NDEBUG" /D "_WINDOWS" /D "ssize_t=long" /FD /c
+# ADD CPP /nologo /MD /W3 /O2 /Oy- /Zi /I "../ssl" /I "../../include" /I "../../srclib/apr/include" /I "../../srclib/apr-util/include" /I "../../srclib/nghttp2/lib/includes" /D "NDEBUG" /D "WIN32" /D "_WINDOWS" /D "ssize_t=long" /Fd"Release\mod_proxy_http2_src" /FD /c
+# ADD BASE MTL /nologo /D "NDEBUG" /win32
+# ADD MTL /nologo /D "NDEBUG" /mktyplib203 /win32
+# ADD BASE RSC /l 0x409 /d "NDEBUG"
+# ADD RSC /l 0x409 /fo"Release/mod_proxy_http2.res" /i "../../include" /i "../../srclib/apr/include" /d "NDEBUG" /d BIN_NAME="mod_proxy_http2.so" /d LONG_NAME="http2_module for Apache"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib nghttp2.lib /nologo /subsystem:windows /dll /libpath:"..\..\srclib\nghttp2\lib\MSVC_obj" /out:".\Release\mod_proxy_http2.so" /base:@..\..\os\win32\BaseAddr.ref,mod_proxy_http2.so
+# ADD LINK32 kernel32.lib nghttp2.lib /nologo /subsystem:windows /dll /libpath:"..\..\srclib\nghttp2\lib\MSVC_obj" /incremental:no /debug /out:".\Release\mod_proxy_http2.so" /base:@..\..\os\win32\BaseAddr.ref,mod_proxy_http2.so /opt:ref
+# Begin Special Build Tool
+TargetPath=.\Release\mod_proxy_http2.so
+SOURCE="$(InputPath)"
+PostBuild_Desc=Embed .manifest
+PostBuild_Cmds=if exist $(TargetPath).manifest mt.exe -manifest $(TargetPath).manifest -outputresource:$(TargetPath);2
+# End Special Build Tool
+
+!ELSEIF  "$(CFG)" == "mod_proxy_http2 - Win32 Debug"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 1
+# PROP BASE Output_Dir "Debug"
+# PROP BASE Intermediate_Dir "Debug"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 1
+# PROP Output_Dir "Debug"
+# PROP Intermediate_Dir "Debug"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /MDd /W3 /EHsc /Zi /Od /D "WIN32" /D "_DEBUG" /D "_WINDOWS" /D "ssize_t=long" /FD /c
+# ADD CPP /nologo /MDd /W3 /EHsc /Zi /Od /I "../ssl" /I "../../include" /I "../../srclib/apr/include" /I "../../srclib/apr-util/include" /I "../../srclib/nghttp2/lib/includes" /D "_DEBUG" /D "WIN32" /D "_WINDOWS" /D "ssize_t=long" /Fd"Debug\mod_proxy_http2_src" /FD /c
+# ADD BASE MTL /nologo /D "_DEBUG" /win32
+# ADD MTL /nologo /D "_DEBUG" /mktyplib203 /win32
+# ADD BASE RSC /l 0x409 /d "_DEBUG"
+# ADD RSC /l 0x409 /fo"Debug/mod_proxy_http2.res" /i "../../include" /i "../../srclib/apr/include" /d "_DEBUG" /d BIN_NAME="mod_proxy_http2.so" /d LONG_NAME="http2_module for Apache"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib nghttp2d.lib /nologo /subsystem:windows /dll /libpath:"..\..\srclib\nghttp2\lib\MSVC_obj" /incremental:no /debug /out:".\Debug\mod_proxy_http2.so" /base:@..\..\os\win32\BaseAddr.ref,mod_proxy_http2.so
+# ADD LINK32 kernel32.lib nghttp2d.lib /nologo /subsystem:windows /dll /libpath:"..\..\srclib\nghttp2\lib\MSVC_obj" /incremental:no /debug /out:".\Debug\mod_proxy_http2.so" /base:@..\..\os\win32\BaseAddr.ref,mod_proxy_http2.so
+# Begin Special Build Tool
+TargetPath=.\Debug\mod_proxy_http2.so
+SOURCE="$(InputPath)"
+PostBuild_Desc=Embed .manifest
+PostBuild_Cmds=if exist $(TargetPath).manifest mt.exe -manifest $(TargetPath).manifest -outputresource:$(TargetPath);2
+# End Special Build Tool
+
+!ENDIF 
+
+# Begin Target
+
+# Name "mod_proxy_http2 - Win32 Release"
+# Name "mod_proxy_http2 - Win32 Debug"
+# Begin Source File
+
+SOURCE=./h2_proxy_session.c
+# End Source File
+# Begin Source File
+
+SOURCE=./h2_util.c
+# End Source File
+# Begin Source File
+
+SOURCE=./mod_proxy_http2.c
+# End Source File
+# Begin Source File
+
+SOURCE=..\..\build\win32\httpd.rc
+# End Source File
+# End Target
+# End Project