From: Stefan Eissing Date: Mon, 14 Mar 2016 16:43:52 +0000 (+0000) Subject: sharing bucket_alloc for all streams inside mplx, explicit lifetime handling of EOR... X-Git-Tag: 2.5.0-alpha~1901 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=93b8969292dd2537ac29c6b59537bd254c91e336;p=apache sharing bucket_alloc for all streams inside mplx, explicit lifetime handling of EOR bucket and tasks git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1734957 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index a0cd54e6ac..60e209492c 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -261,7 +261,7 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, } apr_pool_create_ex(&pool, parent, NULL, allocator); apr_pool_tag(pool, "h2_slave_conn"); - apr_allocator_owner_set(allocator, parent); + apr_allocator_owner_set(allocator, pool); c = (conn_rec *) apr_palloc(pool, sizeof(conn_rec)); if (c == NULL) { @@ -309,15 +309,18 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator) { + apr_pool_t *parent; apr_allocator_t *allocator = apr_pool_allocator_get(slave->pool); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave, "h2_slave_conn(%ld): destroy (task=%s)", slave->id, apr_table_get(slave->notes, H2_TASK_ID_NOTE)); - apr_pool_destroy(slave->pool); - if (pallocator) { + /* Attache the allocator to the parent pool and return it for + * reuse, otherwise the own is still the slave pool and it will + * get destroyed with it. */ + parent = apr_pool_parent_get(slave->pool); + if (pallocator && parent) { + apr_allocator_owner_set(allocator, parent); *pallocator = allocator; } - else { - apr_allocator_destroy(allocator); - } + apr_pool_destroy(slave->pool); } diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index d66558e617..39ebad3a79 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -23,6 +23,7 @@ #include #include #include +#include #include "h2_private.h" #include "h2_h2.h" @@ -33,13 +34,15 @@ #include "h2_task.h" #include "h2_util.h" -h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request) +h2_io *h2_io_create(int id, apr_pool_t *pool, + apr_bucket_alloc_t *bucket_alloc, + const h2_request *request) { h2_io *io = apr_pcalloc(pool, sizeof(*io)); if (io) { io->id = id; io->pool = pool; - io->bucket_alloc = apr_bucket_alloc_create(pool); + io->bucket_alloc = bucket_alloc; io->request = h2_request_clone(pool, request); } return io; @@ -413,28 +416,36 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, apr_size_t *pfile_buckets_allowed) { apr_status_t status; + apr_bucket *b; int start_allowed; if (io->rst_error) { return APR_ECONNABORTED; } + if (!io->eor) { + /* Filter the EOR bucket and set it aside. We prefer to tear down + * the request when the whole h2 stream is done */ + for (b = APR_BRIGADE_FIRST(bb); + b != APR_BRIGADE_SENTINEL(bb); + b = APR_BUCKET_NEXT(b)) + { + if (AP_BUCKET_IS_EOR(b)) { + APR_BUCKET_REMOVE(b); + io->eor = b; + break; + } + } + } + if (io->eos_out) { - apr_off_t len; + apr_off_t len = 0; /* We have already delivered an EOS bucket to a reader, no * sense in storing anything more here. */ - status = apr_brigade_length(bb, 1, &len); - if (status == APR_SUCCESS) { - if (len > 0) { - /* someone tries to write real data after EOS, that - * does not look right. */ - status = APR_EOF; - } - /* cleanup, as if we had moved the data */ - apr_brigade_cleanup(bb); - } - return status; + apr_brigade_length(bb, 0, &len); + apr_brigade_cleanup(bb); + return (len > 0)? APR_EOF : APR_SUCCESS; } process_trailers(io, trailers); diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index a602c0952e..90d0cde8f2 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -44,6 +44,9 @@ struct h2_io { struct h2_response *response; /* response to request */ int rst_error; /* h2 related stream abort error */ + apr_bucket *eor; /* the EOR bucket, set aside */ + struct h2_task *task; /* the task once started */ + apr_bucket_brigade *bbin; /* input data for stream */ apr_bucket_brigade *bbout; /* output data from stream */ apr_bucket_brigade *bbtmp; /* temporary data for chunking */ @@ -77,7 +80,9 @@ struct h2_io { /** * Creates a new h2_io for the given stream id. */ -h2_io *h2_io_create(int id, apr_pool_t *pool, const struct h2_request *request); +h2_io *h2_io_create(int id, apr_pool_t *pool, + apr_bucket_alloc_t *bucket_alloc, + const struct h2_request *request); /** * Set the response of this stream. diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index b60d328959..f77a404e7a 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -201,6 +201,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, return NULL; } + m->bucket_alloc = apr_bucket_alloc_create(m->pool); m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS); m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); m->q = h2_iq_create(m->pool, m->max_streams); @@ -266,7 +267,7 @@ static int io_process_events(h2_mplx *m, h2_io *io) static void io_destroy(h2_mplx *m, h2_io *io, int events) { - apr_pool_t *pool = io->pool; + apr_pool_t *pool; /* cleanup any buffered input */ h2_io_in_shutdown(io); @@ -275,7 +276,6 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events) io_process_events(m, io); } - io->pool = NULL; /* The pool is cleared/destroyed which also closes all * allocated file handles. Give this count back to our * file handle pool. */ @@ -286,7 +286,19 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events) if (m->redo_ios) { h2_io_set_remove(m->redo_ios, io); } - + + if (io->task) { + if (m->spare_allocator) { + apr_allocator_destroy(m->spare_allocator); + m->spare_allocator = NULL; + } + + h2_slave_destroy(io->task->c, &m->spare_allocator); + io->task = NULL; + } + + pool = io->pool; + io->pool = NULL; if (pool) { apr_pool_clear(pool); if (m->spare_pool) { @@ -856,12 +868,17 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id, apr_table_t *trailers) "h2_mplx(%ld-%d): close, no response, no rst", m->id, io->id); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, - "h2_mplx(%ld-%d): close with trailers=%s", - m->id, io->id, trailers? "yes" : "no"); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, + "h2_mplx(%ld-%d): close with eor=%s, trailers=%s", + m->id, io->id, io->eor? "yes" : "no", + trailers? "yes" : "no"); status = h2_io_out_close(io, trailers); H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close"); + if (io->eor) { + apr_bucket_delete(io->eor); + io->eor = NULL; + } have_out_data_for(m, stream_id); } else { @@ -987,7 +1004,7 @@ static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request) m->spare_pool = NULL; } - io = h2_io_create(stream_id, io_pool, request); + io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request); h2_io_set_add(m->stream_ios, io); return io; @@ -1044,9 +1061,9 @@ static h2_task *pop_task(h2_mplx *m) } } else if (io) { - conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator); + conn_rec *slave = h2_slave_create(m->c, io->pool, m->spare_allocator); m->spare_allocator = NULL; - task = h2_task_create(m->id, io->request, slave, m); + io->task = task = h2_task_create(m->id, io->request, slave, m); apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id); io->worker_started = 1; io->started_at = apr_time_now(); @@ -1119,14 +1136,6 @@ static void task_done(h2_mplx *m, h2_task *task) h2_ngn_shed_done_ngn(m->ngn_shed, task->engine); } - if (m->spare_allocator) { - apr_allocator_destroy(m->spare_allocator); - m->spare_allocator = NULL; - } - - h2_slave_destroy(task->c, &m->spare_allocator); - task = NULL; - if (io) { apr_time_t now = apr_time_now(); if (!io->orphaned && m->redo_ios @@ -1168,9 +1177,14 @@ static void task_done(h2_mplx *m, h2_task *task) } } else { - /* hang around until the stream deregisteres */ + /* hang around until the stream deregisters */ } } + else { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + "h2_mplx(%ld): task %s without corresp. h2_io", + m->id, task->id); + } } } } @@ -1410,7 +1424,6 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn) /* cannot report that as done until engine returns */ } else { - h2_task_output_close(task->output); task_done(m, task); } leave_mutex(m, acquired); diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index e33d5e5a2a..f50239c3a1 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -67,6 +67,7 @@ struct h2_mplx { volatile int refs; conn_rec *c; apr_pool_t *pool; + apr_bucket_alloc_t *bucket_alloc; unsigned int aborted : 1; unsigned int need_registration : 1; diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c index 5b97cf914d..79ca72e846 100644 --- a/modules/http2/h2_ngn_shed.c +++ b/modules/http2/h2_ngn_shed.c @@ -261,8 +261,7 @@ apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, } static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, - h2_task *task, int waslive, int aborted, - int close) + h2_task *task, int waslive, int aborted) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, "h2_ngn_shed(%ld): task %s %s by %s", @@ -271,16 +270,13 @@ static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, if (waslive) ngn->no_live--; ngn->no_assigned--; - if (close) { - h2_task_output_close(task->output); - } return APR_SUCCESS; } apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, struct h2_req_engine *ngn, h2_task *task) { - return ngn_done_task(shed, ngn, task, 1, 0, 0); + return ngn_done_task(shed, ngn, task, 1, 0); } void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn) @@ -308,7 +304,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn) "h2_ngn_shed(%ld): engine %s has queued task %s, " "frozen=%d, aborting", shed->c->id, ngn->id, task->id, task->frozen); - ngn_done_task(shed, ngn, task, 0, 1, 1); + ngn_done_task(shed, ngn, task, 0, 1); } } if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) { diff --git a/modules/http2/h2_task_output.c b/modules/http2/h2_task_output.c index 3ef884ffd2..87bbe38c3c 100644 --- a/modules/http2/h2_task_output.c +++ b/modules/http2/h2_task_output.c @@ -42,9 +42,6 @@ h2_task_output *h2_task_output_create(h2_task *task, conn_rec *c) output->task = task; output->state = H2_TASK_OUT_INIT; output->from_h1 = h2_from_h1_create(task->stream_id, c->pool); - if (!output->from_h1) { - return NULL; - } } return output; } @@ -66,47 +63,43 @@ static apr_table_t *get_trailers(h2_task_output *output) return NULL; } -static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f, - apr_bucket_brigade *bb, const char *caller) +static apr_status_t open_response(h2_task_output *output, ap_filter_t *f, + apr_bucket_brigade *bb, const char *caller) { - if (output->state == H2_TASK_OUT_INIT) { - h2_response *response; - output->state = H2_TASK_OUT_STARTED; - response = h2_from_h1_get_response(output->from_h1); - if (!response) { - if (f) { - /* This happens currently when ap_die(status, r) is invoked - * by a read request filter. */ - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204) - "h2_task_output(%s): write without response by %s " - "for %s %s %s", - output->task->id, caller, - output->task->request->method, - output->task->request->authority, - output->task->request->path); - output->c->aborted = 1; - } - if (output->task->io) { - apr_thread_cond_broadcast(output->task->io); - } - return APR_ECONNABORTED; + h2_response *response; + response = h2_from_h1_get_response(output->from_h1); + if (!response) { + if (f) { + /* This happens currently when ap_die(status, r) is invoked + * by a read request filter. */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204) + "h2_task_output(%s): write without response by %s " + "for %s %s %s", + output->task->id, caller, + output->task->request->method, + output->task->request->authority, + output->task->request->path); + output->c->aborted = 1; } - - if (h2_task_logio_add_bytes_out) { - /* counter headers as if we'd do a HTTP/1.1 serialization */ - output->written = h2_util_table_bytes(response->headers, 3)+1; - h2_task_logio_add_bytes_out(output->c, output->written); + if (output->task->io) { + apr_thread_cond_broadcast(output->task->io); } - get_trailers(output); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348) - "h2_task(%s): open response to %s %s %s", - output->task->id, output->task->request->method, - output->task->request->authority, - output->task->request->path); - return h2_mplx_out_open(output->task->mplx, output->task->stream_id, - response, f, bb, output->task->io); + return APR_ECONNABORTED; + } + + if (h2_task_logio_add_bytes_out) { + /* count headers as if we'd do a HTTP/1.1 serialization */ + output->written = h2_util_table_bytes(response->headers, 3)+1; + h2_task_logio_add_bytes_out(output->c, output->written); } - return APR_SUCCESS; + get_trailers(output); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348) + "h2_task(%s): open response to %s %s %s", + output->task->id, output->task->request->method, + output->task->request->authority, + output->task->request->path); + return h2_mplx_out_open(output->task->mplx, output->task->stream_id, + response, f, bb, output->task->io); } static apr_status_t write_brigade_raw(h2_task_output *output, @@ -145,7 +138,7 @@ static apr_status_t write_brigade_raw(h2_task_output *output, apr_status_t h2_task_output_write(h2_task_output *output, ap_filter_t* f, apr_bucket_brigade* bb) { - apr_status_t status; + apr_status_t status = APR_SUCCESS; if (APR_BRIGADE_EMPTY(bb)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c, @@ -159,7 +152,10 @@ apr_status_t h2_task_output_write(h2_task_output *output, return APR_SUCCESS; } - status = open_if_needed(output, f, bb, "write"); + if (output->state == H2_TASK_OUT_INIT) { + status = open_response(output, f, bb, "write"); + output->state = H2_TASK_OUT_STARTED; + } /* Attempt to write saved brigade first */ if (status == APR_SUCCESS && output->bb @@ -188,13 +184,3 @@ apr_status_t h2_task_output_write(h2_task_output *output, return status; } -void h2_task_output_close(h2_task_output *output) -{ - open_if_needed(output, NULL, NULL, "close"); - if (output->state != H2_TASK_OUT_DONE) { - h2_mplx_out_close(output->task->mplx, output->task->stream_id, - get_trailers(output)); - output->state = H2_TASK_OUT_DONE; - } -} - diff --git a/modules/http2/h2_task_output.h b/modules/http2/h2_task_output.h index 7861039e0d..7670582072 100644 --- a/modules/http2/h2_task_output.h +++ b/modules/http2/h2_task_output.h @@ -52,8 +52,6 @@ apr_status_t h2_task_output_write(h2_task_output *output, ap_filter_t* filter, apr_bucket_brigade* brigade); -void h2_task_output_close(h2_task_output *output); - apr_status_t h2_task_output_freeze(h2_task_output *output); apr_status_t h2_task_output_thaw(h2_task_output *output); diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 904349658c..8fe2b26f4a 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -537,6 +537,7 @@ apr_status_t h2_util_move(apr_bucket_brigade *to, apr_bucket_brigade *from, else { const char *data; apr_size_t len; + status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); if (status == APR_SUCCESS && len > 0) { status = apr_brigade_write(to, NULL, NULL, data, len);