]> granicus.if.org Git - apache/commitdiff
FLUSHing H2EOS and H2EOC buckets to preserve destruction order, improving bucket...
authorStefan Eissing <icing@apache.org>
Tue, 26 Apr 2016 14:50:57 +0000 (14:50 +0000)
committerStefan Eissing <icing@apache.org>
Tue, 26 Apr 2016 14:50:57 +0000 (14:50 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1741045 13f79535-47bb-0310-9956-ffa450edef68

modules/http2/h2_bucket_beam.c
modules/http2/h2_bucket_beam.h
modules/http2/h2_conn_io.c
modules/http2/h2_conn_io.h
modules/http2/h2_session.c

index 64413aa7b1f0e691856d10687c8df4b6a792931a..6ca39e1de3ab4e43bb94b87a29a9414bb40d16a6 100644 (file)
 #include "h2_util.h"
 #include "h2_bucket_beam.h"
 
-static void h2_beam_emitted(h2_bucket_beam *beam, apr_bucket *bred);
+static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy);
+
+#define H2_BPROXY_NEXT(e)             APR_RING_NEXT((e), link)
+#define H2_BPROXY_PREV(e)             APR_RING_PREV((e), link)
+#define H2_BPROXY_REMOVE(e)           APR_RING_REMOVE((e), link)
+
+#define H2_BPROXY_LIST_INIT(b)        APR_RING_INIT(&(b)->list, h2_beam_proxy, link);
+#define H2_BPROXY_LIST_SENTINEL(b)    APR_RING_SENTINEL(&(b)->list, h2_beam_proxy, link)
+#define H2_BPROXY_LIST_EMPTY(b)       APR_RING_EMPTY(&(b)->list, h2_beam_proxy, link)
+#define H2_BPROXY_LIST_FIRST(b)       APR_RING_FIRST(&(b)->list)
+#define H2_BPROXY_LIST_LAST(b)       APR_RING_LAST(&(b)->list)
+#define H2_PROXY_BLIST_INSERT_HEAD(b, e) do {                          \
+       h2_beam_proxy *ap__b = (e);                                        \
+       APR_RING_INSERT_HEAD(&(b)->list, ap__b, h2_beam_proxy, link);   \
+    } while (0)
+#define H2_BPROXY_LIST_INSERT_TAIL(b, e) do {                          \
+       h2_beam_proxy *ap__b = (e);                                     \
+       APR_RING_INSERT_TAIL(&(b)->list, ap__b, h2_beam_proxy, link);   \
+    } while (0)
+#define H2_BPROXY_LIST_CONCAT(a, b) do {                                       \
+        APR_RING_CONCAT(&(a)->list, &(b)->list, h2_beam_proxy, link);  \
+    } while (0)
+#define H2_BPROXY_LIST_PREPEND(a, b) do {                                      \
+        APR_RING_PREPEND(&(a)->list, &(b)->list, h2_beam_proxy, link); \
+    } while (0)
+
 
 /*******************************************************************************
  * beam bucket with reference to beam and bucket it represents
@@ -37,18 +62,19 @@ const apr_bucket_type_t h2_bucket_type_beam;
 
 #define H2_BUCKET_IS_BEAM(e)     (e->type == &h2_bucket_type_beam)
 
-typedef struct {
+struct h2_beam_proxy {
     apr_bucket_refcount refcount;
+    APR_RING_ENTRY(h2_beam_proxy) link;
     h2_bucket_beam *beam;
     apr_bucket *bred;
-} h2_beam_bucket;
+};
 
 static const char Dummy = '\0';
 
 static apr_status_t beam_bucket_read(apr_bucket *b, const char **str, 
                                      apr_size_t *len, apr_read_type_e block)
 {
-    h2_beam_bucket *d = b->data;
+    h2_beam_proxy *d = b->data;
     if (d->bred) {
         const char *data;
         apr_status_t status = apr_bucket_read(d->bred, &data, len, block);
@@ -60,16 +86,21 @@ static apr_status_t beam_bucket_read(apr_bucket *b, const char **str,
     }
     *str = &Dummy;
     *len = 0;
-    return APR_SUCCESS;
+    return APR_ECONNRESET;
 }
 
 static void beam_bucket_destroy(void *data)
 {
-    h2_beam_bucket *d = data;
+    h2_beam_proxy *d = data;
 
     if (apr_bucket_shared_destroy(d)) {
-        if (d->bred) {
-            h2_beam_emitted(d->beam, d->bred);
+        /* When the beam gets destroyed before this bucket, it will
+         * NULLify its reference here. This is not protected by a mutex,
+         * so it will not help with race conditions.
+         * But it lets us shut down memory pool with circulare beam
+         * references. */
+        if (d->beam) {
+            h2_beam_emitted(d->beam, d);
         }
         apr_bucket_free(d);
     }
@@ -79,9 +110,10 @@ static apr_bucket * h2_beam_bucket_make(apr_bucket *b,
                                         h2_bucket_beam *beam,
                                         apr_bucket *bred)
 {
-    h2_beam_bucket *d;
+    h2_beam_proxy *d;
 
     d = apr_bucket_alloc(sizeof(*d), b->list);
+    H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, d);
     d->beam = beam;
     d->bred = bred;
 
@@ -103,6 +135,25 @@ static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
     return h2_beam_bucket_make(b, beam, bred);
 }
 
+/*static apr_status_t beam_bucket_setaside(apr_bucket *b, apr_pool_t *pool)
+{
+    apr_status_t status = APR_SUCCESS;
+    h2_beam_proxy *d = b->data;
+    if (d->bred) {
+        const char *data;
+        apr_size_t len;
+        
+        status = apr_bucket_read(d->bred, &data, &len, APR_BLOCK_READ);
+        if (status == APR_SUCCESS) {
+            b = apr_bucket_heap_make(b, (char *)data + b->start, b->length, NULL);
+            if (b == NULL) {
+                return APR_ENOMEM;
+            }
+        }
+    }
+    return status;
+}*/
+
 const apr_bucket_type_t h2_bucket_type_beam = {
     "BEAM", 5, APR_BUCKET_DATA,
     beam_bucket_destroy,
@@ -239,7 +290,7 @@ static void h2_beam_prep_purge(h2_bucket_beam *beam, apr_bucket *bred)
     H2_BLIST_INSERT_TAIL(&beam->purge, bred);
 }
 
-static void h2_beam_emitted(h2_bucket_beam *beam, apr_bucket *bred)
+static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
 {
     apr_thread_mutex_t *lock;
     int acquired;
@@ -247,11 +298,14 @@ static void h2_beam_emitted(h2_bucket_beam *beam, apr_bucket *bred)
     if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
         /* even when beam buckets are split, only the one where
          * refcount drops to 0 will call us */
-        --beam->live_beam_buckets;
+        H2_BPROXY_REMOVE(proxy);
         /* invoked from green thread, the last beam bucket for the red
          * bucket bred is about to be destroyed.
          * remove it from the hold, where it should be now */
-        h2_beam_prep_purge(beam, bred);
+        if (proxy->bred) {
+            h2_beam_prep_purge(beam, proxy->bred);
+            proxy->bred = NULL;
+        }
         /* notify anyone waiting on space to become available */
         if (!lock) {
             r_purge_reds(beam);
@@ -282,17 +336,54 @@ static void h2_blist_cleanup(h2_blist *bl)
     }
 }
 
-static apr_status_t beam_cleanup(void *data)
+static apr_status_t beam_close(h2_bucket_beam *beam)
 {
-    h2_bucket_beam *beam = data;
+    if (!beam->closed) {
+        beam->closed = 1;
+        if (beam->m_cond) {
+            apr_thread_cond_broadcast(beam->m_cond);
+        }
+    }
+    return APR_SUCCESS;
+}
 
-    if (beam->live_beam_buckets) {
-        ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->life_pool, 
-                      "h2_beam(%d-%s) cleanup with live %d buckets", 
-                      beam->id, beam->tag, (int)beam->live_beam_buckets);
+static void beam_shutdown(h2_bucket_beam *beam, int disconnect)
+{
+    if (disconnect && !H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
+        /* If we are called before all green buckets we put out
+         * there have been destroyed, we need to disentangle ourself.
+         * We NULLify the beam and red buckets in every proxy from us, so
+         * a) red memory is no longer read
+         * b) destruction of the proxy no longer calls back to this beam
+         * This does not protect against races when red and green thread are still
+         * running concurrently and it does not protect from passed out red
+         * memory to still being accessed.
+         */
+        while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
+            h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
+            H2_BPROXY_REMOVE(proxy);
+            proxy->beam = NULL;
+            if (proxy->bred) {
+                h2_beam_prep_purge(beam, proxy->bred);
+                proxy->bred = NULL;
+            }
+        }
     }
-    AP_DEBUG_ASSERT(beam->live_beam_buckets == 0);
+    r_purge_reds(beam);
     h2_blist_cleanup(&beam->red);
+    beam_close(beam);
+    report_consumption(beam);
+}
+
+static apr_status_t beam_cleanup(void *data)
+{
+    h2_bucket_beam *beam = data;
+    
+    if (beam->green) {
+        apr_brigade_destroy(beam->green);
+        beam->green = NULL;
+    }
+    beam_shutdown(beam, 0);
     h2_blist_cleanup(&beam->purge);
     h2_blist_cleanup(&beam->hold);
     return APR_SUCCESS;
@@ -321,11 +412,11 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *life_pool,
     H2_BLIST_INIT(&beam->red);
     H2_BLIST_INIT(&beam->hold);
     H2_BLIST_INIT(&beam->purge);
+    H2_BPROXY_LIST_INIT(&beam->proxies);
     beam->life_pool = life_pool;
     beam->max_buf_size = max_buf_size;
 
-    apr_pool_cleanup_register(life_pool, beam, beam_cleanup, 
-                              apr_pool_cleanup_null);
+    apr_pool_pre_cleanup_register(life_pool, beam, beam_cleanup);
     *pbeam = beam;
     
     return status;
@@ -421,17 +512,6 @@ void h2_beam_abort(h2_bucket_beam *beam)
     }
 }
 
-static apr_status_t beam_close(h2_bucket_beam *beam)
-{
-    if (!beam->closed) {
-        beam->closed = 1;
-        if (beam->m_cond) {
-            apr_thread_cond_broadcast(beam->m_cond);
-        }
-    }
-    return APR_SUCCESS;
-}
-
 apr_status_t h2_beam_close(h2_bucket_beam *beam)
 {
     apr_thread_mutex_t *lock;
@@ -452,23 +532,7 @@ void h2_beam_shutdown(h2_bucket_beam *beam)
     int acquired;
     
     if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
-        r_purge_reds(beam);
-        h2_blist_cleanup(&beam->red);
-        beam_close(beam);
-        report_consumption(beam);
-        leave_yellow(beam, lock, acquired);
-    }
-}
-
-void h2_beam_reset(h2_bucket_beam *beam)
-{
-    apr_thread_mutex_t *lock;
-    int acquired;
-    
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
-        beam_cleanup(beam);
-        beam->closed = beam->close_sent = 0;
-        beam->sent_bytes = beam->received_bytes = beam->reported_bytes = 0;
+        beam_shutdown(beam, 1);
         leave_yellow(beam, lock, acquired);
     }
 }
@@ -708,7 +772,6 @@ transfer:
                  * the beam bucket will notify us on destruction that bred is
                  * no longer needed. */
                 bgreen = h2_beam_bucket_create(beam, bred, bb->bucket_alloc);
-                ++beam->live_beam_buckets;
             }
             
             /* Place the red bucket into our hold, to be destroyed when no
index 0b405990229ca7155ef8e1a559409f50c3a5b680..f94b1b9ecac45ff2f51c74acd05ffbc7e7e1133a 100644 (file)
@@ -150,6 +150,11 @@ typedef void h2_beam_mutex_leave(void *ctx,
 typedef void h2_beam_consumed_callback(void *ctx, h2_bucket_beam *beam,
                                        apr_off_t bytes);
 
+typedef struct h2_beam_proxy h2_beam_proxy;
+typedef struct {
+    APR_RING_HEAD(h2_beam_proxy_list, h2_beam_proxy) list;
+} h2_bproxy_list;
+
 typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam,
                                       apr_file_t *file);
 
@@ -160,10 +165,10 @@ struct h2_bucket_beam {
     h2_blist hold;
     h2_blist purge;
     apr_bucket_brigade *green;
+    h2_bproxy_list proxies;
     apr_pool_t *life_pool;
     
     apr_size_t max_buf_size;
-    apr_size_t live_beam_buckets;
     apr_size_t files_beamed;  /* how many file handles have been set aside */
     apr_file_t *last_beamed;  /* last file beamed */
     apr_off_t sent_bytes;     /* amount of bytes send */
@@ -239,11 +244,6 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam);
  */
 void h2_beam_shutdown(h2_bucket_beam *beam);
 
-/**
- * Reset the beam to its intial, empty state.
- */
-void h2_beam_reset(h2_bucket_beam *beam);
-
 void h2_beam_mutex_set(h2_bucket_beam *beam, 
                        h2_beam_mutex_enter m_enter,
                        h2_beam_mutex_leave m_leave,
index 59561ecd61982667a354b87fcda00c93d49e3743..badee774240dee27606b45015b96d064e43482c5 100644 (file)
@@ -254,9 +254,13 @@ static apr_status_t bucketeer_buffer(h2_conn_io *io)
     return APR_SUCCESS;
 }
 
-apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b)
+apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush)
 {
     APR_BRIGADE_INSERT_TAIL(io->output, b);
+    if (flush) {
+        b = apr_bucket_flush_create(io->c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(io->output, b);
+    }
     return APR_SUCCESS;
 }
 
@@ -315,7 +319,7 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, h2_session *session)
 {
     apr_bucket *b = h2_bucket_eoc_create(io->c->bucket_alloc, session);
     APR_BRIGADE_INSERT_TAIL(io->output, b);
-    return h2_conn_io_flush_int(io, 0, 1);
+    return h2_conn_io_flush_int(io, 1, 1);
 }
 
 apr_status_t h2_conn_io_write(h2_conn_io *io, 
index b8be671d38e1214628f96f87720795a015fa1198..c397e9f608ea87277f33965a8867bb6a472a3f70 100644 (file)
@@ -64,7 +64,7 @@ apr_status_t h2_conn_io_write(h2_conn_io *io,
  * @param io the connection io
  * @param b the bucket to append
  */
-apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b);
+apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush);
 
 /**
  * Append an End-Of-Connection bucket to the output that, once destroyed,
index 76c9c26da430139b336e31b3975fe3d49ca052f4..ca9492029802084c911863e63a4b6a191fcf6c4e 100644 (file)
@@ -312,8 +312,9 @@ static apr_status_t stream_release(h2_session *session,
                                    h2_stream *stream,
                                    uint32_t error_code) 
 {
+    conn_rec *c = session->c;
     if (!error_code) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       "h2_stream(%ld-%d): handled, closing", 
                       session->id, (int)stream->id);
         if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
@@ -323,7 +324,7 @@ static apr_status_t stream_release(h2_session *session,
         }
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03065)
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03065)
                       "h2_stream(%ld-%d): closing with err=%d %s", 
                       session->id, (int)stream->id, (int)error_code,
                       h2_h2_err_description(error_code));
@@ -331,8 +332,7 @@ static apr_status_t stream_release(h2_session *session,
     }
     
     return h2_conn_io_writeb(&session->io,
-                             h2_bucket_eos_create(session->c->bucket_alloc, 
-                                                  stream));
+                             h2_bucket_eos_create(c->bucket_alloc, stream), 1);
 }
 
 static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -618,7 +618,7 @@ static int on_send_data_cb(nghttp2_session *ngh2,
         if (status == APR_SUCCESS && padlen) {
             b = apr_bucket_immortal_create(immortal_zeros, padlen, 
                                            session->c->bucket_alloc);
-            status = h2_conn_io_writeb(&session->io, b);
+            status = h2_conn_io_writeb(&session->io, b, 1);
         }
     }