]> granicus.if.org Git - apache/commitdiff
On the trunk:
authorStefan Eissing <icing@apache.org>
Mon, 27 Feb 2017 14:30:50 +0000 (14:30 +0000)
committerStefan Eissing <icing@apache.org>
Mon, 27 Feb 2017 14:30:50 +0000 (14:30 +0000)
mod_http2: separate mutex instances for each bucket beam, resulting in
     less lock contention. input beams only created when necessary.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1784571 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
modules/http2/h2_bucket_beam.c
modules/http2/h2_bucket_beam.h
modules/http2/h2_mplx.c
modules/http2/h2_session.c
modules/http2/h2_stream.c
modules/http2/h2_stream.h
modules/http2/h2_task.c
modules/http2/h2_task.h
modules/http2/h2_version.h

diff --git a/CHANGES b/CHANGES
index 2affdfb7e8f92e50c39accd151628ad986307482..cf143aa541e0307edfcff1df38d2e6c4a7ca7ee7 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,10 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: separate mutex instances for each bucket beam, resulting in 
+     less lock contention. input beams only created when necessary.
+     [Stefan Eissing]
+     
   *) mod_syslog: Support use of optional "tag" in syslog entries.
      PR 60525. [Ben Rubson <ben.rubson gmail.com>, Jim Jagielski]
 
index 3d644fd68e67cc78898821f9f6ba3dda9919ebc3..46919b9285ffd4ca60799d7aa8dd4b18013e3c73 100644 (file)
@@ -195,6 +195,19 @@ static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam,
  * bucket beam that can transport buckets across threads
  ******************************************************************************/
 
+static void mutex_leave(void *ctx, apr_thread_mutex_t *lock)
+{
+    apr_thread_mutex_unlock(lock);
+}
+
+static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl)
+{
+    h2_bucket_beam *beam = ctx;
+    pbl->mutex = beam->lock;
+    pbl->leave = mutex_leave;
+    return apr_thread_mutex_lock(pbl->mutex);
+}
+
 static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
 {
     h2_beam_mutex_enter *enter = beam->m_enter;
@@ -227,26 +240,37 @@ static apr_off_t bucket_mem_used(apr_bucket *b)
     }
 }
 
-static int report_consumption(h2_bucket_beam *beam)
+static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
 {
     int rv = 0;
-    if (beam->cons_io_cb) { 
-        beam->cons_io_cb(beam->cons_ctx, beam, beam->received_bytes
-                         - beam->cons_bytes_reported);
+    apr_off_t len = beam->received_bytes - beam->cons_bytes_reported;
+    h2_beam_io_callback *cb = beam->cons_io_cb;
+     
+    if (cb) {
+        void *ctx = beam->cons_ctx;
+        
+        if (pbl) leave_yellow(beam, pbl);
+        cb(ctx, beam, len);
+        if (pbl) enter_yellow(beam, pbl);
         rv = 1;
     }
-    beam->cons_bytes_reported = beam->received_bytes;
+    beam->cons_bytes_reported += len;
     return rv;
 }
 
-static void report_prod_io(h2_bucket_beam *beam, int force)
+static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl)
 {
-    if (force || beam->prod_bytes_reported != beam->sent_bytes) {
-        if (beam->prod_io_cb) { 
-            beam->prod_io_cb(beam->prod_ctx, beam, beam->sent_bytes
-                             - beam->prod_bytes_reported);
+    apr_off_t len = beam->sent_bytes - beam->prod_bytes_reported;
+    if (force || len > 0) {
+        h2_beam_io_callback *cb = beam->prod_io_cb; 
+        if (cb) {
+            void *ctx = beam->prod_ctx;
+            
+            leave_yellow(beam, pbl);
+            cb(ctx, beam, len);
+            enter_yellow(beam, pbl);
         }
-        beam->prod_bytes_reported = beam->sent_bytes;
+        beam->prod_bytes_reported += len;
     }
 }
 
@@ -293,10 +317,10 @@ static apr_size_t calc_space_left(h2_bucket_beam *beam)
 static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock)
 {
     if (beam->timeout > 0) {
-        return apr_thread_cond_timedwait(beam->m_cond, lock, beam->timeout);
+        return apr_thread_cond_timedwait(beam->cond, lock, beam->timeout);
     }
     else {
-        return apr_thread_cond_wait(beam->m_cond, lock);
+        return apr_thread_cond_wait(beam->cond, lock);
     }
 }
 
@@ -307,7 +331,7 @@ static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
     while (!beam->aborted && *premain <= 0 
            && (block == APR_BLOCK_READ) && pbl->mutex) {
         apr_status_t status;
-        report_prod_io(beam, 1);
+        report_prod_io(beam, 1, pbl);
         status = wait_cond(beam, pbl->mutex);
         if (APR_STATUS_IS_TIMEUP(status)) {
             return status;
@@ -378,8 +402,8 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
         if (!bl.mutex) {
             r_purge_sent(beam);
         }
-        else if (beam->m_cond) {
-            apr_thread_cond_broadcast(beam->m_cond);
+        else if (beam->cond) {
+            apr_thread_cond_broadcast(beam->cond);
         }
         leave_yellow(beam, &bl);
     }
@@ -399,8 +423,8 @@ static apr_status_t beam_close(h2_bucket_beam *beam)
 {
     if (!beam->closed) {
         beam->closed = 1;
-        if (beam->m_cond) {
-            apr_thread_cond_broadcast(beam->m_cond);
+        if (beam->cond) {
+            apr_thread_cond_broadcast(beam->cond);
         }
     }
     return APR_SUCCESS;
@@ -445,7 +469,7 @@ static apr_status_t beam_send_cleanup(void *data)
     /* sender is going away, clear up all references to its memory */
     r_purge_sent(beam);
     h2_blist_cleanup(&beam->send_list);
-    report_consumption(beam);
+    report_consumption(beam, NULL);
     while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
         h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
         H2_BPROXY_REMOVE(proxy);
@@ -555,10 +579,16 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool,
     H2_BPROXY_LIST_INIT(&beam->proxies);
     beam->tx_mem_limits = 1;
     beam->max_buf_size = max_buf_size;
-    apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);
 
-    *pbeam = beam;
-    
+    status = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, 
+                                     pool);
+    if (status == APR_SUCCESS) {
+        status = apr_thread_cond_create(&beam->cond, pool);
+        if (status == APR_SUCCESS) {
+            apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);
+            *pbeam = beam;
+        }
+    }
     return status;
 }
 
@@ -586,7 +616,6 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
 
 void h2_beam_mutex_set(h2_bucket_beam *beam, 
                        h2_beam_mutex_enter m_enter,
-                       apr_thread_cond_t *cond,
                        void *m_ctx)
 {
     h2_beam_lock bl;
@@ -594,11 +623,20 @@ void h2_beam_mutex_set(h2_bucket_beam *beam,
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         beam->m_enter = m_enter;
         beam->m_ctx   = m_ctx;
-        beam->m_cond  = cond;
         leave_yellow(beam, &bl);
     }
 }
 
+void h2_beam_mutex_enable(h2_bucket_beam *beam)
+{
+    h2_beam_mutex_set(beam, mutex_enter, beam);
+}
+
+void h2_beam_mutex_disable(h2_bucket_beam *beam)
+{
+    h2_beam_mutex_set(beam, NULL, NULL);
+}
+
 void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
 {
     h2_beam_lock bl;
@@ -630,10 +668,10 @@ void h2_beam_abort(h2_bucket_beam *beam)
             beam->aborted = 1;
             r_purge_sent(beam);
             h2_blist_cleanup(&beam->send_list);
-            report_consumption(beam);
+            report_consumption(beam, &bl);
         }
-        if (beam->m_cond) {
-            apr_thread_cond_broadcast(beam->m_cond);
+        if (beam->cond) {
+            apr_thread_cond_broadcast(beam->cond);
         }
         leave_yellow(beam, &bl);
     }
@@ -646,7 +684,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam)
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         r_purge_sent(beam);
         beam_close(beam);
-        report_consumption(beam);
+        report_consumption(beam, &bl);
         leave_yellow(beam, &bl);
     }
     return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
@@ -680,8 +718,8 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block)
                 status = APR_EAGAIN;
                 break;
             }
-            if (beam->m_cond) {
-                apr_thread_cond_broadcast(beam->m_cond);
+            if (beam->cond) {
+                apr_thread_cond_broadcast(beam->cond);
             }
             status = wait_cond(beam, bl.mutex);
         }
@@ -868,12 +906,12 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
                 b = APR_BRIGADE_FIRST(sender_bb);
                 status = append_bucket(beam, b, block, &bl);
             }
-            report_prod_io(beam, force_report);
-            if (beam->m_cond) {
-                apr_thread_cond_broadcast(beam->m_cond);
+            report_prod_io(beam, force_report, &bl);
+            if (beam->cond) {
+                apr_thread_cond_broadcast(beam->cond);
             }
         }
-        report_consumption(beam);
+        report_consumption(beam, &bl);
         leave_yellow(beam, &bl);
     }
     return status;
@@ -1040,15 +1078,15 @@ transfer:
         }
         
         if (transferred) {
-            if (beam->m_cond) {
-                apr_thread_cond_broadcast(beam->m_cond);
+            if (beam->cond) {
+                apr_thread_cond_broadcast(beam->cond);
             }
             status = APR_SUCCESS;
         }
         else if (beam->closed) {
             status = APR_EOF;
         }
-        else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
+        else if (block == APR_BLOCK_READ && bl.mutex && beam->cond) {
             status = wait_cond(beam, bl.mutex);
             if (status != APR_SUCCESS) {
                 goto leave;
@@ -1056,8 +1094,8 @@ transfer:
             goto transfer;
         }
         else {
-            if (beam->m_cond) {
-                apr_thread_cond_broadcast(beam->m_cond);
+            if (beam->cond) {
+                apr_thread_cond_broadcast(beam->cond);
             }
             status = APR_EAGAIN;
         }
@@ -1198,7 +1236,7 @@ int h2_beam_report_consumption(h2_bucket_beam *beam)
     h2_beam_lock bl;
     int rv = 0;
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        rv = report_consumption(beam);
+        rv = report_consumption(beam, &bl);
         leave_yellow(beam, &bl);
     }
     return rv;
index 2b54eee8b0e1c9acc16a6d3b069ec6c99d4a3180..0984d7b3e1147ea0d815f710a6de371b80991638 100644 (file)
@@ -190,9 +190,10 @@ struct h2_bucket_beam {
     unsigned int close_sent : 1;
     unsigned int tx_mem_limits : 1; /* only memory size counts on transfers */
 
+    struct apr_thread_mutex_t *lock;
+    struct apr_thread_cond_t *cond;
     void *m_ctx;
     h2_beam_mutex_enter *m_enter;
-    struct apr_thread_cond_t *m_cond;
     
     apr_off_t cons_bytes_reported;    /* amount of bytes reported as consumed */
     h2_beam_ev_callback *cons_ev_cb;
@@ -315,9 +316,11 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block);
 
 void h2_beam_mutex_set(h2_bucket_beam *beam, 
                        h2_beam_mutex_enter m_enter,
-                       struct apr_thread_cond_t *cond,
                        void *m_ctx);
 
+void h2_beam_mutex_enable(h2_bucket_beam *beam);
+void h2_beam_mutex_disable(h2_bucket_beam *beam);
+
 /** 
  * Set/get the timeout for blocking read/write operations. Only works
  * if a mutex has been set for the beam.
index 465eb9bb4f0db0400a82616677ce86eed1aacbe9..1b6680d696264c7679f7319c43c5269a81cd8e30 100644 (file)
@@ -100,33 +100,19 @@ static void leave_mutex(h2_mplx *m, int acquired)
     }
 }
 
-static void beam_leave(void *ctx, apr_thread_mutex_t *lock)
-{
-    leave_mutex(ctx, 1);
-}
-
-static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl)
-{
-    h2_mplx *m = ctx;
-    int acquired;
-    apr_status_t status;
-    
-    status = enter_mutex(m, &acquired);
-    if (status == APR_SUCCESS) {
-        pbl->mutex = m->lock;
-        pbl->leave = acquired? beam_leave : NULL;
-        pbl->leave_ctx = m;
-    }
-    return status;
-}
-
 static void stream_output_consumed(void *ctx, 
                                    h2_bucket_beam *beam, apr_off_t length)
 {
     h2_stream *stream = ctx;
+    h2_mplx *m = stream->session->mplx;
     h2_task *task = stream->task;
+    int acquired;
+    
     if (length > 0 && task && task->assigned) {
-        h2_req_engine_out_consumed(task->assigned, task->c, length); 
+        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+            h2_req_engine_out_consumed(task->assigned, task->c, length); 
+            leave_mutex(m, acquired);
+        }
     }
 }
 
@@ -139,9 +125,16 @@ static void stream_input_ev(void *ctx, h2_bucket_beam *beam)
 static void stream_input_consumed(void *ctx, 
                                   h2_bucket_beam *beam, apr_off_t length)
 {
-    h2_mplx *m = ctx;
-    if (m->input_consumed && length) {
-        m->input_consumed(m->input_consumed_ctx, beam->id, length);
+    if (length > 0) { 
+        h2_mplx *m = ctx;
+        int acquired;
+    
+        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+            if (m->input_consumed) {
+                m->input_consumed(m->input_consumed_ctx, beam->id, length);
+            }
+            leave_mutex(m, acquired);
+        }
     }
 }
 
@@ -190,17 +183,21 @@ static void stream_joined(h2_mplx *m, h2_stream *stream)
     
     h2_ihash_remove(m->shold, stream->id);
     h2_ihash_add(m->spurge, stream);
-    m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
+    if (stream->input) {
+        m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
+    }
     m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
 }
 
 static void stream_cleanup(h2_mplx *m, h2_stream *stream)
 {
     ap_assert(stream->state == H2_SS_CLEANUP);
-    
+
+    if (stream->input) {
+        h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
+        h2_beam_abort(stream->input);
+    }
     h2_beam_on_produced(stream->output, NULL, NULL);
-    h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
-    h2_beam_abort(stream->input);
     h2_beam_leave(stream->output);
     
     h2_stream_cleanup(stream);
@@ -376,18 +373,21 @@ static int stream_destroy_iter(void *ctx, void *val)
     h2_ihash_remove(m->spurge, stream->id);
     ap_assert(stream->state == H2_SS_CLEANUP);
     
-    if (stream->input == NULL || stream->output == NULL) {
+    if (stream->output == NULL) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, m->c, 
                       H2_STRM_MSG(stream, "already with beams==NULL"));
         return 0;
     }
-    /* Process outstanding events before destruction */
-    input_consumed_signal(m, stream);
     
-    h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy");
+    if (stream->input) {
+        /* Process outstanding events before destruction */
+        input_consumed_signal(m, stream);    
+        h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy");
+        h2_beam_destroy(stream->input);
+        stream->input = NULL;
+    }
+    
     h2_beam_log(stream->output, m->c, APLOG_TRACE2, "stream_destroy");
-    h2_beam_destroy(stream->input);
-    stream->input = NULL;
     h2_beam_destroy(stream->output);
     stream->output = NULL;
     if (stream->task) {
@@ -628,7 +628,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
     }
     
     /* time to protect the beam against multi-threaded use */
-    h2_beam_mutex_set(stream->output, beam_enter, stream->task->cond, m);
+    h2_beam_mutex_enable(stream->output);
     
     /* we might see some file buckets in the output, see
      * if we have enough handles reserved. */
@@ -812,11 +812,13 @@ static h2_task *next_stream_task(h2_mplx *m)
                     m->max_stream_started = sid;
                 }
                 
-                h2_beam_timeout_set(stream->input, m->stream_timeout);
-                h2_beam_on_consumed(stream->input, stream_input_ev, 
-                                    stream_input_consumed, m);
-                h2_beam_on_file_beam(stream->input, can_beam_file, m);
-                h2_beam_mutex_set(stream->input, beam_enter, stream->task->cond, m);
+                if (stream->input) {
+                    h2_beam_timeout_set(stream->input, m->stream_timeout);
+                    h2_beam_on_consumed(stream->input, stream_input_ev, 
+                                        stream_input_consumed, m);
+                    h2_beam_on_file_beam(stream->input, can_beam_file, m);
+                    h2_beam_mutex_enable(stream->input);
+                }
                 
                 h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
                 h2_beam_timeout_set(stream->output, m->stream_timeout);
@@ -931,18 +933,22 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                       H2_STRM_MSG(stream, "task_done, stream open")); 
         /* more data will not arrive, resume the stream */
-        h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
-        h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
-        h2_beam_leave(stream->input);
+        if (stream->input) {
+            h2_beam_mutex_disable(stream->input);
+            h2_beam_leave(stream->input);
+        }
+        h2_beam_mutex_disable(stream->output);
         have_out_data_for(m, stream);
     }
     else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                       H2_STRM_MSG(stream, "task_done, in hold"));
         /* stream was just waiting for us. */
-        h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
-        h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
-        h2_beam_leave(stream->input);
+        if (stream->input) {
+            h2_beam_mutex_disable(stream->input);
+            h2_beam_leave(stream->input);
+        }
+        h2_beam_mutex_disable(stream->output);
         stream_joined(m, stream);
     }
     else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) {
index 99a9c094d74c12791fd326fd8bd380c72cb0c847..62ba81d2f9a40f4513d9c79be5d6b3dd8682723c 100644 (file)
@@ -1738,12 +1738,7 @@ static void ev_stream_open(h2_session *session, h2_stream *stream)
     }
     
     ap_assert(!stream->scheduled);
-    if (stream->request) {
-        const h2_request *r = stream->request;
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                      H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
-                      r->method, r->scheme, r->authority, r->path, r->chunked);
-        stream->scheduled = 1;
+    if (h2_stream_prep_processing(stream) == APR_SUCCESS) {
         h2_mplx_process(session->mplx, stream, stream_pri_cmp, session);
     }
     else {
index c4b1227d6a3e1c04490f8b339abcbadf52e5a7aa..14010da4e538081ef4d811d92ff0dfc592a226d3 100644 (file)
@@ -170,14 +170,22 @@ static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
     }
 }
 
+static apr_status_t setup_input(h2_stream *stream) {
+    if (stream->input == NULL && !stream->input_eof) {
+        h2_beam_create(&stream->input, stream->pool, stream->id, 
+                       "input", H2_BEAM_OWNER_SEND, 0);
+        h2_beam_send_from(stream->input, stream->pool);
+    }
+    return APR_SUCCESS;
+}
+
 static apr_status_t close_input(h2_stream *stream)
 {
     conn_rec *c = stream->session->c;
-    apr_status_t status;
-    apr_bucket_brigade *tmp;
-    apr_bucket *b;
+    apr_status_t status = APR_SUCCESS;
 
-    if (h2_beam_is_closed(stream->input)) {
+    stream->input_eof = 1;
+    if (stream->input && h2_beam_is_closed(stream->input)) {
         return APR_SUCCESS;
     }
     
@@ -187,22 +195,30 @@ static apr_status_t close_input(h2_stream *stream)
         return APR_ECONNRESET;
     }
     
-    tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
     if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
-        h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers, 
-                                          NULL, stream->pool);
+        apr_bucket_brigade *tmp;
+        apr_bucket *b;
+        h2_headers *r;
+        
+        tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
+        
+        r = h2_headers_create(HTTP_OK, stream->trailers, NULL, stream->pool);
+        stream->trailers = NULL;        
         b = h2_bucket_headers_create(c->bucket_alloc, r);
         APR_BRIGADE_INSERT_TAIL(tmp, b);
-        stream->trailers = NULL;
+        
+        b = apr_bucket_eos_create(c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(tmp, b);
+        
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
                       H2_STRM_MSG(stream, "added trailers"));
+        setup_input(stream);
+        status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
+        apr_brigade_destroy(tmp);
+    }
+    if (stream->input) {
+        return h2_beam_close(stream->input);
     }
-    
-    b = apr_bucket_eos_create(c->bucket_alloc);
-    APR_BRIGADE_INSERT_TAIL(tmp, b);
-    status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
-    apr_brigade_destroy(tmp);
-    h2_beam_close(stream->input);
     return status;
 }
 
@@ -440,18 +456,16 @@ apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
     apr_bucket_brigade *tmp;
     
     ap_assert(stream);
-    if (!stream->input) {
-        return APR_EOF;
+    if (len > 0) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
+                      H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
+        
+        tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+        apr_brigade_write(tmp, NULL, NULL, (const char *)data, len);
+        setup_input(stream);
+        status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
+        apr_brigade_destroy(tmp);
     }
-
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
-                  H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
-    
-    tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc);
-    apr_brigade_write(tmp, NULL, NULL, (const char *)data, len);
-    status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
-    apr_brigade_destroy(tmp);
-    
     stream->in_data_frames++;
     stream->in_data_octets += len;
     return status;
@@ -478,8 +492,6 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
     stream->monitor      = monitor;
     stream->max_mem      = session->max_stream_mem;
     
-    h2_beam_create(&stream->input, pool, id, "input", H2_BEAM_OWNER_SEND, 0);
-    h2_beam_send_from(stream->input, stream->pool);
     h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0);
     
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
@@ -498,14 +510,16 @@ void h2_stream_cleanup(h2_stream *stream)
          * references into request pools */
         apr_brigade_cleanup(stream->out_buffer);
     }
-    h2_beam_abort(stream->input);
-    status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
-    if (status == APR_EAGAIN) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
-                      H2_STRM_MSG(stream, "wait on input drain"));
-        status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, 
-                      H2_STRM_MSG(stream, "input drain returned"));
+    if (stream->input) {
+        h2_beam_abort(stream->input);
+        status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
+        if (status == APR_EAGAIN) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
+                          H2_STRM_MSG(stream, "wait on input drain"));
+            status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, 
+                          H2_STRM_MSG(stream, "input drain returned"));
+        }
     }
 }
 
@@ -527,10 +541,26 @@ apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
     return pool;
 }
 
+apr_status_t h2_stream_prep_processing(h2_stream *stream)
+{
+    if (stream->request) {
+        const h2_request *r = stream->request;
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+                      H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
+                      r->method, r->scheme, r->authority, r->path, r->chunked);
+        setup_input(stream);
+        stream->scheduled = 1;
+        return APR_SUCCESS;
+    }
+    return APR_EINVAL;
+}
+
 void h2_stream_rst(h2_stream *stream, int error_code)
 {
     stream->rst_error = error_code;
-    h2_beam_abort(stream->input);
+    if (stream->input) {
+        h2_beam_abort(stream->input);
+    }
     h2_beam_leave(stream->output);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
                   H2_STRM_MSG(stream, "reset, error=%d"), error_code);
index 86b6da898f06c79e9491ceb327571650f2c28085..241c4fefe5a73434bcc73a8d54c723143d1d31b2 100644 (file)
@@ -77,6 +77,7 @@ struct h2_stream {
     unsigned int aborted   : 1; /* was aborted */
     unsigned int scheduled : 1; /* stream has been scheduled */
     unsigned int has_response : 1; /* response headers are known */
+    unsigned int input_eof : 1; /* no more request data coming */
     unsigned int push_policy;   /* which push policy to use for this request */
     
     struct h2_task *task;       /* assigned task to fullfill request */
@@ -110,6 +111,8 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool,
  */
 void h2_stream_destroy(h2_stream *stream);
 
+apr_status_t h2_stream_prep_processing(h2_stream *stream);
+
 /*
  * Set a new monitor for this stream, replacing any existing one. Can
  * be called with NULL to have no monitor installed.
index 0190f8a8898d724dd96124e5ca2768881a2cf1a2..19cbca1b2dd05c8c10b055609bab4e9cc46ac0a0 100644 (file)
@@ -17,7 +17,6 @@
 #include <stddef.h>
 
 #include <apr_atomic.h>
-#include <apr_thread_cond.h>
 #include <apr_strings.h>
 
 #include <httpd.h>
@@ -386,7 +385,7 @@ static apr_status_t h2_filter_parse_h1(ap_filter_t* f, apr_bucket_brigade* bb)
  ******************************************************************************/
  
 int h2_task_can_redo(h2_task *task) {
-    if (h2_beam_was_received(task->input.beam)) {
+    if (task->input.beam && h2_beam_was_received(task->input.beam)) {
         /* cannot repeat that. */
         return 0;
     }
@@ -403,7 +402,9 @@ void h2_task_redo(h2_task *task)
 void h2_task_rst(h2_task *task, int error)
 {
     task->rst_error = error;
-    h2_beam_leave(task->input.beam);
+    if (task->input.beam) {
+        h2_beam_leave(task->input.beam);
+    }
     if (!task->worker_done) {
         h2_beam_abort(task->output.beam);
     }
@@ -508,7 +509,6 @@ h2_task *h2_task_create(h2_stream *stream, conn_rec *slave)
     task->output.beam = stream->output;
     
     h2_beam_send_from(stream->output, task->pool);
-    apr_thread_cond_create(&task->cond, pool);
     h2_ctx_create_for(slave, task);
     
     return task;
index f004a4f92b268f87d2ac477d950e6a8d61a2be5e..e0a426b0ad42f04dbd389220c9ef93199f1af879 100644 (file)
@@ -37,7 +37,6 @@
  * of our own to disble those.
  */
 
-struct apr_thread_cond_t;
 struct h2_bucket_beam;
 struct h2_conn;
 struct h2_mplx;
@@ -76,7 +75,6 @@ struct h2_task {
     } output;
     
     struct h2_mplx *mplx;
-    struct apr_thread_cond_t *cond;
     
     unsigned int filters_set    : 1;
     unsigned int frozen         : 1;
index a3f3f8528da6acded24b0c18992f32625dd5c0ac..066487a798a563847febc405b97a4c97376e2585 100644 (file)
@@ -26,7 +26,7 @@
  * @macro
  * Version number of the http2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.9.2-DEV"
+#define MOD_HTTP2_VERSION "1.9.3-DEV"
 
 /**
  * @macro
@@ -34,7 +34,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x010901
+#define MOD_HTTP2_VERSION_NUM 0x010903
 
 
 #endif /* mod_h2_h2_version_h */