From: Stefan Eissing Date: Fri, 26 Feb 2016 13:26:25 +0000 (+0000) Subject: mod_proxy_http2: start of some sort of flow control, mod_http2: keeping spare allocat... X-Git-Tag: 2.5.0-alpha~2006 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=dce4ce7f57433dca950aebccf3f5b6bd77777074;p=apache mod_proxy_http2: start of some sort of flow control, mod_http2: keeping spare allocator_t for slave connections around git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1732477 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index f6f814e634..e7bb1dd1e1 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -44,6 +44,7 @@ static struct h2_workers *workers; static h2_mpm_type_t mpm_type = H2_MPM_UNKNOWN; static module *mpm_module; static int async_mpm; +static apr_socket_t *dummy_socket; static void check_modules(int force) { @@ -154,7 +155,12 @@ apr_status_t h2_conn_child_init(apr_pool_t *pool, server_rec *s) NULL, AP_FTYPE_CONNECTION); status = h2_mplx_child_init(pool, s); - + + if (status == APR_SUCCESS) { + status = apr_socket_create(&dummy_socket, APR_INET, SOCK_STREAM, + APR_PROTO_TCP, pool); + } + return status; } @@ -234,22 +240,30 @@ apr_status_t h2_conn_pre_close(struct h2_ctx *ctx, conn_rec *c) } -conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p, - apr_thread_t *thread, apr_socket_t *socket) +conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, + apr_allocator_t *allocator) { + apr_pool_t *pool; conn_rec *c; void *cfg; AP_DEBUG_ASSERT(master); ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, master, - "h2_conn(%ld): created from master", master->id); + "h2_conn(%ld): create slave", master->id); - /* This is like the slave connection creation from 2.5-DEV. A - * very efficient way - not sure how compatible this is, since - * the core hooks are no longer run. - * But maybe it's is better this way, not sure yet. + /* We create a pool with its own allocator to be used for + * processing a request. This is the only way to have the processing + * independant of its parent pool in the sense that it can work in + * another thread. */ - c = (conn_rec *) apr_palloc(p, sizeof(conn_rec)); + if (!allocator) { + apr_allocator_create(&allocator); + } + apr_pool_create_ex(&pool, parent, NULL, allocator); + apr_pool_tag(pool, "h2_slave_conn"); + apr_allocator_owner_set(allocator, parent); + + c = (conn_rec *) apr_palloc(pool, sizeof(conn_rec)); if (c == NULL) { ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, master, APLOGNO(02913) "h2_task: creating conn"); @@ -260,13 +274,12 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p, /* Replace these */ c->master = master; - c->pool = p; - c->current_thread = thread; - c->conn_config = ap_create_conn_config(p); - c->notes = apr_table_make(p, 5); + c->pool = pool; + c->conn_config = ap_create_conn_config(pool); + c->notes = apr_table_make(pool, 5); c->input_filters = NULL; c->output_filters = NULL; - c->bucket_alloc = apr_bucket_alloc_create(p); + c->bucket_alloc = apr_bucket_alloc_create(pool); c->data_in_input_filters = 0; c->data_in_output_filters = 0; c->clogging_input_filters = 1; @@ -274,11 +287,18 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p, c->log_id = NULL; /* Simulate that we had already a request on this connection. */ c->keepalives = 1; - + /* We cannot install the master connection socket on the slaves, as + * modules mess with timeouts/blocking of the socket, with + * unwanted side effects to the master connection processing. + * Fortunately, since we never use the slave socket, we can just install + * a single, process-wide dummy and everyone is happy. + */ + ap_set_module_config(c->conn_config, &core_module, dummy_socket); /* TODO: these should be unique to this thread */ c->sbh = master->sbh; - - ap_set_module_config(c->conn_config, &core_module, socket); + /* TODO: not all mpm modules have learned about slave connections yet. + * copy their config from master to slave. + */ if (h2_conn_mpm_module()) { cfg = ap_get_module_config(master->conn_config, h2_conn_mpm_module()); ap_set_module_config(c->conn_config, h2_conn_mpm_module(), cfg); @@ -287,3 +307,14 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p, return c; } +void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator) +{ + apr_allocator_t *allocator = apr_pool_allocator_get(slave->pool); + apr_pool_destroy(slave->pool); + if (pallocator) { + *pallocator = allocator; + } + else { + apr_allocator_destroy(allocator); + } +} diff --git a/modules/http2/h2_conn.h b/modules/http2/h2_conn.h index 0ffcf3b08d..023eecaaca 100644 --- a/modules/http2/h2_conn.h +++ b/modules/http2/h2_conn.h @@ -66,7 +66,8 @@ typedef enum { h2_mpm_type_t h2_conn_mpm_type(void); -conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p, - apr_thread_t *thread, apr_socket_t *socket); +conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, + apr_allocator_t *allocator); +void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator); #endif /* defined(__mod_h2__h2_conn__) */ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index f18b3437aa..2d4c3ae369 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -203,13 +203,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, return NULL; } - status = apr_socket_create(&m->dummy_socket, APR_INET, SOCK_STREAM, - APR_PROTO_TCP, m->pool); - if (status != APR_SUCCESS) { - h2_mplx_destroy(m); - return NULL; - } - m->q = h2_iq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS)); m->stream_ios = h2_io_set_create(m->pool); m->ready_ios = h2_io_set_create(m->pool); @@ -1061,26 +1054,9 @@ static h2_task *pop_task(h2_mplx *m) && (sid = h2_iq_shift(m->q)) > 0) { h2_io *io = h2_io_set_get(m->stream_ios, sid); if (io) { - conn_rec *c; - apr_pool_t *task_pool; - apr_allocator_t *task_allocator = NULL; - - /* We create a pool with its own allocator to be used for - * processing a request. This is the only way to have the processing - * independant of the worker pool as the h2_mplx pool as well as - * not sensitive to which thread it is in. - * In that sense, memory allocation and lifetime is similar to a master - * connection. - * The main goal in this is that slave connections and requests will - * - one day - be suspended and resumed in different threads. - */ - apr_allocator_create(&task_allocator); - apr_pool_create_ex(&task_pool, io->pool, NULL, task_allocator); - apr_pool_tag(task_pool, "h2_task"); - apr_allocator_owner_set(task_allocator, task_pool); - - c = h2_slave_create(m->c, task_pool, m->c->current_thread, m->dummy_socket); - task = h2_task_create(m->id, io->request, c, m); + conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator); + m->spare_allocator = NULL; + task = h2_task_create(m->id, io->request, slave, m); io->processing_started = 1; if (sid > m->max_stream_started) { @@ -1133,10 +1109,15 @@ static void task_done(h2_mplx *m, h2_task *task) /* TODO: this will keep a worker attached to this h2_mplx as * long as it has requests to handle. Might no be fair to * other mplx's. Perhaps leave after n requests? */ + h2_mplx_out_close(m, task->stream_id, NULL); + if (m->spare_allocator) { + apr_allocator_destroy(m->spare_allocator); + m->spare_allocator = NULL; + } + h2_slave_destroy(task->c, &m->spare_allocator); task = NULL; if (io) { io->processing_done = 1; - h2_mplx_out_close(m, io->id, NULL); if (io->orphaned) { io_destroy(m, io, 0); if (m->join_wait) { @@ -1288,6 +1269,8 @@ apr_status_t h2_mplx_engine_push(const char *engine_type, m->id, m->next_eng_id++); engine->pub.pool = task->c->pool; engine->pub.type = apr_pstrdup(task->c->pool, engine_type); + engine->pub.window_bits = 30; + engine->pub.req_window_bits = h2_log2(m->stream_max_mem); engine->c = r->connection; APR_RING_INIT(&engine->entries, h2_req_entry, link); engine->m = m; diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 12bb2d39ac..8dff6e0853 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -82,12 +82,13 @@ struct h2_mplx { struct apr_thread_cond_t *added_output; struct apr_thread_cond_t *task_done; struct apr_thread_cond_t *join_wait; - apr_socket_t *dummy_socket; apr_size_t stream_max_mem; apr_interval_time_t stream_timeout; apr_pool_t *spare_pool; /* spare pool, ready for next io */ + apr_allocator_t *spare_allocator; + struct h2_workers *workers; apr_size_t tx_handles_reserved; apr_size_t tx_chunk_size; diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c index 9a1f808927..598de3e393 100644 --- a/modules/http2/h2_proxy_session.c +++ b/modules/http2/h2_proxy_session.c @@ -485,6 +485,8 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id, h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn, proxy_server_conf *conf, + unsigned char window_bits_connection, + unsigned char window_bits_stream, h2_proxy_request_done *done) { if (!p_conn->data) { @@ -503,8 +505,8 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn, session->conf = conf; session->pool = p_conn->scpool; session->state = H2_PROXYS_ST_INIT; - session->window_bits_default = 30; - session->window_bits_connection = 30; + session->window_bits_stream = window_bits_stream; + session->window_bits_connection = window_bits_connection; session->streams = h2_ihash_create(pool, offsetof(h2_proxy_stream, id)); session->suspended = h2_iq_create(pool, 5); session->done = done; @@ -543,7 +545,7 @@ static apr_status_t session_start(h2_proxy_session *session) settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH; settings[0].value = 0; settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; - settings[1].value = (1 << session->window_bits_default) - 1; + settings[1].value = (1 << session->window_bits_stream) - 1; rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings, H2_ALEN(settings)); diff --git a/modules/http2/h2_proxy_session.h b/modules/http2/h2_proxy_session.h index d4f68b3a19..3fad2b6003 100644 --- a/modules/http2/h2_proxy_session.h +++ b/modules/http2/h2_proxy_session.h @@ -64,8 +64,8 @@ struct h2_proxy_session { h2_proxy_request_done *done; void *user_data; - int window_bits_default; - int window_bits_connection; + unsigned char window_bits_stream; + unsigned char window_bits_connection; h2_proxys_state state; apr_interval_time_t wait_timeout; @@ -81,6 +81,8 @@ struct h2_proxy_session { h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn, proxy_server_conf *conf, + unsigned char window_bits_connection, + unsigned char window_bits_stream, h2_proxy_request_done *done); apr_status_t h2_proxy_session_submit(h2_proxy_session *s, const char *url, diff --git a/modules/http2/h2_push.c b/modules/http2/h2_push.c index a8b7c8591c..82615afdbb 100644 --- a/modules/http2/h2_push.c +++ b/modules/http2/h2_push.c @@ -328,9 +328,7 @@ static int add_push(link_ctx *ctx) * TLS (if any) parameters. */ path = apr_uri_unparse(ctx->pool, &uri, APR_URI_UNP_OMITSITEPART); - push = apr_pcalloc(ctx->pool, sizeof(*push)); - switch (ctx->req->push_policy) { case H2_PUSH_HEAD: method = "HEAD"; @@ -701,36 +699,6 @@ apr_array_header_t *h2_push_collect_update(h2_stream *stream, return h2_push_diary_update(stream->session, pushes); } -/* h2_log2(n) iff n is a power of 2 */ -static unsigned char h2_log2(apr_uint32_t n) -{ - int lz = 0; - if (!n) { - return 0; - } - if (!(n & 0xffff0000u)) { - lz += 16; - n = (n << 16); - } - if (!(n & 0xff000000u)) { - lz += 8; - n = (n << 8); - } - if (!(n & 0xf0000000u)) { - lz += 4; - n = (n << 4); - } - if (!(n & 0xc0000000u)) { - lz += 2; - n = (n << 2); - } - if (!(n & 0x80000000u)) { - lz += 1; - } - - return 31 - lz; -} - static apr_int32_t h2_log2inv(unsigned char log2) { return log2? (1 << log2) : 1; diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 7299b299cf..06543d670c 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -95,8 +95,7 @@ static apr_status_t h2_response_freeze_filter(ap_filter_t* f, if (task->frozen) { ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r, "h2_response_freeze_filter, saving"); - APR_BRIGADE_CONCAT(task->frozen_out, bb); - return APR_SUCCESS; + return ap_save_brigade(f, &task->frozen_out, &bb, task->c->pool); } if (APR_BRIGADE_EMPTY(bb)) { @@ -204,8 +203,7 @@ h2_task *h2_task_create(long session_id, const h2_request *req, return task; } -apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond, - apr_socket_t *socket) +apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond) { apr_status_t status; @@ -214,7 +212,7 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond, task->input = h2_task_input_create(task, task->c); task->output = h2_task_output_create(task, task->c); - ap_process_connection(task->c, socket); + ap_process_connection(task->c, ap_get_conn_socket(task->c)); if (task->frozen) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index f2cc6dfd8a..24bde946f3 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -69,7 +69,7 @@ struct h2_task { h2_task *h2_task_create(long session_id, const struct h2_request *req, conn_rec *c, struct h2_mplx *mplx); -apr_status_t h2_task_do(h2_task *task, struct apr_thread_cond_t *cond, apr_socket_t *socket); +apr_status_t h2_task_do(h2_task *task, struct apr_thread_cond_t *cond); void h2_task_register_hooks(void); /* diff --git a/modules/http2/h2_task_output.c b/modules/http2/h2_task_output.c index b717fc3d6e..0cf3d355e0 100644 --- a/modules/http2/h2_task_output.c +++ b/modules/http2/h2_task_output.c @@ -145,8 +145,8 @@ apr_status_t h2_task_output_write(h2_task_output *output, if (output->task->frozen) { h2_util_bb_log(output->c, output->task->stream_id, APLOG_TRACE2, "frozen task output write", bb); - APR_BRIGADE_CONCAT(output->task->frozen_out, bb); - return APR_SUCCESS; + return ap_save_brigade(f, &output->task->frozen_out, &bb, + output->c->pool); } status = open_if_needed(output, f, bb, "write"); diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 52c858e609..54e6a2ab0b 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -28,6 +28,36 @@ #include "h2_request.h" #include "h2_util.h" +/* h2_log2(n) iff n is a power of 2 */ +unsigned char h2_log2(apr_uint32_t n) +{ + int lz = 0; + if (!n) { + return 0; + } + if (!(n & 0xffff0000u)) { + lz += 16; + n = (n << 16); + } + if (!(n & 0xff000000u)) { + lz += 8; + n = (n << 8); + } + if (!(n & 0xf0000000u)) { + lz += 4; + n = (n << 4); + } + if (!(n & 0xc0000000u)) { + lz += 2; + n = (n << 2); + } + if (!(n & 0x80000000u)) { + lz += 1; + } + + return 31 - lz; +} + size_t h2_util_hex_dump(char *buffer, size_t maxlen, const char *data, size_t datalen) { diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index cd2d8a12e3..97417f7261 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -66,6 +66,9 @@ void h2_ihash_clear(h2_ihash_t *ih); /******************************************************************************* * common helpers ******************************************************************************/ +/* h2_log2(n) iff n is a power of 2 */ +unsigned char h2_log2(apr_uint32_t n); + /** * Count the bytes that all key/value pairs in a table have * in length (exlucding terminating 0s), plus additional extra per pair. diff --git a/modules/http2/h2_worker.c b/modules/http2/h2_worker.c index 75d0ead916..23466e864f 100644 --- a/modules/http2/h2_worker.c +++ b/modules/http2/h2_worker.c @@ -43,7 +43,7 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) /* Get a h2_task from the main workers queue. */ status = worker->get_next(worker, worker->ctx, &task, &sticky); while (task) { - h2_task_do(task, worker->io, task->mplx->dummy_socket); + h2_task_do(task, worker->io); /* if someone was waiting on this task, time to wake up */ apr_thread_cond_signal(worker->io); diff --git a/modules/http2/mod_http2.h b/modules/http2/mod_http2.h index edacd0f134..ae13529310 100644 --- a/modules/http2/mod_http2.h +++ b/modules/http2/mod_http2.h @@ -56,7 +56,12 @@ struct h2_req_engine { const char *id; /* identifier */ apr_pool_t *pool; /* pool for engine specific allocations */ const char *type; /* name of the engine type */ - apr_size_t capacity; /* number of max assigned requests */ + unsigned char window_bits;/* preferred size of overall response data + * mod_http2 is willing to buffer as log2 */ + unsigned char req_window_bits;/* preferred size of response body data + * mod_http2 is willing to buffer per request, + * as log2 */ + apr_size_t capacity; /* maximum concurrent requests */ void *user_data; /* user specific data */ }; diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c index a2d80d32e8..4a3c4cc95e 100644 --- a/modules/http2/mod_proxy_http2.c +++ b/modules/http2/mod_proxy_http2.c @@ -300,8 +300,10 @@ setup_backend: */ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner, "eng(%s): setup session", ctx->engine->id); - session = h2_proxy_session_setup(ctx->engine->id, ctx->p_conn, - ctx->conf, request_done); + session = h2_proxy_session_setup(ctx->engine->id, ctx->p_conn, ctx->conf, + ctx->engine->window_bits, + ctx->engine->req_window_bits, + request_done); if (!session) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->p_conn->connection, "session unavailable"); @@ -470,6 +472,8 @@ static int proxy_http2_handler(request_rec *r, engine->type = engine_type; engine->pool = p; engine->capacity = 1; + engine->window_bits = 30; + engine->req_window_bits = 16; ctx->engine = engine; ctx->standalone = 1; ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,