From 1ad4ac969104e92e66489e0d5900e1b55149ee24 Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Tue, 26 Apr 2016 14:50:57 +0000 Subject: [PATCH] FLUSHing H2EOS and H2EOC buckets to preserve destruction order, improving bucket beams to disengage from live green buckets on shutdown git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1741045 13f79535-47bb-0310-9956-ffa450edef68 --- modules/http2/h2_bucket_beam.c | 163 +++++++++++++++++++++++---------- modules/http2/h2_bucket_beam.h | 12 +-- modules/http2/h2_conn_io.c | 8 +- modules/http2/h2_conn_io.h | 2 +- modules/http2/h2_session.c | 10 +- 5 files changed, 131 insertions(+), 64 deletions(-) diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index 64413aa7b1..6ca39e1de3 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -27,7 +27,32 @@ #include "h2_util.h" #include "h2_bucket_beam.h" -static void h2_beam_emitted(h2_bucket_beam *beam, apr_bucket *bred); +static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy); + +#define H2_BPROXY_NEXT(e) APR_RING_NEXT((e), link) +#define H2_BPROXY_PREV(e) APR_RING_PREV((e), link) +#define H2_BPROXY_REMOVE(e) APR_RING_REMOVE((e), link) + +#define H2_BPROXY_LIST_INIT(b) APR_RING_INIT(&(b)->list, h2_beam_proxy, link); +#define H2_BPROXY_LIST_SENTINEL(b) APR_RING_SENTINEL(&(b)->list, h2_beam_proxy, link) +#define H2_BPROXY_LIST_EMPTY(b) APR_RING_EMPTY(&(b)->list, h2_beam_proxy, link) +#define H2_BPROXY_LIST_FIRST(b) APR_RING_FIRST(&(b)->list) +#define H2_BPROXY_LIST_LAST(b) APR_RING_LAST(&(b)->list) +#define H2_PROXY_BLIST_INSERT_HEAD(b, e) do { \ + h2_beam_proxy *ap__b = (e); \ + APR_RING_INSERT_HEAD(&(b)->list, ap__b, h2_beam_proxy, link); \ + } while (0) +#define H2_BPROXY_LIST_INSERT_TAIL(b, e) do { \ + h2_beam_proxy *ap__b = (e); \ + APR_RING_INSERT_TAIL(&(b)->list, ap__b, h2_beam_proxy, link); \ + } while (0) +#define H2_BPROXY_LIST_CONCAT(a, b) do { \ + APR_RING_CONCAT(&(a)->list, &(b)->list, h2_beam_proxy, link); \ + } while (0) +#define H2_BPROXY_LIST_PREPEND(a, b) do { \ + APR_RING_PREPEND(&(a)->list, &(b)->list, h2_beam_proxy, link); \ + } while (0) + /******************************************************************************* * beam bucket with reference to beam and bucket it represents @@ -37,18 +62,19 @@ const apr_bucket_type_t h2_bucket_type_beam; #define H2_BUCKET_IS_BEAM(e) (e->type == &h2_bucket_type_beam) -typedef struct { +struct h2_beam_proxy { apr_bucket_refcount refcount; + APR_RING_ENTRY(h2_beam_proxy) link; h2_bucket_beam *beam; apr_bucket *bred; -} h2_beam_bucket; +}; static const char Dummy = '\0'; static apr_status_t beam_bucket_read(apr_bucket *b, const char **str, apr_size_t *len, apr_read_type_e block) { - h2_beam_bucket *d = b->data; + h2_beam_proxy *d = b->data; if (d->bred) { const char *data; apr_status_t status = apr_bucket_read(d->bred, &data, len, block); @@ -60,16 +86,21 @@ static apr_status_t beam_bucket_read(apr_bucket *b, const char **str, } *str = &Dummy; *len = 0; - return APR_SUCCESS; + return APR_ECONNRESET; } static void beam_bucket_destroy(void *data) { - h2_beam_bucket *d = data; + h2_beam_proxy *d = data; if (apr_bucket_shared_destroy(d)) { - if (d->bred) { - h2_beam_emitted(d->beam, d->bred); + /* When the beam gets destroyed before this bucket, it will + * NULLify its reference here. This is not protected by a mutex, + * so it will not help with race conditions. + * But it lets us shut down memory pool with circulare beam + * references. */ + if (d->beam) { + h2_beam_emitted(d->beam, d); } apr_bucket_free(d); } @@ -79,9 +110,10 @@ static apr_bucket * h2_beam_bucket_make(apr_bucket *b, h2_bucket_beam *beam, apr_bucket *bred) { - h2_beam_bucket *d; + h2_beam_proxy *d; d = apr_bucket_alloc(sizeof(*d), b->list); + H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, d); d->beam = beam; d->bred = bred; @@ -103,6 +135,25 @@ static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam, return h2_beam_bucket_make(b, beam, bred); } +/*static apr_status_t beam_bucket_setaside(apr_bucket *b, apr_pool_t *pool) +{ + apr_status_t status = APR_SUCCESS; + h2_beam_proxy *d = b->data; + if (d->bred) { + const char *data; + apr_size_t len; + + status = apr_bucket_read(d->bred, &data, &len, APR_BLOCK_READ); + if (status == APR_SUCCESS) { + b = apr_bucket_heap_make(b, (char *)data + b->start, b->length, NULL); + if (b == NULL) { + return APR_ENOMEM; + } + } + } + return status; +}*/ + const apr_bucket_type_t h2_bucket_type_beam = { "BEAM", 5, APR_BUCKET_DATA, beam_bucket_destroy, @@ -239,7 +290,7 @@ static void h2_beam_prep_purge(h2_bucket_beam *beam, apr_bucket *bred) H2_BLIST_INSERT_TAIL(&beam->purge, bred); } -static void h2_beam_emitted(h2_bucket_beam *beam, apr_bucket *bred) +static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) { apr_thread_mutex_t *lock; int acquired; @@ -247,11 +298,14 @@ static void h2_beam_emitted(h2_bucket_beam *beam, apr_bucket *bred) if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { /* even when beam buckets are split, only the one where * refcount drops to 0 will call us */ - --beam->live_beam_buckets; + H2_BPROXY_REMOVE(proxy); /* invoked from green thread, the last beam bucket for the red * bucket bred is about to be destroyed. * remove it from the hold, where it should be now */ - h2_beam_prep_purge(beam, bred); + if (proxy->bred) { + h2_beam_prep_purge(beam, proxy->bred); + proxy->bred = NULL; + } /* notify anyone waiting on space to become available */ if (!lock) { r_purge_reds(beam); @@ -282,17 +336,54 @@ static void h2_blist_cleanup(h2_blist *bl) } } -static apr_status_t beam_cleanup(void *data) +static apr_status_t beam_close(h2_bucket_beam *beam) { - h2_bucket_beam *beam = data; + if (!beam->closed) { + beam->closed = 1; + if (beam->m_cond) { + apr_thread_cond_broadcast(beam->m_cond); + } + } + return APR_SUCCESS; +} - if (beam->live_beam_buckets) { - ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->life_pool, - "h2_beam(%d-%s) cleanup with live %d buckets", - beam->id, beam->tag, (int)beam->live_beam_buckets); +static void beam_shutdown(h2_bucket_beam *beam, int disconnect) +{ + if (disconnect && !H2_BPROXY_LIST_EMPTY(&beam->proxies)) { + /* If we are called before all green buckets we put out + * there have been destroyed, we need to disentangle ourself. + * We NULLify the beam and red buckets in every proxy from us, so + * a) red memory is no longer read + * b) destruction of the proxy no longer calls back to this beam + * This does not protect against races when red and green thread are still + * running concurrently and it does not protect from passed out red + * memory to still being accessed. + */ + while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) { + h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies); + H2_BPROXY_REMOVE(proxy); + proxy->beam = NULL; + if (proxy->bred) { + h2_beam_prep_purge(beam, proxy->bred); + proxy->bred = NULL; + } + } } - AP_DEBUG_ASSERT(beam->live_beam_buckets == 0); + r_purge_reds(beam); h2_blist_cleanup(&beam->red); + beam_close(beam); + report_consumption(beam); +} + +static apr_status_t beam_cleanup(void *data) +{ + h2_bucket_beam *beam = data; + + if (beam->green) { + apr_brigade_destroy(beam->green); + beam->green = NULL; + } + beam_shutdown(beam, 0); h2_blist_cleanup(&beam->purge); h2_blist_cleanup(&beam->hold); return APR_SUCCESS; @@ -321,11 +412,11 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *life_pool, H2_BLIST_INIT(&beam->red); H2_BLIST_INIT(&beam->hold); H2_BLIST_INIT(&beam->purge); + H2_BPROXY_LIST_INIT(&beam->proxies); beam->life_pool = life_pool; beam->max_buf_size = max_buf_size; - apr_pool_cleanup_register(life_pool, beam, beam_cleanup, - apr_pool_cleanup_null); + apr_pool_pre_cleanup_register(life_pool, beam, beam_cleanup); *pbeam = beam; return status; @@ -421,17 +512,6 @@ void h2_beam_abort(h2_bucket_beam *beam) } } -static apr_status_t beam_close(h2_bucket_beam *beam) -{ - if (!beam->closed) { - beam->closed = 1; - if (beam->m_cond) { - apr_thread_cond_broadcast(beam->m_cond); - } - } - return APR_SUCCESS; -} - apr_status_t h2_beam_close(h2_bucket_beam *beam) { apr_thread_mutex_t *lock; @@ -452,23 +532,7 @@ void h2_beam_shutdown(h2_bucket_beam *beam) int acquired; if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { - r_purge_reds(beam); - h2_blist_cleanup(&beam->red); - beam_close(beam); - report_consumption(beam); - leave_yellow(beam, lock, acquired); - } -} - -void h2_beam_reset(h2_bucket_beam *beam) -{ - apr_thread_mutex_t *lock; - int acquired; - - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { - beam_cleanup(beam); - beam->closed = beam->close_sent = 0; - beam->sent_bytes = beam->received_bytes = beam->reported_bytes = 0; + beam_shutdown(beam, 1); leave_yellow(beam, lock, acquired); } } @@ -708,7 +772,6 @@ transfer: * the beam bucket will notify us on destruction that bred is * no longer needed. */ bgreen = h2_beam_bucket_create(beam, bred, bb->bucket_alloc); - ++beam->live_beam_buckets; } /* Place the red bucket into our hold, to be destroyed when no diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h index 0b40599022..f94b1b9eca 100644 --- a/modules/http2/h2_bucket_beam.h +++ b/modules/http2/h2_bucket_beam.h @@ -150,6 +150,11 @@ typedef void h2_beam_mutex_leave(void *ctx, typedef void h2_beam_consumed_callback(void *ctx, h2_bucket_beam *beam, apr_off_t bytes); +typedef struct h2_beam_proxy h2_beam_proxy; +typedef struct { + APR_RING_HEAD(h2_beam_proxy_list, h2_beam_proxy) list; +} h2_bproxy_list; + typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam, apr_file_t *file); @@ -160,10 +165,10 @@ struct h2_bucket_beam { h2_blist hold; h2_blist purge; apr_bucket_brigade *green; + h2_bproxy_list proxies; apr_pool_t *life_pool; apr_size_t max_buf_size; - apr_size_t live_beam_buckets; apr_size_t files_beamed; /* how many file handles have been set aside */ apr_file_t *last_beamed; /* last file beamed */ apr_off_t sent_bytes; /* amount of bytes send */ @@ -239,11 +244,6 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam); */ void h2_beam_shutdown(h2_bucket_beam *beam); -/** - * Reset the beam to its intial, empty state. - */ -void h2_beam_reset(h2_bucket_beam *beam); - void h2_beam_mutex_set(h2_bucket_beam *beam, h2_beam_mutex_enter m_enter, h2_beam_mutex_leave m_leave, diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index 59561ecd61..badee77424 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -254,9 +254,13 @@ static apr_status_t bucketeer_buffer(h2_conn_io *io) return APR_SUCCESS; } -apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b) +apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush) { APR_BRIGADE_INSERT_TAIL(io->output, b); + if (flush) { + b = apr_bucket_flush_create(io->c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(io->output, b); + } return APR_SUCCESS; } @@ -315,7 +319,7 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, h2_session *session) { apr_bucket *b = h2_bucket_eoc_create(io->c->bucket_alloc, session); APR_BRIGADE_INSERT_TAIL(io->output, b); - return h2_conn_io_flush_int(io, 0, 1); + return h2_conn_io_flush_int(io, 1, 1); } apr_status_t h2_conn_io_write(h2_conn_io *io, diff --git a/modules/http2/h2_conn_io.h b/modules/http2/h2_conn_io.h index b8be671d38..c397e9f608 100644 --- a/modules/http2/h2_conn_io.h +++ b/modules/http2/h2_conn_io.h @@ -64,7 +64,7 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, * @param io the connection io * @param b the bucket to append */ -apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b); +apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush); /** * Append an End-Of-Connection bucket to the output that, once destroyed, diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 76c9c26da4..ca94920298 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -312,8 +312,9 @@ static apr_status_t stream_release(h2_session *session, h2_stream *stream, uint32_t error_code) { + conn_rec *c = session->c; if (!error_code) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_stream(%ld-%d): handled, closing", session->id, (int)stream->id); if (H2_STREAM_CLIENT_INITIATED(stream->id)) { @@ -323,7 +324,7 @@ static apr_status_t stream_release(h2_session *session, } } else { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03065) + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03065) "h2_stream(%ld-%d): closing with err=%d %s", session->id, (int)stream->id, (int)error_code, h2_h2_err_description(error_code)); @@ -331,8 +332,7 @@ static apr_status_t stream_release(h2_session *session, } return h2_conn_io_writeb(&session->io, - h2_bucket_eos_create(session->c->bucket_alloc, - stream)); + h2_bucket_eos_create(c->bucket_alloc, stream), 1); } static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id, @@ -618,7 +618,7 @@ static int on_send_data_cb(nghttp2_session *ngh2, if (status == APR_SUCCESS && padlen) { b = apr_bucket_immortal_create(immortal_zeros, padlen, session->c->bucket_alloc); - status = h2_conn_io_writeb(&session->io, b); + status = h2_conn_io_writeb(&session->io, b, 1); } } -- 2.40.0