]> granicus.if.org Git - apache/commitdiff
mod_http2: fixes in input/output bucket handling
authorStefan Eissing <icing@apache.org>
Thu, 28 Apr 2016 11:39:57 +0000 (11:39 +0000)
committerStefan Eissing <icing@apache.org>
Thu, 28 Apr 2016 11:39:57 +0000 (11:39 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1741414 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
modules/http2/h2_bucket_beam.c
modules/http2/h2_bucket_beam.h
modules/http2/h2_mplx.c
modules/http2/h2_ngn_shed.c
modules/http2/h2_session.c
modules/http2/h2_task.c
modules/http2/h2_task.h

diff --git a/CHANGES b/CHANGES
index 2d0fefcba74d517d880bd33275ede42da55fd2b3..c8e794b83858e711fa1526fa7d861f76757a7e3f 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,13 +1,19 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: fixed a bug that caused mod_proxy_http2 to be called for window
+     updates on requests it had already reported done. Added synchronization
+     on early connection/stream close that lets ongoing requests safely drain
+     their input filters.
+     [Stefan Eissing]
+
   *) mod_http2: scoreboard updates that summarize the h2 session (and replace
      the last request information) will only happen when the session is idle or 
      in shutdown/done phase. [Stefan Eissing]
      
-  *) mod_http2: bucket beams now have safe mutex remove. Used for streams where
-     the task worker has finished and all processing happens in the same
-     thread again. [Stefan Eissing]
+  *) mod_http2: HTTP protocol string reported internally, logged and used in
+     SERVER_PROTOCOL changed to "HTTP/2.0" for HTTP/2 connections. 
+     [Stefan Eissing]
      
   *) mod_proxy, mod_ssl: Handle SSLProxy* directives in <Proxy> sections,
      allowing per backend TLS configuration.  [Yann Ylavic]
index e630f84ecb3dd2fa6b36ca4a4747b5b43fdf54a7..6d41fadc5bbf6356402677bec145d7f676a973fd 100644 (file)
@@ -67,6 +67,7 @@ struct h2_beam_proxy {
     APR_RING_ENTRY(h2_beam_proxy) link;
     h2_bucket_beam *beam;
     apr_bucket *bred;
+    apr_size_t n;
 };
 
 static const char Dummy = '\0';
@@ -108,7 +109,7 @@ static void beam_bucket_destroy(void *data)
 
 static apr_bucket * h2_beam_bucket_make(apr_bucket *b, 
                                         h2_bucket_beam *beam,
-                                        apr_bucket *bred)
+                                        apr_bucket *bred, apr_size_t n)
 {
     h2_beam_proxy *d;
 
@@ -116,7 +117,8 @@ static apr_bucket * h2_beam_bucket_make(apr_bucket *b,
     H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, d);
     d->beam = beam;
     d->bred = bred;
-
+    d->n = n;
+    
     b = apr_bucket_shared_make(b, d, 0, bred? bred->length : 0);
     b->type = &h2_bucket_type_beam;
 
@@ -125,14 +127,15 @@ static apr_bucket * h2_beam_bucket_make(apr_bucket *b,
 
 static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
                                          apr_bucket *bred,
-                                         apr_bucket_alloc_t *list)
+                                         apr_bucket_alloc_t *list,
+                                         apr_size_t n)
 {
     apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
 
     APR_BUCKET_INIT(b);
     b->free = apr_bucket_free;
     b->list = list;
-    return h2_beam_bucket_make(b, beam, bred);
+    return h2_beam_bucket_make(b, beam, bred, n);
 }
 
 /*static apr_status_t beam_bucket_setaside(apr_bucket *b, apr_pool_t *pool)
@@ -282,15 +285,10 @@ static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
     return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
 }
 
-static void h2_beam_prep_purge(h2_bucket_beam *beam, apr_bucket *bred)
-{
-    APR_BUCKET_REMOVE(bred);
-    H2_BLIST_INSERT_TAIL(&beam->purge, bred);
-}
-
 static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
 {
     h2_beam_lock bl;
+    apr_bucket *b, *next;
 
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         /* even when beam buckets are split, only the one where
@@ -300,8 +298,48 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
          * bucket bred is about to be destroyed.
          * remove it from the hold, where it should be now */
         if (proxy->bred) {
-            h2_beam_prep_purge(beam, proxy->bred);
-            proxy->bred = NULL;
+            for (b = H2_BLIST_FIRST(&beam->hold); 
+                 b != H2_BLIST_SENTINEL(&beam->hold);
+                 b = APR_BUCKET_NEXT(b)) {
+                 if (b == proxy->bred) {
+                    break;
+                 }
+            }
+            if (b != H2_BLIST_SENTINEL(&beam->hold)) {
+                /* bucket is in hold as it should be, mark this one
+                 * and all before it for purging. We might have placed meta
+                 * buckets without a green proxy into the hold before it 
+                 * and schedule them for purging now */
+                for (b = H2_BLIST_FIRST(&beam->hold); 
+                     b != H2_BLIST_SENTINEL(&beam->hold);
+                     b = next) {
+                    next = APR_BUCKET_NEXT(b);
+                    if (b == proxy->bred) {
+                        APR_BUCKET_REMOVE(b);
+                        H2_BLIST_INSERT_TAIL(&beam->purge, b);
+                        break;
+                    }
+                    else if (APR_BUCKET_IS_METADATA(b)) {
+                        APR_BUCKET_REMOVE(b);
+                        H2_BLIST_INSERT_TAIL(&beam->purge, b);
+                    }
+                    else {
+                        /* another data bucket before this one in hold. this
+                         * is normal since DATA buckets need not be destroyed
+                         * in order */
+                    }
+                }
+                
+                proxy->bred = NULL;
+            }
+            else {
+                /* it should be there unless we screwed up */
+                ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->red_pool, 
+                              APLOGNO() "h2_beam(%d-%s): emitted bucket not "
+                              "in hold, n=%d", beam->id, beam->tag, 
+                              (int)proxy->n);
+                AP_DEBUG_ASSERT(!proxy->bred);
+            }
         }
         /* notify anyone waiting on space to become available */
         if (!bl.mutex) {
@@ -344,45 +382,17 @@ static apr_status_t beam_close(h2_bucket_beam *beam)
     return APR_SUCCESS;
 }
 
-static void beam_shutdown(h2_bucket_beam *beam, int disconnect)
-{
-    if (disconnect && !H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
-        /* If we are called before all green buckets we put out
-         * there have been destroyed, we need to disentangle ourself.
-         * We NULLify the beam and red buckets in every proxy from us, so
-         * a) red memory is no longer read
-         * b) destruction of the proxy no longer calls back to this beam
-         * This does not protect against races when red and green thread are still
-         * running concurrently and it does not protect from passed out red
-         * memory to still being accessed.
-         */
-        while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
-            h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
-            H2_BPROXY_REMOVE(proxy);
-            proxy->beam = NULL;
-            if (proxy->bred) {
-                h2_beam_prep_purge(beam, proxy->bred);
-                proxy->bred = NULL;
-            }
-        }
-    }
-    r_purge_reds(beam);
-    h2_blist_cleanup(&beam->red);
-    beam_close(beam);
-    report_consumption(beam);
-}
-
 static apr_status_t beam_cleanup(void *data)
 {
     h2_bucket_beam *beam = data;
     
-    if (beam->green) {
-        apr_brigade_destroy(beam->green);
-        beam->green = NULL;
-    }
-    beam_shutdown(beam, 0);
+    beam_close(beam);
+    r_purge_reds(beam);
+    h2_blist_cleanup(&beam->red);
+    report_consumption(beam);
     h2_blist_cleanup(&beam->purge);
     h2_blist_cleanup(&beam->hold);
+    
     return APR_SUCCESS;
 }
 
@@ -507,14 +517,29 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam)
     return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
 }
 
-void h2_beam_shutdown(h2_bucket_beam *beam)
+apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block)
 {
+    apr_status_t status;
     h2_beam_lock bl;
     
-    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        beam_shutdown(beam, 1);
+    if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) {
+        r_purge_reds(beam);
+        h2_blist_cleanup(&beam->red);
+        beam_close(beam);
+        report_consumption(beam);
+        
+        while (status == APR_SUCCESS 
+               && (!H2_BPROXY_LIST_EMPTY(&beam->proxies)
+                   || (beam->green && !APR_BRIGADE_EMPTY(beam->green)))) {
+            if (block == APR_NONBLOCK_READ || !bl.mutex) {
+                status = APR_EAGAIN;
+                break;
+            }
+            status = wait_cond(beam, bl.mutex);
+        }
         leave_yellow(beam, &bl);
     }
+    return status;
 }
 
 static apr_status_t append_bucket(h2_bucket_beam *beam, 
@@ -763,7 +788,8 @@ transfer:
                  * the red brigade.
                  * the beam bucket will notify us on destruction that bred is
                  * no longer needed. */
-                bgreen = h2_beam_bucket_create(beam, bred, bb->bucket_alloc);
+                bgreen = h2_beam_bucket_create(beam, bred, bb->bucket_alloc,
+                                               beam->buckets_sent++);
             }
             
             /* Place the red bucket into our hold, to be destroyed when no
index a8abc908a1e37d8746a9eec82f75c8222431b155..1fc6656a09016e70b586e07c110deb55e6897236 100644 (file)
@@ -179,6 +179,7 @@ struct h2_bucket_beam {
     apr_off_t sent_bytes;     /* amount of bytes send */
     apr_off_t received_bytes; /* amount of bytes received */
     apr_off_t reported_bytes; /* amount of bytes reported as consumed */
+    apr_size_t buckets_sent;
     
     unsigned int aborted : 1;
     unsigned int closed : 1;
@@ -276,11 +277,16 @@ void h2_beam_abort(h2_bucket_beam *beam);
 apr_status_t h2_beam_close(h2_bucket_beam *beam);
 
 /**
- * Empty the buffer and close.
- * 
+ * Empty any buffered data and return APR_SUCCESS when all buckets
+ * in transit have been handled. When called with APR_BLOCK_READ and
+ * with a mutex installed, will wait until this is the case. Otherwise
+ * APR_EAGAIN is returned.
+ * If a timeout is set on the beam, waiting might also time out and
+ * return APR_ETIMEUP.
+ *
  * Call from the red side only.
  */
-void h2_beam_shutdown(h2_bucket_beam *beam);
+apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block);
 
 void h2_beam_mutex_set(h2_bucket_beam *beam, 
                        h2_beam_mutex_enter m_enter,
index 6a886ac59dd98ca00647ee57281933130163087c..1ab684877f6a092369d2b8e3be23ecfeaeaf1c84 100644 (file)
@@ -314,9 +314,15 @@ static void task_destroy(h2_mplx *m, h2_task *task, int events)
 {
     conn_rec *slave = NULL;
     int reuse_slave = 0;
+    apr_status_t status;
     
     /* cleanup any buffered input */
-    h2_task_shutdown(task);
+    status = h2_task_shutdown(task, 0);
+    if (status != APR_SUCCESS){
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c, APLOGNO() 
+                      "h2_task(%s): shutdown", task->id);
+    }
+    
     if (events) {
         /* Process outstanding events before destruction */
         input_consumed_signal(m, task);
@@ -365,14 +371,24 @@ static int task_stream_done(h2_mplx *m, h2_task *task, int rst_error)
     if (task->worker_done) {
         /* already finished or not even started yet */
         h2_iq_remove(m->q, task->stream_id);
-        task_destroy(m, task, 1);
+        task_destroy(m, task, 0);
         return 0;
     }
     else {
         /* cleanup once task is done */
         task->orphaned = 1;
         if (task->input.beam) {
-            h2_beam_shutdown(task->input.beam);
+            apr_status_t status;
+            status = h2_beam_shutdown(task->input.beam, APR_NONBLOCK_READ);
+            if (status == APR_EAGAIN) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+                              "h2_stream(%ld-%d): wait on input shutdown", 
+                              m->id, task->stream_id);
+                status = h2_beam_shutdown(task->input.beam, APR_BLOCK_READ);
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, 
+                              "h2_stream(%ld-%d): input shutdown returned", 
+                              m->id, task->stream_id);
+            }
             task->input.beam = NULL;
         }
         if (rst_error) {
@@ -471,13 +487,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
             }
         }
         
-        if (!h2_ihash_empty(m->tasks)) {
-            ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c, 
-                          "h2_mplx(%ld): release_join, %d tasks still open", 
-                          m->id, (int)h2_ihash_count(m->tasks));
-        }
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
-                      "h2_mplx(%ld): release_join -> destroy", m->id);
+                      "h2_mplx(%ld): release_join (%d tasks left) -> destroy", 
+                      m->id, (int)h2_ihash_count(m->tasks));
         leave_mutex(m, acquired);
         h2_mplx_destroy(m);
         /* all gone */
@@ -612,7 +624,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
                 else {
                     /* hang around until the h2_task is done, but
                      * shutdown input/output and send out any events asap. */
-                    h2_task_shutdown(task);
+                    h2_task_shutdown(task, 0);
                     input_consumed_signal(m, task);
                 }
             }
@@ -951,7 +963,6 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
             }
             
             if (task->orphaned) {
-                /* TODO: add to purge list */
                 task_destroy(m, task, 0);
                 if (m->join_wait) {
                     apr_thread_cond_signal(m->join_wait);
index 18a32a1809c16ca0d3164f775a40e01a3725bccb..ed3459f0c8766129ee3282239562781ac484d245 100644 (file)
@@ -295,7 +295,8 @@ static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn,
     ngn->no_finished++;
     if (waslive) ngn->no_live--;
     ngn->no_assigned--;
-
+    task->assigned = NULL;
+    
     return APR_SUCCESS;
 }
                                 
index dff0d7986a99bf25cbc7d53cea5f3ab9bd9f697e..ab28afb21cf912e56850c65b6986dc3c01e7e542 100644 (file)
@@ -332,7 +332,7 @@ static apr_status_t stream_release(h2_session *session,
     }
     
     return h2_conn_io_writeb(&session->io,
-                             h2_bucket_eos_create(c->bucket_alloc, stream), 1);
+                             h2_bucket_eos_create(c->bucket_alloc, stream), 0);
 }
 
 static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -618,7 +618,7 @@ static int on_send_data_cb(nghttp2_session *ngh2,
         if (status == APR_SUCCESS && padlen) {
             b = apr_bucket_immortal_create(immortal_zeros, padlen, 
                                            session->c->bucket_alloc);
-            status = h2_conn_io_writeb(&session->io, b, 1);
+            status = h2_conn_io_writeb(&session->io, b, 0);
         }
     }
     
index 968f7ffb2a6997635eb4d736c46d47a446b42843..26f1bf5ba476563812d86d648f6a0b83056b9fb1 100644 (file)
@@ -253,7 +253,7 @@ static apr_status_t open_response(h2_task *task)
     if (!response) {
         /* This happens currently when ap_die(status, r) is invoked
          * by a read request filter. */
-        ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, task->c, APLOGNO(03204)
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, APLOGNO(03204)
                       "h2_task(%s): write without response for %s %s %s",
                       task->id, 
                       task->request->method, 
@@ -487,14 +487,21 @@ void h2_task_rst(h2_task *task, int error)
     }
 }
 
-void h2_task_shutdown(h2_task *task)
+apr_status_t h2_task_shutdown(h2_task *task, int block)
 {
-    if (task->input.beam) {
-        h2_beam_shutdown(task->input.beam);
-    }
     if (task->output.beam) {
-        h2_beam_shutdown(task->output.beam);
+        apr_status_t status;
+        status = h2_beam_shutdown(task->output.beam, APR_NONBLOCK_READ);
+        if (block && status == APR_EAGAIN) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, task->c,
+                          "h2_task(%s): output shutdown waiting", task->id);
+            status = h2_beam_shutdown(task->output.beam, APR_BLOCK_READ);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, task->c,
+                          "h2_task(%s): output shutdown done", task->id);
+        }
+        return status;
     }
+    return APR_SUCCESS;
 }
 
 /*******************************************************************************
index b9d531b5617f123a7fc76884450420d541354ee8..58b64b0a1c6f2cfddc77e29f1bcfc4c4aed0ba33 100644 (file)
@@ -117,7 +117,7 @@ void h2_task_rst(h2_task *task, int error);
 /**
  * Shuts all input/output down. Clears any buckets buffered and closes.
  */
-void h2_task_shutdown(h2_task *task);
+apr_status_t h2_task_shutdown(h2_task *task, int block);
 
 void h2_task_register_hooks(void);
 /*