]> granicus.if.org Git - apache/commitdiff
mod_http2: fix suspended handling for streams
authorStefan Eissing <icing@apache.org>
Mon, 19 Sep 2016 13:22:47 +0000 (13:22 +0000)
committerStefan Eissing <icing@apache.org>
Mon, 19 Sep 2016 13:22:47 +0000 (13:22 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1761434 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
modules/http2/h2_mplx.c
modules/http2/h2_session.c

diff --git a/CHANGES b/CHANGES
index d95aea46817e822f87c7e06db2dca9d7d3fd0598..d79be83bc25484111cc349a8cf73f063e920ab8c 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,9 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: fix suspended handling for streams. Output could become
+     blocked in rare cases.
+
   *) core: Permit unencoded ';' characters to appear in proxy requests and
      Location: response headers. Corresponds to modern browser behavior.
      [William Rowe]
index d0f533fcc00316c0644a04f83dab2cb9f0b36da9..ea574abc4db80e79bcbb76c456be272b5fcc5704 100644 (file)
@@ -168,7 +168,7 @@ static int can_beam_file(void *ctx, h2_bucket_beam *beam,  apr_file_t *file)
     return 0;
 }
 
-static void have_out_data_for(h2_mplx *m, int stream_id);
+static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response);
 static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
 
 static void check_tx_reservation(h2_mplx *m) 
@@ -713,6 +713,23 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
     m->input_consumed_ctx = ctx;
 }
 
+static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
+{
+    h2_mplx *m = ctx;
+    apr_status_t status;
+    h2_stream *stream;
+    int acquired;
+    
+    AP_DEBUG_ASSERT(m);
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        stream = h2_ihash_get(m->streams, beam->id);
+        if (stream) {
+            have_out_data_for(m, stream, 0);
+        }
+        leave_mutex(m, acquired);
+    }
+}
+
 static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
 {
     apr_status_t status = APR_SUCCESS;
@@ -735,6 +752,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
         h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
         h2_beam_timeout_set(task->output.beam, m->stream_timeout);
         h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
+        h2_beam_on_produced(task->output.beam, output_produced, m);
         m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
         if (!task->output.copy_files) {
             h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
@@ -743,13 +761,12 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
         task->output.opened = 1;
     }
     
-    h2_ihash_add(m->sready, stream);
     if (response && response->http_status < 300) {
         /* we might see some file buckets in the output, see
          * if we have enough handles reserved. */
         check_tx_reservation(m);
     }
-    have_out_data_for(m, stream_id);
+    have_out_data_for(m, stream, 1);
     return status;
 }
 
@@ -803,7 +820,7 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task)
                     APLOG_TRACE2);
     }
     output_consumed_signal(m, task);
-    have_out_data_for(m, task->stream_id);
+    have_out_data_for(m, stream, 0);
     return status;
 }
 
@@ -837,12 +854,18 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
     return status;
 }
 
-static void have_out_data_for(h2_mplx *m, int stream_id)
+static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response)
 {
-    (void)stream_id;
-    AP_DEBUG_ASSERT(m);
-    if (m->added_output) {
-        apr_thread_cond_signal(m->added_output);
+    h2_ihash_t *set;
+    ap_assert(m);
+    ap_assert(stream);
+    
+    set = response?  m->sready : m->sresume;
+    if (!h2_ihash_get(set, stream->id)) {
+        h2_ihash_add(set, stream);
+        if (m->added_output) {
+            apr_thread_cond_signal(m->added_output);
+        }
     }
 }
 
@@ -1071,11 +1094,8 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                           "h2_mplx(%s): task_done, stream still open", 
                           task->id);
-            if (h2_stream_is_suspended(stream)) {
-                /* more data will not arrive, resume the stream */
-                h2_ihash_add(m->sresume, stream);
-                have_out_data_for(m, stream->id);
-            }
+            /* more data will not arrive, resume the stream */
+            have_out_data_for(m, stream, 0);
         }
         else {
             /* stream no longer active, was it placed in hold? */
@@ -1473,25 +1493,6 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
     return status;
 }
 
-static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
-{
-    h2_mplx *m = ctx;
-    apr_status_t status;
-    h2_stream *stream;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        stream = h2_ihash_get(m->streams, beam->id);
-        if (stream && h2_stream_is_suspended(stream)) {
-            h2_ihash_add(m->sresume, stream);
-            h2_beam_on_produced(beam, NULL, NULL);
-            have_out_data_for(m, beam->id);
-        }
-        leave_mutex(m, acquired);
-    }
-}
-
 apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
 {
     apr_status_t status;
@@ -1502,16 +1503,13 @@ apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
     AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         stream = h2_ihash_get(m->streams, stream_id);
-        if (stream) {
+        if (stream && !h2_ihash_get(m->sresume, stream->id)) {
+            /* not marked for resume again already */
             h2_stream_set_suspended(stream, 1);
             task = h2_ihash_get(m->tasks, stream->id);
             if (stream->started && (!task || task->worker_done)) {
                 h2_ihash_add(m->sresume, stream);
             }
-            else if (task->output.beam) {
-                /* register callback so that we can resume on new output */
-                h2_beam_on_produced(task->output.beam, output_produced, m);
-            }
         }
         leave_mutex(m, acquired);
     }
index c5d77a0f71981aa79024f79b5bb3e32b797680eb..1504073eafd8c9898c7e931933c9adfe91a58904 100644 (file)
@@ -1172,8 +1172,6 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
-    AP_DEBUG_ASSERT(!h2_stream_is_suspended(stream));
-    
     status = h2_stream_out_prepare(stream, &nread, &eos);
     if (nread) {
         *data_flags |=  NGHTTP2_DATA_FLAG_NO_COPY;