From eb09b029d9e37f3f289c6b63f7f44ced9c7bde06 Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Thu, 17 Mar 2016 15:22:09 +0000 Subject: [PATCH] mod_http2: pushing slave conn setup outside of lock area git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1735444 13f79535-47bb-0310-9956-ffa450edef68 --- modules/http2/h2_conn.c | 10 +++- modules/http2/h2_conn.h | 1 + modules/http2/h2_io.h | 2 +- modules/http2/h2_mplx.c | 99 +++++++++++++++++++--------------- modules/http2/h2_mplx.h | 7 ++- modules/http2/h2_task.c | 54 +++++++++++-------- modules/http2/h2_task.h | 19 +++++-- modules/http2/h2_task_input.c | 5 +- modules/http2/h2_task_output.c | 8 +-- modules/http2/h2_worker.c | 26 ++++++--- modules/http2/h2_worker.h | 3 +- modules/http2/h2_workers.c | 35 +++++++----- 12 files changed, 169 insertions(+), 100 deletions(-) diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index 3b28c1f925..30e315daa0 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -284,8 +284,7 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, c->clogging_input_filters = 1; c->log = NULL; c->log_id = NULL; - /* Simulate that we had already a request on this connection. */ - c->keepalives = 1; + c->keepalives = 0; /* We cannot install the master connection socket on the slaves, as * modules mess with timeouts/blocking of the socket, with * unwanted side effects to the master connection processing. @@ -326,6 +325,13 @@ void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator) apr_status_t h2_slave_run_pre_connection(conn_rec *slave, apr_socket_t *csd) { + /* We always start slaves with 1 */ + slave->keepalives = 1; return ap_run_pre_connection(slave, csd); } +apr_status_t h2_slave_needs_pre_run(conn_rec *slave) +{ + return slave->keepalives == 0; +} + diff --git a/modules/http2/h2_conn.h b/modules/http2/h2_conn.h index e52fc8d69e..09440b5039 100644 --- a/modules/http2/h2_conn.h +++ b/modules/http2/h2_conn.h @@ -70,6 +70,7 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, apr_allocator_t *allocator); void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator); +apr_status_t h2_slave_needs_pre_run(conn_rec *slave); apr_status_t h2_slave_run_pre_connection(conn_rec *slave, apr_socket_t *csd); void h2_slave_run_connection(conn_rec *slave); diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index 90d0cde8f2..15801cae7f 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -40,7 +40,7 @@ struct h2_io { apr_pool_t *pool; /* stream pool */ apr_bucket_alloc_t *bucket_alloc; - const struct h2_request *request;/* request on this io */ + struct h2_request *request; /* request on this io */ struct h2_response *response; /* response to request */ int rst_error; /* h2 related stream abort error */ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 1ca5332d05..fc40a44c5c 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -299,15 +299,16 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events) } if (io->task) { - conn_rec *slave = io->task->c; + conn_rec *slave = h2_task_detach(io->task); h2_task_destroy(io->task); io->task = NULL; - - if (m->spare_slaves->nelts < m->spare_slaves->nalloc) { - APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave; - } - else { - h2_slave_destroy(slave, NULL); + if (slave) { + if (m->spare_slaves->nelts < m->spare_slaves->nalloc) { + APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave; + } + else { + h2_slave_destroy(slave, NULL); + } } } @@ -1050,14 +1051,15 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const h2_request *req, return status; } -static h2_task *pop_task(h2_mplx *m) +static h2_request *pop_request(h2_mplx *m) { - h2_task *task = NULL; - int sid; - while (!m->aborted && !task + h2_request *req = NULL; + int stream_id; + + while (!m->aborted && !req && (m->workers_busy < m->workers_limit) - && (sid = h2_iq_shift(m->q)) > 0) { - h2_io *io = h2_io_set_get(m->stream_ios, sid); + && (stream_id = h2_iq_shift(m->q)) > 0) { + h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && io->orphaned) { io_destroy(m, io, 0); if (m->join_wait) { @@ -1065,35 +1067,45 @@ static h2_task *pop_task(h2_mplx *m) } } else if (io) { - conn_rec *slave, **pslave; - - pslave = (conn_rec **)apr_array_pop(m->spare_slaves); - if (pslave) { - slave = *pslave; - } - else { - slave = h2_slave_create(m->c, m->pool, NULL); - h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave)); - } - - - io->task = task = h2_task_create(m->id, io->request, slave, m); - apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id); - - io->worker_started = 1; + req = io->request; io->started_at = apr_time_now(); - if (sid > m->max_stream_started) { - m->max_stream_started = sid; + if (stream_id > m->max_stream_started) { + m->max_stream_started = stream_id; } + io->worker_started = 1; ++m->workers_busy; } } - return task; + return req; } -h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) +static conn_rec *get_slave(h2_mplx *m) { - h2_task *task = NULL; + conn_rec **pslave = (conn_rec **)apr_array_pop(m->spare_slaves); + if (pslave) { + return *pslave; + } + else { + return h2_slave_create(m->c, m->pool, NULL); + } +} + +conn_rec *h2_mplx_get_slave(h2_mplx *m) +{ + conn_rec *slave = NULL; + int acquired; + + AP_DEBUG_ASSERT(m); + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + slave = get_slave(m); + leave_mutex(m, acquired); + } + return slave; +} + +h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more) +{ + h2_request *req = NULL; apr_status_t status; int acquired; @@ -1103,16 +1115,16 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) *has_more = 0; } else { - task = pop_task(m); + req = pop_request(m); *has_more = !h2_iq_empty(m->q); } - if (has_more && !task) { + if (!req && has_more) { m->need_registration = 1; } leave_mutex(m, acquired); } - return task; + return req; } static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) @@ -1130,7 +1142,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) apr_thread_cond_broadcast(m->task_thawed); } else { - h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); + h2_io *io = h2_io_set_get(m->stream_ios, task->request->id); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): task(%s) done", m->id, task->id); @@ -1139,7 +1151,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) /* TODO: this will keep a worker attached to this h2_mplx as * long as it has requests to handle. Might no be fair to * other mplx's. Perhaps leave after n requests? */ - h2_mplx_out_close(m, task->stream_id, NULL); + h2_mplx_out_close(m, task->request->id, NULL); if (ngn && io) { apr_off_t bytes = io->output_consumed + h2_io_out_length(io); @@ -1216,16 +1228,16 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) } } -void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) +void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_request **preq) { int acquired; if (enter_mutex(m, &acquired) == APR_SUCCESS) { task_done(m, task, NULL); --m->workers_busy; - if (ptask) { + if (preq) { /* caller wants another task */ - *ptask = pop_task(m); + *preq = pop_request(m); } leave_mutex(m, acquired); } @@ -1419,11 +1431,12 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, task->r = r; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); + h2_io *io = h2_io_set_get(m->stream_ios, task->request->id); if (!io || io->orphaned) { status = APR_ECONNABORTED; } else { + io->task = task; status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit); } leave_mutex(m, acquired); diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index eef8bf1aba..4e5c9957d6 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -141,9 +141,10 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait */ void h2_mplx_abort(h2_mplx *mplx); -struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more); +struct h2_request *h2_mplx_pop_request(h2_mplx *mplx, int *has_more); -void h2_mplx_task_done(h2_mplx *m, struct h2_task *task, struct h2_task **ptask); +void h2_mplx_task_done(h2_mplx *m, struct h2_task *task, + struct h2_request **prequest); /** * Get the highest stream identifier that has been passed on to processing. @@ -153,6 +154,8 @@ void h2_mplx_task_done(h2_mplx *m, struct h2_task *task, struct h2_task **ptask) */ int h2_mplx_get_max_stream_started(h2_mplx *m); +conn_rec *h2_mplx_get_slave(h2_mplx *m); + /******************************************************************************* * IO lifetime of streams. ******************************************************************************/ diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index ff0f4ebe31..b6b04f328b 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -123,47 +123,57 @@ apr_status_t h2_task_init(apr_pool_t *pool, server_rec *s) return APR_SUCCESS; } -h2_task *h2_task_create(long session_id, const h2_request *req, - conn_rec *c, h2_mplx *mplx) +h2_task *h2_task_create(apr_pool_t *pool, const h2_request *req, h2_mplx *mplx) { - apr_pool_t *pool; - h2_task *task; - - apr_pool_create(&pool, c->pool); - task = apr_pcalloc(pool, sizeof(h2_task)); + h2_task *task = apr_pcalloc(pool, sizeof(h2_task)); if (task == NULL) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, c, + ap_log_perror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, pool, APLOGNO(02941) "h2_task(%ld-%d): create stream task", - session_id, req->id); + mplx->id, req->id); h2_mplx_out_close(mplx, req->id, NULL); return NULL; } - - task->id = apr_psprintf(pool, "%ld-%d", session_id, req->id); - task->stream_id = req->id; - task->c = c; - task->mplx = mplx; + task->id = apr_psprintf(pool, "%ld-%d", mplx->id, req->id); task->pool = pool; + task->mplx = mplx; task->request = req; task->input_eos = !req->body; task->ser_headers = req->serialize; task->blocking = 1; - - h2_ctx_create_for(c, task); - /* Add our own, network level in- and output filters. */ - ap_add_input_filter("H2_TO_H1", task, NULL, c); - ap_add_output_filter("H1_TO_H2", task, NULL, c); - return task; } +conn_rec *h2_task_detach(h2_task *task) +{ + conn_rec *c = task->c; + if (c) { + task->c = NULL; + ap_remove_input_filter_byhandle(c->output_filters, "H2_TO_H1"); + ap_remove_output_filter_byhandle(c->output_filters, "H1_TO_H2"); + apr_table_setn(c->notes, H2_TASK_ID_NOTE, NULL); + } + return c; +} + void h2_task_destroy(h2_task *task) { - ap_remove_input_filter_byhandle(task->c->output_filters, "H2_TO_H1"); - ap_remove_output_filter_byhandle(task->c->output_filters, "H1_TO_H2"); + h2_task_detach(task); if (task->pool) { apr_pool_destroy(task->pool); + /* memory gone */ + } +} + +void h2_task_attach(h2_task *task, conn_rec *c) +{ + if (task->c) { + h2_task_detach(task); } + task->c = c; + h2_ctx_create_for(c, task); + apr_table_setn(c->notes, H2_TASK_ID_NOTE, task->id); + ap_add_input_filter("H2_TO_H1", task, NULL, c); + ap_add_output_filter("H1_TO_H2", task, NULL, c); } void h2_task_set_io_blocking(h2_task *task, int blocking) diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index 15a1d3cb2c..22220052b2 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -50,11 +50,10 @@ typedef struct h2_task h2_task; struct h2_task { const char *id; - int stream_id; - conn_rec *c; - struct h2_mplx *mplx; apr_pool_t *pool; + struct h2_mplx *mplx; const struct h2_request *request; + conn_rec *c; unsigned int filters_set : 1; unsigned int input_eos : 1; @@ -72,11 +71,21 @@ struct h2_task { request_rec *r; /* request being processed in this task */ }; -h2_task *h2_task_create(long session_id, const struct h2_request *req, - conn_rec *c, struct h2_mplx *mplx); +h2_task *h2_task_create(apr_pool_t *pool, const struct h2_request *req, + struct h2_mplx *mplx); void h2_task_destroy(h2_task *task); +/** + * Attach the task to the given connection, install filter etc. + */ +void h2_task_attach(h2_task *task, conn_rec *c); +/** + * Remove any attachments to the connection again, if still attached. + * Return the connection or NULL if none was attached. + */ +conn_rec *h2_task_detach(h2_task *task); + apr_status_t h2_task_do(h2_task *task, struct apr_thread_cond_t *cond); void h2_task_register_hooks(void); diff --git a/modules/http2/h2_task_input.c b/modules/http2/h2_task_input.c index 3993b6b40c..069160f29d 100644 --- a/modules/http2/h2_task_input.c +++ b/modules/http2/h2_task_input.c @@ -90,6 +90,9 @@ apr_status_t h2_task_input_read(h2_task_input *input, apr_status_t status = APR_SUCCESS; apr_off_t bblen = 0; + AP_DEBUG_ASSERT(input); + AP_DEBUG_ASSERT(input->task); + AP_DEBUG_ASSERT(f->c); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, "h2_task_input(%s): read, block=%d, mode=%d, readbytes=%ld", input->task->id, block, mode, (long)readbytes); @@ -133,7 +136,7 @@ apr_status_t h2_task_input_read(h2_task_input *input, * setting. */ status = h2_mplx_in_read(input->task->mplx, block, - input->task->stream_id, input->bb, + input->task->request->id, input->bb, f->r? f->r->trailers_in : NULL, input->task->io); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, diff --git a/modules/http2/h2_task_output.c b/modules/http2/h2_task_output.c index a15a80cf71..4a704ca7d1 100644 --- a/modules/http2/h2_task_output.c +++ b/modules/http2/h2_task_output.c @@ -39,7 +39,7 @@ h2_task_output *h2_task_output_create(h2_task *task, conn_rec *c) h2_task_output *output = apr_pcalloc(task->pool, sizeof(h2_task_output)); if (output) { output->task = task; - output->from_h1 = h2_from_h1_create(task->stream_id, task->pool); + output->from_h1 = h2_from_h1_create(task->request->id, task->pool); } return output; } @@ -96,7 +96,7 @@ static apr_status_t open_response(h2_task_output *output, ap_filter_t *f, output->task->id, output->task->request->method, output->task->request->authority, output->task->request->path); - return h2_mplx_out_open(output->task->mplx, output->task->stream_id, + return h2_mplx_out_open(output->task->mplx, output->task->request->id, response, f, bb, output->task->io); } @@ -111,7 +111,7 @@ static apr_status_t write_brigade_raw(h2_task_output *output, "h2_task(%s): write response body (%ld bytes)", output->task->id, (long)written); - status = h2_mplx_out_write(output->task->mplx, output->task->stream_id, + status = h2_mplx_out_write(output->task->mplx, output->task->request->id, f, output->task->blocking, bb, get_trailers(output), output->task->io); if (status == APR_INCOMPLETE) { @@ -145,7 +145,7 @@ apr_status_t h2_task_output_write(h2_task_output *output, } if (output->task->frozen) { - h2_util_bb_log(output->task->c, output->task->stream_id, APLOG_TRACE2, + h2_util_bb_log(output->task->c, output->task->request->id, APLOG_TRACE2, "frozen task output write, ignored", bb); return APR_SUCCESS; } diff --git a/modules/http2/h2_worker.c b/modules/http2/h2_worker.c index ca6ce3a2f2..77c1076c96 100644 --- a/modules/http2/h2_worker.c +++ b/modules/http2/h2_worker.c @@ -34,14 +34,27 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) { h2_worker *worker = (h2_worker *)wctx; + h2_mplx *mplx; + h2_request *req; int sticky; while (!worker->aborted) { - h2_task *task; /* Get a h2_task from the main workers queue. */ - worker->get_next(worker, worker->ctx, &task, &sticky); - while (task) { + worker->get_next(worker, worker->ctx, &mplx, &req, &sticky); + while (req) { + h2_task *task; + apr_pool_t *pool; + conn_rec *slave; + + slave = h2_mplx_get_slave(mplx); + if (h2_slave_needs_pre_run(slave)) { + h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave)); + } + + apr_pool_create(&pool, slave->pool); + task = h2_task_create(pool, req, mplx); + h2_task_attach(task, slave); h2_task_do(task, worker->io); /* if someone was waiting on this task, time to wake up */ @@ -49,13 +62,14 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) /* report the task done and maybe get another one from the same * mplx (= master connection), if we can be sticky. */ + req = NULL; if (sticky && !worker->aborted) { - h2_mplx_task_done(task->mplx, task, &task); + h2_mplx_task_done(mplx, task, &req); } else { - h2_mplx_task_done(task->mplx, task, NULL); - task = NULL; + h2_mplx_task_done(mplx, task, NULL); } + task = NULL; } } diff --git a/modules/http2/h2_worker.h b/modules/http2/h2_worker.h index 7a8c254f5d..7165cfbcbd 100644 --- a/modules/http2/h2_worker.h +++ b/modules/http2/h2_worker.h @@ -31,7 +31,8 @@ typedef struct h2_worker h2_worker; * gets aborted (idle timeout, for example). */ typedef apr_status_t h2_worker_mplx_next_fn(h2_worker *worker, void *ctx, - struct h2_task **ptask, + struct h2_mplx **pmplx, + struct h2_request **prequest, int *psticky); /* Invoked just before the worker thread exits. */ diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index 2c1dc8dab4..83fd8152eb 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -61,9 +61,9 @@ static void cleanup_zombies(h2_workers *workers, int lock) } } -static h2_task *next_task(h2_workers *workers) +static h2_request *next_request(h2_workers *workers, h2_mplx **pmplx) { - h2_task *task = NULL; + h2_request *req = NULL; h2_mplx *last = NULL; int has_more; @@ -76,7 +76,7 @@ static h2_task *next_task(h2_workers *workers) * new mplx to arrive. Depending on how many workers do exist, * we do a timed wait or block indefinitely. */ - while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) { + while (!req && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) { h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs); if (last == m) { @@ -85,7 +85,7 @@ static h2_task *next_task(h2_workers *workers) H2_MPLX_REMOVE(m); --workers->mplx_count; - task = h2_mplx_pop_task(m, &has_more); + req = h2_mplx_pop_request(m, &has_more); if (has_more) { H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m); ++workers->mplx_count; @@ -93,8 +93,13 @@ static h2_task *next_task(h2_workers *workers) last = m; } } + + if (req) { + *pmplx = m; + return req; + } } - return task; + return req; } /** @@ -102,14 +107,17 @@ static h2_task *next_task(h2_workers *workers) * or the max_wait timer expires and more than min workers exist. */ static apr_status_t get_mplx_next(h2_worker *worker, void *ctx, - h2_task **ptask, int *psticky) + h2_mplx **pmplx, h2_request **preq, + int *psticky) { apr_status_t status; apr_time_t wait_until = 0, now; h2_workers *workers = ctx; - h2_task *task = NULL; + h2_request *req = NULL; + h2_mplx *mplx = NULL; - *ptask = NULL; + *preq = NULL; + *pmplx = NULL; *psticky = 0; status = apr_thread_mutex_lock(workers->lock); @@ -119,7 +127,7 @@ static apr_status_t get_mplx_next(h2_worker *worker, void *ctx, "h2_worker(%d): looking for work", h2_worker_get_id(worker)); while (!h2_worker_is_aborted(worker) && !workers->aborted - && !(task = next_task(workers))) { + && !(req = next_request(workers, &mplx))) { /* Need to wait for a new tasks to arrive. If we are above * minimum workers, we do a timed wait. When timeout occurs @@ -161,10 +169,10 @@ static apr_status_t get_mplx_next(h2_worker *worker, void *ctx, } } - /* Here, we either have gotten task or decided to shut down + /* Here, we either have gotten a request or decided to shut down * the calling worker. */ - if (task) { + if (req) { /* Ok, we got something to give back to the worker for execution. * If we have more idle workers than h2_mplx in our queue, then * we let the worker be sticky, e.g. making it poll the task's @@ -174,7 +182,8 @@ static apr_status_t get_mplx_next(h2_worker *worker, void *ctx, * has no new tasks to process, so the worker will get back here * eventually. */ - *ptask = task; + *preq = req; + *pmplx = mplx; *psticky = (workers->max_workers >= workers->mplx_count); if (workers->mplx_count && workers->idle_workers > 1) { @@ -186,7 +195,7 @@ static apr_status_t get_mplx_next(h2_worker *worker, void *ctx, apr_thread_mutex_unlock(workers->lock); } - return *ptask? APR_SUCCESS : APR_EOF; + return *preq? APR_SUCCESS : APR_EOF; } static void worker_done(h2_worker *worker, void *ctx) -- 2.40.0