From: Stefan Eissing Date: Fri, 31 Mar 2017 19:41:01 +0000 (+0000) Subject: On the 2.4.x branch: X-Git-Tag: 2.4.26~193 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=766474e76885cb8f81df1f9c4517635ee9938faf;p=apache On the 2.4.x branch: Merged /httpd/httpd/trunk:r1786715,1787051,1787141,1787604,1788672,1788981,1789221,1789224,1789276,1789279,1789395,1789520,1789535,1789692 git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1789739 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/CHANGES b/CHANGES index a9336a34ce..c2e7798b27 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,25 @@ Changes with Apache 2.4.26 + *) mod_http2: better performance, eliminated need for nested locks and + thread privates. Moving request setups from the main connection to the + worker threads. Increase number of spare connections kept. + [Stefan Eissing] + + *) mod_http2: input buffering and dynamic flow windows for increased + throughput. Requires nghttp2 >= v1.5.0 features. Announced at startup + in mod_http2 INFO log as feature 'DWINS'. [Stefan Eissing] + + *) mod_http2: h2 workers with improved scalability for better scheduling + performance. There are H2MaxWorkers threads created at start and the + number is kept constant for now. [Stefan Eissing] + + *) mod_http2: obsoleted option H2SessionExtraFiles, will be ignored and + just log a warning. [Stefan Eissing] + + *) mod_http2: fixed PR60869 by making h2 workers exit explicitly waking up + all threads to exit in a defined way. [Stefan Eissing] + *) core: Add %{REMOTE_PORT} to the expression parser. PR59938 [Hank Ibell ] @@ -44,17 +63,6 @@ Changes with Apache 2.4.26 modules already shut down and slave connections still operating. [Stefan Eissing] - *) mod_http2: stream timeouts now change to vhost values once the request - is parsed and processing starts. Initial values are taken from base - server or SNI host as before. [Stefan Eissing] - - *) mod_proxy_http2: fixed retry behaviour when frontend connection uses - http/1.1. [Stefan Eissing] - - *) mod_http2: separate mutex instances for each bucket beam, resulting in - less lock contention. input beams only created when necessary. - [Stefan Eissing] - *) mod_lua: Support for Lua 5.3 *) mod_proxy_http2: support for ProxyPreserverHost directive. [Stefan Eissing] diff --git a/CMakeLists.txt b/CMakeLists.txt index c5edf60241..6e8a6a1e21 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -393,7 +393,7 @@ SET(mod_http2_extra_sources modules/http2/h2_session.c modules/http2/h2_stream.c modules/http2/h2_switch.c modules/http2/h2_ngn_shed.c modules/http2/h2_task.c modules/http2/h2_util.c - modules/http2/h2_worker.c modules/http2/h2_workers.c + modules/http2/h2_workers.c ) SET(mod_ldap_extra_defines LDAP_DECLARE_EXPORT) SET(mod_ldap_extra_libs wldap32) diff --git a/docs/manual/mod/mod_http2.xml b/docs/manual/mod/mod_http2.xml index 63bcf9f74b..1f4b7e5de3 100644 --- a/docs/manual/mod/mod_http2.xml +++ b/docs/manual/mod/mod_http2.xml @@ -138,17 +138,6 @@ Protocols h2 h2c http/1.1 the connection will buffer this amount of data and then suspend the H2Worker.

-

- If you serve a lot of static files, H2SessionExtraFiles - is of interest. This tells the server how many file handles per - HTTP/2 connection it is allowed to waste for better performance. Because - when a request produces a static file as the response, the file handle - gets passed around and is buffered and not the file contents. That allows - to serve many large files without wasting memory or copying data - unnecessarily. However file handles are a limited resource for a process, - and if too many are used this way, requests may fail under load as - the amount of open handles has been exceeded. -

Multiple Hosts and Misdirected Requests @@ -680,47 +669,6 @@ H2MaxWorkerIdleSeconds 20 - - H2SessionExtraFiles - Number of Extra File Handles - H2SessionExtraFiles n - - server config - virtual host - - -

- This directive sets maximum number of extra file handles - a HTTP/2 session is allowed to use. A file handle is counted as - extra when it is transferred from a h2 worker thread to - the main HTTP/2 connection handling. This commonly happens when - serving static files. -

- Depending on the processing model configured on the server, the - number of connections times number of active streams may exceed - the number of file handles for the process. On the other hand, - converting every file into memory bytes early results in too - many buffer writes. This option helps to mitigate that. -

- The number of file handles used by a server process is then in - the order of: -

-
-                (h2_connections * extra_files) + (h2_max_worker)
-            
- Example - -H2SessionExtraFiles 10 - - -

- If nothing is configured, the module tries to make a conservative - guess how many files are safe to use. This depends largely on the - MPM chosen. -

-
-
- H2SerializeHeaders Serialize Request/Response Processing Switch diff --git a/modules/http2/NWGNUmod_http2 b/modules/http2/NWGNUmod_http2 index 74a7a97874..f6d4a38561 100644 --- a/modules/http2/NWGNUmod_http2 +++ b/modules/http2/NWGNUmod_http2 @@ -204,7 +204,6 @@ FILES_nlm_objs = \ $(OBJDIR)/h2_switch.o \ $(OBJDIR)/h2_task.o \ $(OBJDIR)/h2_util.o \ - $(OBJDIR)/h2_worker.o \ $(OBJDIR)/h2_workers.o \ $(OBJDIR)/mod_http2.o \ $(EOLIST) diff --git a/modules/http2/config2.m4 b/modules/http2/config2.m4 index a33b0df3f1..fac130b219 100644 --- a/modules/http2/config2.m4 +++ b/modules/http2/config2.m4 @@ -39,7 +39,6 @@ h2_stream.lo dnl h2_switch.lo dnl h2_task.lo dnl h2_util.lo dnl -h2_worker.lo dnl h2_workers.lo dnl " @@ -156,6 +155,9 @@ dnl # nghttp2 >= 1.5.0: changing stream priorities dnl # nghttp2 >= 1.14.0: invalid header callback AC_CHECK_FUNCS([nghttp2_session_callbacks_set_on_invalid_header_callback], [APR_ADDTO(MOD_CPPFLAGS, ["-DH2_NG2_INVALID_HEADER_CB"])], []) +dnl # nghttp2 >= 1.15.0: get/set stream window sizes + AC_CHECK_FUNCS([nghttp2_session_get_stream_local_window_size], + [APR_ADDTO(MOD_CPPFLAGS, ["-DH2_NG2_LOCAL_WIN_SIZE"])], []) else AC_MSG_WARN([nghttp2 version is too old]) fi diff --git a/modules/http2/h2.h b/modules/http2/h2.h index 21a673244f..df809fd411 100644 --- a/modules/http2/h2.h +++ b/modules/http2/h2.h @@ -115,7 +115,8 @@ typedef enum { H2_SEV_CLOSED_L, H2_SEV_CLOSED_R, H2_SEV_CANCELLED, - H2_SEV_EOS_SENT + H2_SEV_EOS_SENT, + H2_SEV_IN_DATA_PENDING, } h2_stream_event_t; diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index 53cc36f46f..17ad3d95f1 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -246,15 +246,17 @@ static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl) apr_off_t len = beam->received_bytes - beam->cons_bytes_reported; h2_beam_io_callback *cb = beam->cons_io_cb; - if (cb) { - void *ctx = beam->cons_ctx; - - if (pbl) leave_yellow(beam, pbl); - cb(ctx, beam, len); - if (pbl) enter_yellow(beam, pbl); - rv = 1; + if (len > 0) { + if (cb) { + void *ctx = beam->cons_ctx; + + if (pbl) leave_yellow(beam, pbl); + cb(ctx, beam, len); + if (pbl) enter_yellow(beam, pbl); + rv = 1; + } + beam->cons_bytes_reported += len; } - beam->cons_bytes_reported += len; return rv; } @@ -484,16 +486,14 @@ static apr_status_t beam_send_cleanup(void *data) static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool) { - if (beam->send_pool == pool || - (beam->send_pool && pool - && apr_pool_is_ancestor(beam->send_pool, pool))) { - /* when sender is same or sub-pool of existing, stick - * to the the pool we already have. */ - return; + if (beam->send_pool != pool) { + if (beam->send_pool && beam->send_pool != beam->pool) { + pool_kill(beam, beam->send_pool, beam_send_cleanup); + beam_send_cleanup(beam); + } + beam->send_pool = pool; + pool_register(beam, beam->send_pool, beam_send_cleanup); } - pool_kill(beam, beam->send_pool, beam_send_cleanup); - beam->send_pool = pool; - pool_register(beam, beam->send_pool, beam_send_cleanup); } static apr_status_t beam_cleanup(void *data) @@ -832,11 +832,10 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, apr_bucket_file *bf = b->data; apr_file_t *fd = bf->fd; int can_beam = (bf->refcount.refcount == 1); - if (can_beam && beam->last_beamed != fd && beam->can_beam_fn) { + if (can_beam && beam->can_beam_fn) { can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd); } if (can_beam) { - beam->last_beamed = fd; status = apr_bucket_setaside(b, beam->send_pool); } /* else: enter ENOTIMPL case below */ @@ -893,10 +892,8 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, /* Called from the sender thread to add buckets to the beam */ if (enter_yellow(beam, &bl) == APR_SUCCESS) { + ap_assert(beam->send_pool); r_purge_sent(beam); - if (sender_bb && !beam->send_pool) { - beam_set_send_pool(beam, sender_bb->p); - } if (beam->aborted) { move_to_hold(beam, sender_bb); @@ -1254,9 +1251,9 @@ void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg) if (beam && APLOG_C_IS_LEVEL(c,level)) { ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s", - c->id, beam->id, beam->tag, beam->closed, beam->aborted, - h2_beam_empty(beam), (long)h2_beam_get_buffered(beam), - msg); + (c->master? c->master->id : c->id), beam->id, beam->tag, + beam->closed, beam->aborted, h2_beam_empty(beam), + (long)h2_beam_get_buffered(beam), msg); } } diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h index 18bc32629f..64117ff159 100644 --- a/modules/http2/h2_bucket_beam.h +++ b/modules/http2/h2_bucket_beam.h @@ -183,7 +183,6 @@ struct h2_bucket_beam { apr_size_t buckets_sent; /* # of beam buckets sent */ apr_size_t files_beamed; /* how many file handles have been set aside */ - apr_file_t *last_beamed; /* last file beamed */ unsigned int aborted : 1; unsigned int closed : 1; @@ -376,6 +375,16 @@ int h2_beam_report_consumption(h2_bucket_beam *beam); void h2_beam_on_produced(h2_bucket_beam *beam, h2_beam_io_callback *io_cb, void *ctx); +/** + * Register a callback that may prevent a file from being beam as + * file handle, forcing the file content to be copied. Then no callback + * is set (NULL), file handles are transferred directly. + * @param beam the beam to set the callback on + * @param io_cb the callback or NULL, called on receiver with bytes produced + * @param ctx the context to use in callback invocation + * + * Call from the receiver side, callbacks invoked on either side. + */ void h2_beam_on_file_beam(h2_bucket_beam *beam, h2_beam_can_beam_callback *cb, void *ctx); diff --git a/modules/http2/h2_config.c b/modules/http2/h2_config.c index 0adc552852..66e61558a3 100644 --- a/modules/http2/h2_config.c +++ b/modules/http2/h2_config.c @@ -53,7 +53,6 @@ static h2_config defconf = { -1, /* alt-svc max age */ 0, /* serialize headers */ -1, /* h2 direct mode */ - -1, /* # session extra files */ 1, /* modern TLS only */ -1, /* HTTP/1 Upgrade support */ 1024*1024, /* TLS warmup size */ @@ -88,7 +87,6 @@ static void *h2_config_create(apr_pool_t *pool, conf->alt_svc_max_age = DEF_VAL; conf->serialize_headers = DEF_VAL; conf->h2_direct = DEF_VAL; - conf->session_extra_files = DEF_VAL; conf->modern_tls_only = DEF_VAL; conf->h2_upgrade = DEF_VAL; conf->tls_warmup_size = DEF_VAL; @@ -130,7 +128,6 @@ static void *h2_config_merge(apr_pool_t *pool, void *basev, void *addv) n->alt_svc_max_age = H2_CONFIG_GET(add, base, alt_svc_max_age); n->serialize_headers = H2_CONFIG_GET(add, base, serialize_headers); n->h2_direct = H2_CONFIG_GET(add, base, h2_direct); - n->session_extra_files = H2_CONFIG_GET(add, base, session_extra_files); n->modern_tls_only = H2_CONFIG_GET(add, base, modern_tls_only); n->h2_upgrade = H2_CONFIG_GET(add, base, h2_upgrade); n->tls_warmup_size = H2_CONFIG_GET(add, base, tls_warmup_size); @@ -194,8 +191,6 @@ apr_int64_t h2_config_geti64(const h2_config *conf, h2_config_var_t var) return H2_CONFIG_GET(conf, &defconf, h2_upgrade); case H2_CONF_DIRECT: return H2_CONFIG_GET(conf, &defconf, h2_direct); - case H2_CONF_SESSION_FILES: - return H2_CONFIG_GET(conf, &defconf, session_extra_files); case H2_CONF_TLS_WARMUP_SIZE: return H2_CONFIG_GET(conf, &defconf, tls_warmup_size); case H2_CONF_TLS_COOLDOWN_SECS: @@ -336,13 +331,11 @@ static const char *h2_conf_set_alt_svc_max_age(cmd_parms *parms, static const char *h2_conf_set_session_extra_files(cmd_parms *parms, void *arg, const char *value) { - h2_config *cfg = (h2_config *)h2_config_sget(parms->server); - apr_int64_t max = (int)apr_atoi64(value); - if (max < 0) { - return "value must be a non-negative number"; - } - cfg->session_extra_files = (int)max; + /* deprecated, ignore */ (void)arg; + (void)value; + ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, parms->pool, /* NO LOGNO */ + "H2SessionExtraFiles is obsolete and will be ignored"); return NULL; } @@ -638,7 +631,7 @@ const command_rec h2_cmds[] = { AP_INIT_TAKE1("H2Direct", h2_conf_set_direct, NULL, RSRC_CONF, "on to enable direct HTTP/2 mode"), AP_INIT_TAKE1("H2SessionExtraFiles", h2_conf_set_session_extra_files, NULL, - RSRC_CONF, "number of extra file a session might keep open"), + RSRC_CONF, "number of extra file a session might keep open (obsolete)"), AP_INIT_TAKE1("H2TLSWarmUpSize", h2_conf_set_tls_warmup_size, NULL, RSRC_CONF, "number of bytes on TLS connection before doing max writes"), AP_INIT_TAKE1("H2TLSCoolDownSecs", h2_conf_set_tls_cooldown_secs, NULL, diff --git a/modules/http2/h2_config.h b/modules/http2/h2_config.h index 1f2fe309d0..9b38b8660c 100644 --- a/modules/http2/h2_config.h +++ b/modules/http2/h2_config.h @@ -33,7 +33,6 @@ typedef enum { H2_CONF_ALT_SVC_MAX_AGE, H2_CONF_SER_HEADERS, H2_CONF_DIRECT, - H2_CONF_SESSION_FILES, H2_CONF_MODERN_TLS_ONLY, H2_CONF_UPGRADE, H2_CONF_TLS_WARMUP_SIZE, @@ -67,7 +66,6 @@ typedef struct h2_config { int serialize_headers; /* Use serialized HTTP/1.1 headers for processing, better compatibility */ int h2_direct; /* if mod_h2 is active directly */ - int session_extra_files; /* # of extra files a session may keep open */ int modern_tls_only; /* Accept only modern TLS in HTTP/2 connections */ int h2_upgrade; /* Allow HTTP/1 upgrade to h2/h2c */ apr_int64_t tls_warmup_size; /* Amount of TLS data to send before going full write size */ diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index 220387db2b..fcf6bad4d4 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -38,7 +38,6 @@ #include "h2_stream.h" #include "h2_h2.h" #include "h2_task.h" -#include "h2_worker.h" #include "h2_workers.h" #include "h2_conn.h" #include "h2_version.h" @@ -103,7 +102,7 @@ apr_status_t h2_conn_child_init(apr_pool_t *pool, server_rec *s) { const h2_config *config = h2_config_sget(s); apr_status_t status = APR_SUCCESS; - int minw, maxw, max_tx_handles, n; + int minw, maxw; int max_threads_per_child = 0; int idle_secs = 0; @@ -126,34 +125,18 @@ apr_status_t h2_conn_child_init(apr_pool_t *pool, server_rec *s) minw = max_threads_per_child; } if (maxw <= 0) { - maxw = minw; + /* As a default, this seems to work quite well under mpm_event. + * For people enabling http2 under mpm_prefork, start 4 threads unless + * configured otherwise. People get unhappy if their http2 requests are + * blocking each other. */ + maxw = H2MAX(3 * minw / 2, 4); } - /* How many file handles is it safe to use for transfer - * to the master connection to be streamed out? - * Is there a portable APR rlimit on NOFILES? Have not - * found it. And if, how many of those would we set aside? - * This leads all into a process wide handle allocation strategy - * which ultimately would limit the number of accepted connections - * with the assumption of implicitly reserving n handles for every - * connection and requiring modules with excessive needs to allocate - * from a central pool. - */ - n = h2_config_geti(config, H2_CONF_SESSION_FILES); - if (n < 0) { - max_tx_handles = maxw * 2; - } - else { - max_tx_handles = maxw * n; - } - - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s, - "h2_workers: min=%d max=%d, mthrpchild=%d, tx_files=%d", - minw, maxw, max_threads_per_child, max_tx_handles); - workers = h2_workers_create(s, pool, minw, maxw, max_tx_handles); - idle_secs = h2_config_geti(config, H2_CONF_MAX_WORKER_IDLE_SECS); - h2_workers_set_max_idle_secs(workers, idle_secs); + ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s, + "h2_workers: min=%d max=%d, mthrpchild=%d, idle_secs=%d", + minw, maxw, max_threads_per_child, idle_secs); + workers = h2_workers_create(s, pool, minw, maxw, idle_secs); ap_register_input_filter("H2_IN", h2_filter_core_input, NULL, AP_FTYPE_CONNECTION); @@ -317,12 +300,15 @@ conn_rec *h2_slave_create(conn_rec *master, int slave_id, apr_pool_t *parent) c->bucket_alloc = apr_bucket_alloc_create(pool); c->data_in_input_filters = 0; c->data_in_output_filters = 0; + /* prevent mpm_event from making wrong assumptions about this connection, + * like e.g. using its socket for an async read check. */ c->clogging_input_filters = 1; c->log = NULL; c->log_id = apr_psprintf(pool, "%ld-%d", master->id, slave_id); /* Simulate that we had already a request on this connection. */ c->keepalives = 1; + c->aborted = 0; /* We cannot install the master connection socket on the slaves, as * modules mess with timeouts/blocking of the socket, with * unwanted side effects to the master connection processing. @@ -350,6 +336,7 @@ void h2_slave_destroy(conn_rec *slave) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave, "h2_stream(%s): destroy slave", apr_table_get(slave->notes, H2_TASK_ID_NOTE)); + slave->sbh = NULL; apr_pool_destroy(slave->pool); } diff --git a/modules/http2/h2_filter.c b/modules/http2/h2_filter.c index 617d350d59..3a8a3b1ad1 100644 --- a/modules/http2/h2_filter.c +++ b/modules/http2/h2_filter.c @@ -44,55 +44,80 @@ #define UNSET -1 #define H2MIN(x,y) ((x) < (y) ? (x) : (y)) -static apr_status_t consume_brigade(h2_filter_cin *cin, - apr_bucket_brigade *bb, - apr_read_type_e block) +static apr_status_t recv_RAW_DATA(conn_rec *c, h2_filter_cin *cin, + apr_bucket *b, apr_read_type_e block) { + h2_session *session = cin->session; apr_status_t status = APR_SUCCESS; - apr_size_t readlen = 0; + apr_size_t len; + const char *data; + ssize_t n; - while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) { + status = apr_bucket_read(b, &data, &len, block); + + while (status == APR_SUCCESS && len > 0) { + n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len); - apr_bucket* bucket = APR_BRIGADE_FIRST(bb); - if (APR_BUCKET_IS_METADATA(bucket)) { - /* we do nothing regarding any meta here */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + H2_SSSN_MSG(session, "fed %ld bytes to nghttp2, %ld read"), + (long)len, (long)n); + if (n < 0) { + if (nghttp2_is_fatal((int)n)) { + h2_session_event(session, H2_SESSION_EV_PROTO_ERROR, + (int)n, nghttp2_strerror((int)n)); + status = APR_EGENERAL; + } } else { - const char *bucket_data = NULL; - apr_size_t bucket_length = 0; - status = apr_bucket_read(bucket, &bucket_data, - &bucket_length, block); - - if (status == APR_SUCCESS && bucket_length > 0) { - apr_size_t consumed = 0; - - status = cin->cb(cin->cb_ctx, bucket_data, bucket_length, &consumed); - if (status == APR_SUCCESS && bucket_length > consumed) { - /* We have data left in the bucket. Split it. */ - status = apr_bucket_split(bucket, consumed); - } - readlen += consumed; - cin->start_read = apr_time_now(); + session->io.bytes_read += n; + if (len <= n) { + break; } + len -= n; + data += n; } - apr_bucket_delete(bucket); } - if (readlen == 0 && status == APR_SUCCESS && block == APR_NONBLOCK_READ) { + return status; +} + +static apr_status_t recv_RAW_brigade(conn_rec *c, h2_filter_cin *cin, + apr_bucket_brigade *bb, + apr_read_type_e block) +{ + apr_status_t status = APR_SUCCESS; + apr_bucket* b; + int consumed = 0; + + h2_util_bb_log(c, c->id, APLOG_TRACE2, "RAW_in", bb); + while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) { + b = APR_BRIGADE_FIRST(bb); + + if (APR_BUCKET_IS_METADATA(b)) { + /* nop */ + } + else { + status = recv_RAW_DATA(c, cin, b, block); + } + consumed = 1; + apr_bucket_delete(b); + } + + if (!consumed && status == APR_SUCCESS && block == APR_NONBLOCK_READ) { return APR_EAGAIN; } return status; } -h2_filter_cin *h2_filter_cin_create(apr_pool_t *p, h2_filter_cin_cb *cb, void *ctx) +h2_filter_cin *h2_filter_cin_create(h2_session *session) { h2_filter_cin *cin; - cin = apr_pcalloc(p, sizeof(*cin)); - cin->pool = p; - cin->cb = cb; - cin->cb_ctx = ctx; - cin->start_read = UNSET; + cin = apr_pcalloc(session->pool, sizeof(*cin)); + if (!cin) { + return NULL; + } + cin->session = session; return cin; } @@ -110,11 +135,14 @@ apr_status_t h2_filter_core_input(ap_filter_t* f, h2_filter_cin *cin = f->ctx; apr_status_t status = APR_SUCCESS; apr_interval_time_t saved_timeout = UNSET; + const int trace1 = APLOGctrace1(f->c); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_session(%ld): read, %s, mode=%d, readbytes=%ld", - (long)f->c->id, (block == APR_BLOCK_READ)? - "BLOCK_READ" : "NONBLOCK_READ", mode, (long)readbytes); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + "h2_session(%ld): read, %s, mode=%d, readbytes=%ld", + (long)f->c->id, (block == APR_BLOCK_READ)? + "BLOCK_READ" : "NONBLOCK_READ", mode, (long)readbytes); + } if (mode == AP_MODE_INIT || mode == AP_MODE_SPECULATIVE) { return ap_get_brigade(f->next, brigade, mode, block, readbytes); @@ -125,20 +153,16 @@ apr_status_t h2_filter_core_input(ap_filter_t* f, } if (!cin->bb) { - cin->bb = apr_brigade_create(cin->pool, f->c->bucket_alloc); + cin->bb = apr_brigade_create(cin->session->pool, f->c->bucket_alloc); } if (!cin->socket) { cin->socket = ap_get_conn_socket(f->c); } - cin->start_read = apr_time_now(); if (APR_BRIGADE_EMPTY(cin->bb)) { /* We only do a blocking read when we have no streams to process. So, * in httpd scoreboard lingo, we are in a KEEPALIVE connection state. - * When reading non-blocking, we do have streams to process and update - * child with NULL request. That way, any current request information - * in the scoreboard is preserved. */ if (block == APR_BLOCK_READ) { if (cin->timeout > 0) { @@ -155,13 +179,15 @@ apr_status_t h2_filter_core_input(ap_filter_t* f, switch (status) { case APR_SUCCESS: - status = consume_brigade(cin, cin->bb, block); + status = recv_RAW_brigade(f->c, cin, cin->bb, block); break; case APR_EOF: case APR_EAGAIN: case APR_TIMEUP: - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_session(%ld): read", f->c->id); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, + "h2_session(%ld): read", f->c->id); + } break; default: ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, f->c, APLOGNO(03046) diff --git a/modules/http2/h2_filter.h b/modules/http2/h2_filter.h index b3e34cc5ba..093d4ea3bc 100644 --- a/modules/http2/h2_filter.h +++ b/modules/http2/h2_filter.h @@ -21,21 +21,16 @@ struct h2_headers; struct h2_stream; struct h2_session; -typedef apr_status_t h2_filter_cin_cb(void *ctx, - const char *data, apr_size_t len, - apr_size_t *readlen); - typedef struct h2_filter_cin { apr_pool_t *pool; - apr_bucket_brigade *bb; - h2_filter_cin_cb *cb; - void *cb_ctx; apr_socket_t *socket; apr_interval_time_t timeout; - apr_time_t start_read; + apr_bucket_brigade *bb; + struct h2_session *session; + apr_bucket *cur; } h2_filter_cin; -h2_filter_cin *h2_filter_cin_create(apr_pool_t *p, h2_filter_cin_cb *cb, void *ctx); +h2_filter_cin *h2_filter_cin_create(struct h2_session *session); void h2_filter_cin_timeout_set(h2_filter_cin *cin, apr_interval_time_t timeout); diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index b8244b0a77..04fbbd05cb 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -44,7 +44,6 @@ #include "h2_stream.h" #include "h2_session.h" #include "h2_task.h" -#include "h2_worker.h" #include "h2_workers.h" #include "h2_util.h" @@ -58,35 +57,40 @@ typedef struct { /* NULL or the mutex hold by this thread, used for recursive calls */ +static const int nested_lock = 0; + static apr_threadkey_t *thread_lock; apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s) { - return apr_threadkey_private_create(&thread_lock, NULL, pool); + if (nested_lock) { + return apr_threadkey_private_create(&thread_lock, NULL, pool); + } + return APR_SUCCESS; } static apr_status_t enter_mutex(h2_mplx *m, int *pacquired) { apr_status_t status; - void *mutex = NULL; - /* Enter the mutex if this thread already holds the lock or - * if we can acquire it. Only on the later case do we unlock - * onleaving the mutex. - * This allow recursive entering of the mutex from the saem thread, - * which is what we need in certain situations involving callbacks - */ - ap_assert(m); - apr_threadkey_private_get(&mutex, thread_lock); - if (mutex == m->lock) { - *pacquired = 0; - return APR_SUCCESS; + if (nested_lock) { + void *mutex = NULL; + /* Enter the mutex if this thread already holds the lock or + * if we can acquire it. Only on the later case do we unlock + * onleaving the mutex. + * This allow recursive entering of the mutex from the saem thread, + * which is what we need in certain situations involving callbacks + */ + apr_threadkey_private_get(&mutex, thread_lock); + if (mutex == m->lock) { + *pacquired = 0; + ap_assert(NULL); /* nested, why? */ + return APR_SUCCESS; + } } - - ap_assert(m->lock); status = apr_thread_mutex_lock(m->lock); *pacquired = (status == APR_SUCCESS); - if (*pacquired) { + if (nested_lock && *pacquired) { apr_threadkey_private_set(m->lock, thread_lock); } return status; @@ -95,7 +99,9 @@ static apr_status_t enter_mutex(h2_mplx *m, int *pacquired) static void leave_mutex(h2_mplx *m, int acquired) { if (acquired) { - apr_threadkey_private_set(NULL, thread_lock); + if (nested_lock) { + apr_threadkey_private_set(NULL, thread_lock); + } apr_thread_mutex_unlock(m->lock); } } @@ -106,75 +112,23 @@ static void stream_output_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) { h2_stream *stream = ctx; - h2_mplx *m = stream->session->mplx; h2_task *task = stream->task; - int acquired; if (length > 0 && task && task->assigned) { - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - h2_req_engine_out_consumed(task->assigned, task->c, length); - leave_mutex(m, acquired); - } + h2_req_engine_out_consumed(task->assigned, task->c, length); } } static void stream_input_ev(void *ctx, h2_bucket_beam *beam) { - h2_mplx *m = ctx; + h2_stream *stream = ctx; + h2_mplx *m = stream->session->mplx; apr_atomic_set32(&m->event_pending, 1); } -static void stream_input_consumed(void *ctx, - h2_bucket_beam *beam, apr_off_t length) +static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) { - if (length > 0) { - h2_mplx *m = ctx; - int acquired; - - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - if (m->input_consumed) { - m->input_consumed(m->input_consumed_ctx, beam->id, length); - } - leave_mutex(m, acquired); - } - } -} - -static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file) -{ - h2_mplx *m = ctx; - if (m->tx_handles_reserved > 0) { - --m->tx_handles_reserved; - ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, - "h2_stream(%ld-%d,%s): beaming file, tx_avail %d", - m->id, beam->id, beam->tag, m->tx_handles_reserved); - return 1; - } - ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, - "h2_stream(%ld-%d,%s): can_beam_file denied", - m->id, beam->id, beam->tag); - return 0; -} - -static void check_tx_reservation(h2_mplx *m) -{ - if (m->tx_handles_reserved <= 0) { - m->tx_handles_reserved += h2_workers_tx_reserve(m->workers, - H2MIN(m->tx_chunk_size, h2_ihash_count(m->streams))); - } -} - -static void check_tx_free(h2_mplx *m) -{ - if (m->tx_handles_reserved > m->tx_chunk_size) { - apr_size_t count = m->tx_handles_reserved - m->tx_chunk_size; - m->tx_handles_reserved = m->tx_chunk_size; - h2_workers_tx_free(m->workers, count); - } - else if (m->tx_handles_reserved && h2_ihash_empty(m->streams)) { - h2_workers_tx_free(m->workers, m->tx_handles_reserved); - m->tx_handles_reserved = 0; - } + h2_stream_in_consumed(ctx, length); } static void stream_joined(h2_mplx *m, h2_stream *stream) @@ -183,10 +137,6 @@ static void stream_joined(h2_mplx *m, h2_stream *stream) h2_ihash_remove(m->shold, stream->id); h2_ihash_add(m->spurge, stream); - if (stream->input) { - m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input); - } - m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output); } static void stream_cleanup(h2_mplx *m, h2_stream *stream) @@ -197,8 +147,10 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream) h2_beam_on_consumed(stream->input, NULL, NULL, NULL); h2_beam_abort(stream->input); } - h2_beam_on_produced(stream->output, NULL, NULL); - h2_beam_leave(stream->output); + if (stream->output) { + h2_beam_on_produced(stream->output, NULL, NULL); + h2_beam_leave(stream->output); + } h2_stream_cleanup(stream); @@ -239,7 +191,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m = apr_pcalloc(parent, sizeof(h2_mplx)); if (m) { m->id = c->id; - APR_RING_ELEM_INIT(m, link); m->c = c; /* We create a pool with its own allocator to be used for @@ -292,13 +243,10 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->readyq = h2_iq_create(m->pool, m->max_streams); m->workers = workers; - m->workers_max = workers->max_workers; - m->workers_limit = 6; /* the original h1 max parallel connections */ + m->max_active = workers->max_workers; + m->limit_active = 6; /* the original h1 max parallel connections */ m->last_limit_change = m->last_idle_block = apr_time_now(); - m->limit_change_interval = apr_time_from_msec(200); - - m->tx_handles_reserved = 0; - m->tx_chunk_size = 4; + m->limit_change_interval = apr_time_from_msec(100); m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*)); @@ -330,6 +278,12 @@ static int input_consumed_signal(h2_mplx *m, h2_stream *stream) return 0; } +static int report_consumption_iter(void *ctx, void *val) +{ + input_consumed_signal(ctx, val); + return 1; +} + static int output_consumed_signal(h2_mplx *m, h2_task *task) { if (task->output.beam) { @@ -344,7 +298,7 @@ static void task_destroy(h2_mplx *m, h2_task *task) int reuse_slave = 0; slave = task->c; - reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc) + reuse_slave = ((m->spare_slaves->nelts < (m->limit_active * 3 / 2)) && !task->rst_error); if (slave) { @@ -371,12 +325,6 @@ static int stream_destroy_iter(void *ctx, void *val) h2_ihash_remove(m->spurge, stream->id); ap_assert(stream->state == H2_SS_CLEANUP); - if (stream->output == NULL) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, m->c, - H2_STRM_MSG(stream, "already with beams==NULL")); - return 0; - } - if (stream->input) { /* Process outstanding events before destruction */ input_consumed_signal(m, stream); @@ -384,10 +332,7 @@ static int stream_destroy_iter(void *ctx, void *val) h2_beam_destroy(stream->input); stream->input = NULL; } - - h2_beam_log(stream->output, m->c, APLOG_TRACE2, "stream_destroy"); - h2_beam_destroy(stream->output); - stream->output = NULL; + if (stream->task) { task_destroy(m, stream->task); stream->task = NULL; @@ -402,7 +347,6 @@ static void purge_streams(h2_mplx *m) while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) { /* repeat until empty */ } - check_tx_free(m); } } @@ -468,6 +412,10 @@ static int stream_cancel_iter(void *ctx, void *val) { h2_mplx *m = ctx; h2_stream *stream = val; + /* disabled input consumed reporting */ + if (stream->input) { + h2_beam_on_consumed(stream->input, NULL, NULL, NULL); + } /* take over event monitoring */ h2_stream_set_monitor(stream, NULL); /* Reset, should transit to CLOSED state */ @@ -485,14 +433,14 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) int acquired; /* How to shut down a h2 connection: - * 0. tell the workers that no more tasks will come from us */ + * 0. abort and tell the workers that no more tasks will come from us */ + m->aborted = 1; h2_workers_unregister(m->workers, m); enter_mutex(m, &acquired); /* How to shut down a h2 connection: - * 1. set aborted flag and cancel all streams still active */ - m->aborted = 1; + * 1. cancel all streams still active */ while (!h2_ihash_iter(m->streams, stream_cancel_iter, m)) { /* until empty */ } @@ -533,13 +481,12 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) m->id, (int)h2_ihash_count(m->shold)); h2_ihash_iter(m->shold, unexpected_stream_iter, m); } - /*ap_assert(h2_ihash_empty(m->shold));*/ - - /* 5. return any file resources allocated */ - check_tx_free(m); leave_mutex(m, acquired); + /* 5. unregister again, now that our workers are done */ + h2_workers_unregister(m->workers, m); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): released", m->id); } @@ -570,12 +517,6 @@ h2_stream *h2_mplx_stream_get(h2_mplx *m, int id) return s; } -void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) -{ - m->input_consumed = cb; - m->input_consumed_ctx = ctx; -} - static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) { h2_mplx *m = ctx; @@ -593,12 +534,14 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) { apr_status_t status = APR_SUCCESS; h2_stream *stream = h2_ihash_get(m->streams, stream_id); - apr_size_t beamed_count; - if (!stream || !stream->task) { + if (!stream || !stream->task || m->aborted) { return APR_ECONNABORTED; } + ap_assert(stream->output == NULL); + stream->output = beam; + if (APLOGctrace2(m->c)) { h2_beam_log(beam, m->c, APLOG_TRACE2, "out_open"); } @@ -609,15 +552,8 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream); h2_beam_on_produced(stream->output, output_produced, m); - beamed_count = h2_beam_get_files_beamed(stream->output); - if (m->tx_handles_reserved >= beamed_count) { - m->tx_handles_reserved -= beamed_count; - } - else { - m->tx_handles_reserved = 0; - } - if (!stream->task->output.copy_files) { - h2_beam_on_file_beam(stream->output, can_beam_file, m); + if (stream->task->output.copy_files) { + h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL); } /* time to protect the beam against multi-threaded use */ @@ -625,7 +561,6 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) /* we might see some file buckets in the output, see * if we have enough handles reserved. */ - check_tx_reservation(m); check_data_for(m, stream->id); return status; } @@ -685,6 +620,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, } else { purge_streams(m); + h2_ihash_iter(m->streams, report_consumption_iter, m); m->added_output = iowait; status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout); if (APLOGctrace2(m->c)) { @@ -728,11 +664,24 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) return status; } +static void register_if_needed(h2_mplx *m) +{ + if (!m->is_registered && !h2_iq_empty(m->q)) { + apr_status_t status = h2_workers_register(m->workers, m); + if (status == APR_SUCCESS) { + m->is_registered = 1; + } + else { + ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c, APLOGNO(10021) + "h2_mplx(%ld): register at workers", m->id); + } + } +} + apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status; - int do_registration = 0; int acquired; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { @@ -744,27 +693,18 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, if (h2_stream_is_ready(stream)) { /* already have a response */ check_data_for(m, stream->id); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, H2_STRM_MSG(stream, "process, add to readyq")); } else { - if (!m->need_registration) { - m->need_registration = h2_iq_empty(m->q); - } - if (m->workers_busy < m->workers_max) { - do_registration = m->need_registration; - } - h2_iq_add(m->q, stream->id, cmp, ctx); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, - H2_STRM_MSG(stream, "process, add to q")); + h2_iq_add(m->q, stream->id, cmp, ctx); + register_if_needed(m); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + H2_STRM_MSG(stream, "process, added to q")); } } leave_mutex(m, acquired); } - if (do_registration) { - m->need_registration = 0; - h2_workers_register(m->workers, m); - } return status; } @@ -772,49 +712,47 @@ static h2_task *next_stream_task(h2_mplx *m) { h2_stream *stream; int sid; - while (!m->aborted && (m->workers_busy < m->workers_limit) + while (!m->aborted && (m->tasks_active < m->limit_active) && (sid = h2_iq_shift(m->q)) > 0) { stream = h2_ihash_get(m->streams, sid); if (stream) { conn_rec *slave, **pslave; - int new_conn = 0; pslave = (conn_rec **)apr_array_pop(m->spare_slaves); if (pslave) { slave = *pslave; + slave->aborted = 0; } else { slave = h2_slave_create(m->c, stream->id, m->pool); - new_conn = 1; } - slave->sbh = m->c->sbh; - slave->aborted = 0; if (!stream->task) { - stream->task = h2_task_create(stream, slave); - + m->c->keepalives++; - apr_table_setn(slave->notes, H2_TASK_ID_NOTE, stream->task->id); - if (new_conn) { - h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave)); - } if (sid > m->max_stream_started) { m->max_stream_started = sid; } - if (stream->input) { h2_beam_on_consumed(stream->input, stream_input_ev, - stream_input_consumed, m); - h2_beam_on_file_beam(stream->input, can_beam_file, m); - h2_beam_mutex_enable(stream->input); + stream_input_consumed, stream); + } + + stream->task = h2_task_create(slave, stream->id, + stream->request, m, stream->input, + stream->session->s->timeout, + m->stream_max_mem); + if (!stream->task) { + ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave, + H2_STRM_LOG(APLOGNO(02941), stream, + "create task")); + return NULL; } - h2_beam_buffer_size_set(stream->output, m->stream_max_mem); } - stream->task->worker_started = 1; - stream->task->started_at = apr_time_now(); - ++m->workers_busy; + + ++m->tasks_active; return stream->task; } } @@ -827,17 +765,16 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) apr_status_t status; int acquired; + *has_more = 0; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (m->aborted) { - *has_more = 0; - } - else { + if (!m->aborted) { task = next_stream_task(m); - *has_more = !h2_iq_empty(m->q); - } - - if (has_more && !task) { - m->need_registration = 1; + if (task != NULL && !h2_iq_empty(m->q)) { + *has_more = 1; + } + else { + m->is_registered = 0; /* h2_workers will discard this mplx */ + } } leave_mutex(m, acquired); } @@ -896,15 +833,14 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) * a block by flow control. */ if (task->done_at- m->last_limit_change >= m->limit_change_interval - && m->workers_limit < m->workers_max) { + && m->limit_active < m->max_active) { /* Well behaving stream, allow it more workers */ - m->workers_limit = H2MIN(m->workers_limit * 2, - m->workers_max); + m->limit_active = H2MIN(m->limit_active * 2, + m->max_active); m->last_limit_change = task->done_at; - m->need_registration = 1; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): increase worker limit to %d", - m->id, m->workers_limit); + m->id, m->limit_active); } } @@ -926,7 +862,9 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) h2_beam_mutex_disable(stream->input); h2_beam_leave(stream->input); } - h2_beam_mutex_disable(stream->output); + if (stream->output) { + h2_beam_mutex_disable(stream->output); + } check_data_for(m, stream->id); } else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) { @@ -937,7 +875,9 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) h2_beam_mutex_disable(stream->input); h2_beam_leave(stream->input); } - h2_beam_mutex_disable(stream->output); + if (stream->output) { + h2_beam_mutex_disable(stream->output); + } stream_joined(m, stream); } else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) { @@ -959,7 +899,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) if (enter_mutex(m, &acquired) == APR_SUCCESS) { task_done(m, task, NULL); - --m->workers_busy; + --m->tasks_active; if (m->join_wait) { apr_thread_cond_signal(m->join_wait); } @@ -967,6 +907,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) /* caller wants another task */ *ptask = next_stream_task(m); } + register_if_needed(m); leave_mutex(m, acquired); } } @@ -1037,14 +978,14 @@ static apr_status_t unschedule_slow_tasks(h2_mplx *m) /* Try to get rid of streams that occupy workers. Look for safe requests * that are repeatable. If none found, fail the connection. */ - n = (m->workers_busy - m->workers_limit - (int)h2_ihash_count(m->sredo)); + n = (m->tasks_active - m->limit_active - (int)h2_ihash_count(m->sredo)); while (n > 0 && (stream = get_latest_repeatable_unsubmitted_stream(m))) { h2_task_rst(stream->task, H2_ERR_CANCEL); h2_ihash_add(m->sredo, stream); --n; } - if ((m->workers_busy - h2_ihash_count(m->sredo)) > m->workers_limit) { + if ((m->tasks_active - h2_ihash_count(m->sredo)) > m->limit_active) { h2_stream *stream = get_timed_out_busy_stream(m); if (stream) { /* Too many busy workers, unable to cancel enough streams @@ -1064,7 +1005,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m) if (enter_mutex(m, &acquired) == APR_SUCCESS) { apr_size_t scount = h2_ihash_count(m->streams); - if (scount > 0 && m->workers_busy) { + if (scount > 0 && m->tasks_active) { /* If we have streams in connection state 'IDLE', meaning * all streams are ready to sent data out, but lack * WINDOW_UPDATEs. @@ -1079,30 +1020,31 @@ apr_status_t h2_mplx_idle(h2_mplx *m) */ now = apr_time_now(); m->last_idle_block = now; - if (m->workers_limit > 2 + if (m->limit_active > 2 && now - m->last_limit_change >= m->limit_change_interval) { - if (m->workers_limit > 16) { - m->workers_limit = 16; + if (m->limit_active > 16) { + m->limit_active = 16; } - else if (m->workers_limit > 8) { - m->workers_limit = 8; + else if (m->limit_active > 8) { + m->limit_active = 8; } - else if (m->workers_limit > 4) { - m->workers_limit = 4; + else if (m->limit_active > 4) { + m->limit_active = 4; } - else if (m->workers_limit > 2) { - m->workers_limit = 2; + else if (m->limit_active > 2) { + m->limit_active = 2; } m->last_limit_change = now; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): decrease worker limit to %d", - m->id, m->workers_limit); + m->id, m->limit_active); } - if (m->workers_busy > m->workers_limit) { + if (m->tasks_active > m->limit_active) { status = unschedule_slow_tasks(m); } } + register_if_needed(m); leave_mutex(m, acquired); } return status; @@ -1252,12 +1194,6 @@ int h2_mplx_has_master_events(h2_mplx *m) return apr_atomic_read32(&m->event_pending) > 0; } -static int report_consumption_iter(void *ctx, void *val) -{ - input_consumed_signal(ctx, val); - return 1; -} - apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, stream_ev_callback *on_resume, void *on_ctx) @@ -1272,16 +1208,19 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, "h2_mplx(%ld): dispatch events", m->id); apr_atomic_set32(&m->event_pending, 0); + purge_streams(m); + /* update input windows for streams */ h2_ihash_iter(m->streams, report_consumption_iter, m); - purge_streams(m); if (!h2_iq_empty(m->readyq)) { n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids)); for (i = 0; i < n; ++i) { stream = h2_ihash_get(m->streams, ids[i]); if (stream) { + leave_mutex(m, acquired); on_resume(on_ctx, stream); + enter_mutex(m, &acquired); } } } @@ -1314,7 +1253,7 @@ int h2_mplx_awaits_data(h2_mplx *m) if (h2_ihash_empty(m->streams)) { waiting = 0; } - if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->workers_busy) { + if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) { waiting = 0; } leave_mutex(m, acquired); diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 992c24cbe4..82a98fce0a 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -53,22 +53,14 @@ struct h2_req_engine; typedef struct h2_mplx h2_mplx; -/** - * Callback invoked for every stream that had input data read since - * the last invocation. - */ -typedef void h2_mplx_consumed_cb(void *ctx, int stream_id, apr_off_t consumed); - struct h2_mplx { long id; conn_rec *c; apr_pool_t *pool; - APR_RING_ENTRY(h2_mplx) link; - unsigned int event_pending; - unsigned int aborted : 1; - unsigned int need_registration : 1; + unsigned int aborted; + unsigned int is_registered; /* is registered at h2_workers */ struct h2_ihash_t *streams; /* all streams currently processing */ struct h2_ihash_t *sredo; /* all streams that need to be re-started */ @@ -82,9 +74,9 @@ struct h2_mplx { int max_streams; /* max # of concurrent streams */ int max_stream_started; /* highest stream id that started processing */ - int workers_busy; /* # of workers processing on this mplx */ - int workers_limit; /* current # of workers limit, dynamic */ - int workers_max; /* max, hard limit # of workers in a process */ + int tasks_active; /* # of tasks being processed from this mplx */ + int limit_active; /* current limit on active tasks, dynamic */ + int max_active; /* max, hard limit # of active tasks in a process */ apr_time_t last_idle_block; /* last time, this mplx entered IDLE while * streams were ready */ apr_time_t last_limit_change; /* last time, worker limit changed */ @@ -101,12 +93,7 @@ struct h2_mplx { apr_array_header_t *spare_slaves; /* spare slave connections */ struct h2_workers *workers; - int tx_handles_reserved; - int tx_chunk_size; - h2_mplx_consumed_cb *input_consumed; - void *input_consumed_ctx; - struct h2_ngn_shed *ngn_shed; }; @@ -198,18 +185,6 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, */ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx); -/** - * Register a callback for the amount of input data consumed per stream. The - * will only ever be invoked from the thread creating this h2_mplx, e.g. when - * calls from that thread into this h2_mplx are made. - * - * @param m the multiplexer to register the callback at - * @param cb the function to invoke - * @param ctx user supplied argument to invocation. - */ -void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx); - - typedef apr_status_t stream_ev_callback(void *ctx, struct h2_stream *stream); /** diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index dd16820776..f37741b61f 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -56,6 +56,7 @@ static void transit(h2_session *session, const char *action, static void on_stream_state_enter(void *ctx, h2_stream *stream); static void on_stream_state_event(void *ctx, h2_stream *stream, h2_stream_event_t ev); +static void on_stream_event(void *ctx, h2_stream *stream, h2_stream_event_t ev); static int h2_session_status_from_apr_status(apr_status_t rv) { @@ -71,26 +72,20 @@ static int h2_session_status_from_apr_status(apr_status_t rv) return NGHTTP2_ERR_PROTO; } -static void update_window(void *ctx, int stream_id, apr_off_t bytes_read) +static h2_stream *get_stream(h2_session *session, int stream_id) { - h2_session *session = (h2_session*)ctx; - while (bytes_read > 0) { - int len = (bytes_read > INT_MAX)? INT_MAX : bytes_read; - nghttp2_session_consume(session->ngh2, stream_id, (int)bytes_read); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_stream(%ld-%d): consumed %d bytes", - session->id, stream_id, len); - bytes_read -= len; - } + return nghttp2_session_get_stream_user_data(session->ngh2, stream_id); } -static apr_status_t h2_session_receive(void *ctx, - const char *data, apr_size_t len, - apr_size_t *readlen); - static void dispatch_event(h2_session *session, h2_session_event_t ev, int err, const char *msg); +void h2_session_event(h2_session *session, h2_session_event_t ev, + int err, const char *msg) +{ + dispatch_event(session, ev, err, msg); +} + static int rst_unprocessed_stream(h2_stream *stream, void *ctx) { int unprocessed = (!h2_stream_was_closed(stream) @@ -227,11 +222,6 @@ static int on_invalid_frame_recv_cb(nghttp2_session *ngh2, return 0; } -static h2_stream *get_stream(h2_session *session, int stream_id) -{ - return nghttp2_session_get_stream_user_data(session->ngh2, stream_id); -} - static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *userp) @@ -703,7 +693,6 @@ static apr_status_t session_cleanup(h2_session *session, const char *trigger) } transit(session, trigger, H2_SESSION_ST_CLEANUP); - h2_mplx_set_consumed_cb(session->mplx, NULL, NULL); h2_mplx_release_and_join(session->mplx, session->iowait); session->mplx = NULL; @@ -803,23 +792,33 @@ static apr_status_t h2_session_create_int(h2_session **psession, return status; } + session->in_pending = h2_iq_create(session->pool, session->max_stream_count); + if (session->in_pending == NULL) { + apr_pool_destroy(pool); + return APR_ENOMEM; + } + + session->in_process = h2_iq_create(session->pool, session->max_stream_count); + if (session->in_process == NULL) { + apr_pool_destroy(pool); + return APR_ENOMEM; + } + session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor)); if (session->monitor == NULL) { apr_pool_destroy(pool); - return status; + return APR_ENOMEM; } session->monitor->ctx = session; session->monitor->on_state_enter = on_stream_state_enter; session->monitor->on_state_event = on_stream_state_event; + session->monitor->on_event = on_stream_event; session->mplx = h2_mplx_create(c, session->pool, session->config, workers); - h2_mplx_set_consumed_cb(session->mplx, update_window, session); - - /* Install the connection input filter that feeds the session */ - session->cin = h2_filter_cin_create(session->pool, - h2_session_receive, session); + /* connection input filter that feeds the session */ + session->cin = h2_filter_cin_create(session); ap_add_input_filter("H2_IN", session->cin, r, c); h2_conn_io_init(&session->io, c, session->config); @@ -871,8 +870,8 @@ static apr_status_t h2_session_create_int(h2_session **psession, "push_diary(type=%d,N=%d)"), (int)session->max_stream_count, (int)session->max_stream_mem, - session->mplx->workers_limit, - session->mplx->workers_max, + session->mplx->limit_active, + session->mplx->max_active, session->push_diary->dtype, (int)session->push_diary->N); } @@ -1431,7 +1430,8 @@ send_headers: if (!stream->has_response) { /* but no response */ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - H2_STRM_LOG(APLOGNO(03466), stream, "no response, RST_STREAM")); + H2_STRM_LOG(APLOGNO(03466), stream, + "no response, RST_STREAM")); h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR); return APR_SUCCESS; } @@ -1444,32 +1444,32 @@ send_headers: return status; } -static apr_status_t h2_session_receive(void *ctx, const char *data, - apr_size_t len, apr_size_t *readlen) +static void h2_session_in_flush(h2_session *session) { - h2_session *session = ctx; - ssize_t n; - - if (len > 0) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - H2_SSSN_MSG(session, "feeding %ld bytes to nghttp2"), - (long)len); - n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len); - if (n < 0) { - if (nghttp2_is_fatal((int)n)) { - dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, (int)n, nghttp2_strerror((int)n)); - return APR_EGENERAL; + int id; + + while ((id = h2_iq_shift(session->in_process)) > 0) { + h2_stream *stream = get_stream(session, id); + if (stream) { + ap_assert(!stream->scheduled); + if (h2_stream_prep_processing(stream) == APR_SUCCESS) { + h2_mplx_process(session->mplx, stream, stream_pri_cmp, session); + } + else { + h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR); } } - else { - *readlen = n; - session->io.bytes_read += n; + } + + while ((id = h2_iq_shift(session->in_pending)) > 0) { + h2_stream *stream = get_stream(session, id); + if (stream) { + h2_stream_flush_input(stream); } } - return APR_SUCCESS; } -static apr_status_t h2_session_read(h2_session *session, int block) +static apr_status_t session_read(h2_session *session, apr_size_t readlen, int block) { apr_status_t status, rstatus = APR_EAGAIN; conn_rec *c = session->c; @@ -1481,7 +1481,7 @@ static apr_status_t h2_session_read(h2_session *session, int block) status = ap_get_brigade(c->input_filters, session->bbtmp, AP_MODE_READBYTES, block? APR_BLOCK_READ : APR_NONBLOCK_READ, - APR_BUCKET_BUFF_SIZE); + H2MAX(APR_BUCKET_BUFF_SIZE, readlen)); /* get rid of any possible data we do not expect to get */ apr_brigade_cleanup(session->bbtmp); @@ -1523,16 +1523,25 @@ static apr_status_t h2_session_read(h2_session *session, int block) * status. */ return rstatus; } - if ((session->io.bytes_read - read_start) > (64*1024)) { + if ((session->io.bytes_read - read_start) > readlen) { /* read enough in one go, give write a chance */ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c, - H2_SSSN_MSG(session, "read 64k, returning")); + H2_SSSN_MSG(session, "read enough, returning")); break; } } return rstatus; } +static apr_status_t h2_session_read(h2_session *session, int block) +{ + apr_status_t status = session_read(session, session->max_stream_mem + * H2MAX(2, session->open_streams), + block); + h2_session_in_flush(session); + return status; +} + static const char *StateNames[] = { "INIT", /* H2_SESSION_ST_INIT */ "DONE", /* H2_SESSION_ST_DONE */ @@ -1769,24 +1778,17 @@ static void h2_session_ev_pre_close(h2_session *session, int arg, const char *ms static void ev_stream_open(h2_session *session, h2_stream *stream) { + h2_iq_append(session->in_process, stream->id); switch (session->state) { case H2_SESSION_ST_IDLE: if (session->open_streams == 1) { - /* enter tiomeout, since we have a stream again */ + /* enter timeout, since we have a stream again */ session->idle_until = (session->s->timeout + apr_time_now()); } break; default: break; } - - ap_assert(!stream->scheduled); - if (h2_stream_prep_processing(stream) == APR_SUCCESS) { - h2_mplx_process(session->mplx, stream, stream_pri_cmp, session); - } - else { - h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR); - } } static void ev_stream_closed(h2_session *session, h2_stream *stream) @@ -1863,6 +1865,20 @@ static void on_stream_state_enter(void *ctx, h2_stream *stream) } } +static void on_stream_event(void *ctx, h2_stream *stream, + h2_stream_event_t ev) +{ + h2_session *session = ctx; + switch (ev) { + case H2_SEV_IN_DATA_PENDING: + h2_iq_append(session->in_pending, stream->id); + break; + default: + /* NOP */ + break; + } +} + static void on_stream_state_event(void *ctx, h2_stream *stream, h2_stream_event_t ev) { diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index fb3cd3d573..5751aed7bd 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -125,6 +125,10 @@ typedef struct h2_session { char status[64]; /* status message for scoreboard */ int last_status_code; /* the one already reported */ const char *last_status_msg; /* the one already reported */ + + struct h2_iqueue *in_pending; /* all streams with input pending */ + struct h2_iqueue *in_process; /* all streams ready for processing on slave */ + } h2_session; const char *h2_session_state_str(h2_session_state state); @@ -155,6 +159,9 @@ apr_status_t h2_session_rcreate(h2_session **psession, request_rec *r, struct h2_ctx *ctx, struct h2_workers *workers); +void h2_session_event(h2_session *session, h2_session_event_t ev, + int err, const char *msg); + /** * Process the given HTTP/2 session until it is ended or a fatal * error occurred. diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 1108e4d10d..7bf35aa3b2 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -43,18 +43,6 @@ #include "h2_util.h" -#define S_XXX (-2) -#define S_ERR (-1) -#define S_NOP (0) -#define S_IDL (H2_SS_IDL + 1) -#define S_RS_L (H2_SS_RSVD_L + 1) -#define S_RS_R (H2_SS_RSVD_R + 1) -#define S_OPEN (H2_SS_OPEN + 1) -#define S_CL_L (H2_SS_CLOSED_L + 1) -#define S_CL_R (H2_SS_CLOSED_R + 1) -#define S_CLS (H2_SS_CLOSED + 1) -#define S_CLN (H2_SS_CLEANUP + 1) - static const char *h2_ss_str(h2_stream_state_t state) { switch (state) { @@ -84,37 +72,54 @@ const char *h2_stream_state_str(h2_stream *stream) return h2_ss_str(stream->state); } +/* Abbreviations for stream transit tables */ +#define S_XXX (-2) /* Programming Error */ +#define S_ERR (-1) /* Protocol Error */ +#define S_NOP (0) /* No Change */ +#define S_IDL (H2_SS_IDL + 1) +#define S_RS_L (H2_SS_RSVD_L + 1) +#define S_RS_R (H2_SS_RSVD_R + 1) +#define S_OPEN (H2_SS_OPEN + 1) +#define S_CL_L (H2_SS_CLOSED_L + 1) +#define S_CL_R (H2_SS_CLOSED_R + 1) +#define S_CLS (H2_SS_CLOSED + 1) +#define S_CLN (H2_SS_CLEANUP + 1) + +/* state transisitions when certain frame types are sent */ static int trans_on_send[][H2_SS_MAX] = { -/* S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */ -/* DATA, */ { S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, }, -/* HEADERS, */ { S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, }, -/* PRIORITY, */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, }, -/* RST_STREAM, */ { S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, }, -/* SETTINGS, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* PUSH_PROMISE, */ { S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* PING, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* GOAWAY, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* WINDOW_UPDATE,*/ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, }, -/* CONT */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, }, +/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */ +{ S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* DATA */ +{ S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* HEADERS */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */ +{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */ +{ S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */ }; +/* state transisitions when certain frame types are received */ static int trans_on_recv[][H2_SS_MAX] = { -/* S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */ -/* DATA, */ { S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, }, -/* HEADERS, */ { S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, }, -/* PRIORITY, */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, }, -/* RST_STREAM, */ { S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, }, -/* SETTINGS, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* PUSH_PROMISE, */ { S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* PING, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* GOAWAY, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* WINDOW_UPDATE,*/ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, }, -/* CONT */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, }, +/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */ +{ S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* DATA */ +{ S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* HEADERS */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */ +{ S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */ +{ S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */ }; +/* state transisitions when certain events happen */ static int trans_on_event[][H2_SS_MAX] = { -/* H2_SEV_CLOSED_L*/{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, }, -/* H2_SEV_CLOSED_R*/{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, }, -/* H2_SEV_CANCELLED*/{S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, }, -/* H2_SEV_EOS_SENT*/{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, }, +/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */ +{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },/* EV_CLOSED_L*/ +{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },/* EV_CLOSED_R*/ +{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* EV_CANCELLED*/ +{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },/* EV_EOS_SENT*/ }; static int on_map(h2_stream_state_t state, int map[H2_SS_MAX]) @@ -137,7 +142,7 @@ static int on_frame(h2_stream_state_t state, int frame_type, ap_assert(frame_type >= 0); ap_assert(state >= 0); if (frame_type >= maxlen) { - return state; /* NOP */ + return state; /* NOP, ignore unknown frame types */ } return on_map(state, frame_map[frame_type]); } @@ -152,9 +157,15 @@ static int on_frame_recv(h2_stream_state_t state, int frame_type) return on_frame(state, frame_type, trans_on_recv, H2_ALEN(trans_on_recv)); } -static int on_event(h2_stream_state_t state, h2_stream_event_t ev) +static int on_event(h2_stream* stream, h2_stream_event_t ev) { - return on_map(state, trans_on_event[ev]); + if (stream->monitor && stream->monitor->on_event) { + stream->monitor->on_event(stream->monitor->ctx, stream, ev); + } + if (ev < H2_ALEN(trans_on_event)) { + return on_map(stream->state, trans_on_event[ev]); + } + return stream->state; } static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag) @@ -171,11 +182,16 @@ static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag) } static apr_status_t setup_input(h2_stream *stream) { - if (stream->input == NULL && !stream->input_eof) { - h2_beam_create(&stream->input, stream->pool, stream->id, - "input", H2_BEAM_OWNER_SEND, 0, - stream->session->s->timeout); - h2_beam_send_from(stream->input, stream->pool); + if (stream->input == NULL) { + int empty = (stream->input_eof + && (!stream->in_buffer + || APR_BRIGADE_EMPTY(stream->in_buffer))); + if (!empty) { + h2_beam_create(&stream->input, stream->pool, stream->id, + "input", H2_BEAM_OWNER_SEND, 0, + stream->session->s->timeout); + h2_beam_send_from(stream->input, stream->pool); + } } return APR_SUCCESS; } @@ -197,27 +213,27 @@ static apr_status_t close_input(h2_stream *stream) } if (stream->trailers && !apr_is_empty_table(stream->trailers)) { - apr_bucket_brigade *tmp; apr_bucket *b; h2_headers *r; - tmp = apr_brigade_create(stream->pool, c->bucket_alloc); + if (!stream->in_buffer) { + stream->in_buffer = apr_brigade_create(stream->pool, c->bucket_alloc); + } r = h2_headers_create(HTTP_OK, stream->trailers, NULL, stream->pool); stream->trailers = NULL; b = h2_bucket_headers_create(c->bucket_alloc, r); - APR_BRIGADE_INSERT_TAIL(tmp, b); + APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b); b = apr_bucket_eos_create(c->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(tmp, b); + APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, H2_STRM_MSG(stream, "added trailers")); - setup_input(stream); - status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); - apr_brigade_destroy(tmp); + h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING); } if (stream->input) { + h2_stream_flush_input(stream); return h2_beam_close(stream->input); } return status; @@ -225,7 +241,7 @@ static apr_status_t close_input(h2_stream *stream) static apr_status_t close_output(h2_stream *stream) { - if (h2_beam_is_closed(stream->output)) { + if (!stream->output || h2_beam_is_closed(stream->output)) { return APR_SUCCESS; } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, @@ -324,7 +340,7 @@ void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev) ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, H2_STRM_MSG(stream, "dispatch event %d"), ev); - new_state = on_event(stream->state, ev); + new_state = on_event(stream, ev); if (new_state < 0) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, H2_STRM_LOG(APLOGNO(10002), stream, "invalid event %d"), ev); @@ -335,7 +351,7 @@ void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev) else if (new_state == stream->state) { /* nop */ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, - H2_STRM_MSG(stream, "ignored event %d"), ev); + H2_STRM_MSG(stream, "non-state event %d"), ev); return; } else { @@ -394,7 +410,7 @@ apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags) H2_STRM_MSG(stream, "send frame %d, eos=%d"), ftype, eos); status = transit(stream, new_state); if (status == APR_SUCCESS && eos) { - status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_L)); + status = transit(stream, on_event(stream, H2_SEV_CLOSED_L)); } return status; } @@ -444,7 +460,23 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags) } status = transit(stream, new_state); if (status == APR_SUCCESS && eos) { - status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_R)); + status = transit(stream, on_event(stream, H2_SEV_CLOSED_R)); + } + return status; +} + +apr_status_t h2_stream_flush_input(h2_stream *stream) +{ + apr_status_t status = APR_SUCCESS; + + if (stream->in_buffer && !APR_BRIGADE_EMPTY(stream->in_buffer)) { + setup_input(stream); + status = h2_beam_send(stream->input, stream->in_buffer, APR_BLOCK_READ); + stream->in_last_write = apr_time_now(); + } + if (stream->input_eof + && stream->input && !h2_beam_is_closed(stream->input)) { + status = h2_beam_close(stream->input); } return status; } @@ -454,21 +486,27 @@ apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags, { h2_session *session = stream->session; apr_status_t status = APR_SUCCESS; - apr_bucket_brigade *tmp; - ap_assert(stream); + stream->in_data_frames++; if (len > 0) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len); - - tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc); - apr_brigade_write(tmp, NULL, NULL, (const char *)data, len); - setup_input(stream); - status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); - apr_brigade_destroy(tmp); + if (APLOGctrace3(session->c)) { + const char *load = apr_pstrndup(stream->pool, (const char *)data, len); + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, session->c, + H2_STRM_MSG(stream, "recv DATA, len=%d: -->%s<--"), + (int)len, load); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, + H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len); + } + stream->in_data_octets += len; + if (!stream->in_buffer) { + stream->in_buffer = apr_brigade_create(stream->pool, + session->c->bucket_alloc); + } + apr_brigade_write(stream->in_buffer, NULL, NULL, (const char *)data, len); + h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING); } - stream->in_data_frames++; - stream->in_data_octets += len; return status; } @@ -493,9 +531,12 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session, stream->monitor = monitor; stream->max_mem = session->max_stream_mem; - h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0, - session->s->timeout); - +#ifdef H2_NG2_LOCAL_WIN_SIZE + stream->in_window_size = + nghttp2_session_get_stream_local_window_size( + stream->session->ngh2, stream->id); +#endif + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, H2_STRM_LOG(APLOGNO(03082), stream, "created")); on_state_enter(stream); @@ -563,7 +604,9 @@ void h2_stream_rst(h2_stream *stream, int error_code) if (stream->input) { h2_beam_abort(stream->input); } - h2_beam_leave(stream->output); + if (stream->output) { + h2_beam_leave(stream->output); + } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, H2_STRM_MSG(stream, "reset, error=%d"), error_code); h2_stream_dispatch(stream, H2_SEV_CANCELLED); @@ -733,6 +776,8 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, *presponse = NULL; } + ap_assert(stream); + if (stream->rst_error) { *plen = 0; *peos = 1; @@ -741,7 +786,7 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, c = stream->session->c; prep_output(stream); - + /* determine how much we'd like to send. We cannot send more than * is requested. But we can reduce the size in case the master * connection operates in smaller chunks. (TSL warmup) */ @@ -753,8 +798,15 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, h2_util_bb_avail(stream->out_buffer, plen, peos); if (!*peos && *plen < requested && *plen < stream->max_mem) { H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre"); - status = h2_beam_receive(stream->output, stream->out_buffer, - APR_NONBLOCK_READ, stream->max_mem - *plen); + if (stream->output) { + status = h2_beam_receive(stream->output, stream->out_buffer, + APR_NONBLOCK_READ, + stream->max_mem - *plen); + } + else { + status = APR_EOF; + } + if (APR_STATUS_IS_EOF(status)) { apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc); APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos); @@ -925,4 +977,64 @@ int h2_stream_was_closed(const h2_stream *stream) } } +apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount) +{ + h2_session *session = stream->session; + + if (amount > 0) { + apr_off_t consumed = amount; + + while (consumed > 0) { + int len = (consumed > INT_MAX)? INT_MAX : consumed; + nghttp2_session_consume(session->ngh2, stream->id, len); + consumed -= len; + } + +#ifdef H2_NG2_LOCAL_WIN_SIZE + if (1) { + int cur_size = nghttp2_session_get_stream_local_window_size( + session->ngh2, stream->id); + int win = stream->in_window_size; + int thigh = win * 8/10; + int tlow = win * 2/10; + const int win_max = 2*1024*1024; + const int win_min = 32*1024; + + /* Work in progress, probably should add directives for these + * values once this stabilizes somewhat. The general idea is + * to adapt stream window sizes if the input window changes + * a) very quickly (< good RTT) from full to empty + * b) only a little bit (> bad RTT) + * where in a) it grows and in b) it shrinks again. + */ + if (cur_size > thigh && amount > thigh && win < win_max) { + /* almost empty again with one reported consumption, how + * long did this take? */ + long ms = apr_time_msec(apr_time_now() - stream->in_last_write); + if (ms < 40) { + win = H2MIN(win_max, win + (64*1024)); + } + } + else if (cur_size < tlow && amount < tlow && win > win_min) { + /* staying full, for how long already? */ + long ms = apr_time_msec(apr_time_now() - stream->in_last_write); + if (ms > 700) { + win = H2MAX(win_min, win - (32*1024)); + } + } + + if (win != stream->in_window_size) { + stream->in_window_size = win; + nghttp2_session_set_local_window_size(session->ngh2, + NGHTTP2_FLAG_NONE, stream->id, win); + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d", + session->id, stream->id, (long)amount, + cur_size, stream->in_window_size); + } +#endif + } + return APR_SUCCESS; +} diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index 241c4fefe5..f328714951 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -25,10 +25,12 @@ * connection to the client. The h2_session writes to the h2_stream, * adding HEADERS and DATA and finally an EOS. When headers are done, * h2_stream is scheduled for handling, which is expected to produce - * a h2_headers. + * a response h2_headers at least. * - * The h2_headers gives the HEADER frames to sent to the client, followed - * by DATA frames read from the h2_stream until EOS is reached. + * The h2_headers may be followed by more h2_headers (interim responses) and + * by DATA frames read from the h2_stream until EOS is reached. Trailers + * are send when a last h2_headers is received. This always closes the stream + * output. */ struct h2_mplx; @@ -45,6 +47,9 @@ typedef void h2_stream_state_cb(void *ctx, h2_stream *stream); typedef void h2_stream_event_cb(void *ctx, h2_stream *stream, h2_stream_event_t ev); +/** + * Callback structure for events and stream state transisitions + */ typedef struct h2_stream_monitor { void *ctx; h2_stream_state_cb *on_state_enter; /* called when a state is entered */ @@ -52,6 +57,8 @@ typedef struct h2_stream_monitor { was detected */ h2_stream_event_cb *on_state_event; /* called right before the given event result in a new stream state */ + h2_stream_event_cb *on_event; /* called for events that do not + trigger a state change */ } h2_stream_monitor; struct h2_stream { @@ -69,9 +76,13 @@ struct h2_stream { int request_headers_added; /* number of request headers added */ struct h2_bucket_beam *input; + apr_bucket_brigade *in_buffer; + int in_window_size; + apr_time_t in_last_write; + struct h2_bucket_beam *output; - apr_size_t max_mem; /* maximum amount of data buffered */ apr_bucket_brigade *out_buffer; + apr_size_t max_mem; /* maximum amount of data buffered */ int rst_error; /* stream error for RST_STREAM */ unsigned int aborted : 1; /* was aborted */ @@ -99,6 +110,10 @@ struct h2_stream { * @param id the stream identifier * @param pool the memory pool to use for this stream * @param session the session this stream belongs to + * @param monitor an optional monitor to be called for events and + * state transisitions + * @param initiated_on the id of the stream this one was initiated on (PUSH) + * * @return the newly opened stream */ h2_stream *h2_stream_create(int id, apr_pool_t *pool, @@ -111,6 +126,13 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, */ void h2_stream_destroy(h2_stream *stream); +/** + * Prepare the stream so that processing may start. + * + * This is the time to allocated resources not needed before. + * + * @param stream the stream to prep + */ apr_status_t h2_stream_prep_processing(h2_stream *stream); /* @@ -142,6 +164,12 @@ void h2_stream_cleanup(h2_stream *stream); */ apr_pool_t *h2_stream_detach_pool(h2_stream *stream); +/** + * Notify the stream that amount bytes have been consumed of its input + * since the last invocation of this method (delta amount). + */ +apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount); + /** * Set complete stream headers from given h2_request. * @@ -189,6 +217,8 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int frame_type, int flags); apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags, const uint8_t *data, size_t len); +apr_status_t h2_stream_flush_input(h2_stream *stream); + /** * Reset the stream. Stream write/reads will return errors afterwards. * @@ -275,7 +305,6 @@ const char *h2_stream_state_str(h2_stream *stream); */ int h2_stream_is_ready(h2_stream *stream); - #define H2_STRM_MSG(s, msg) \ "h2_stream(%ld-%d,%s): "msg, s->session->id, s->id, h2_stream_state_str(s) diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 3c2810a294..5ab485faab 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -45,7 +45,6 @@ #include "h2_session.h" #include "h2_stream.h" #include "h2_task.h" -#include "h2_worker.h" #include "h2_util.h" static void H2_TASK_OUT_LOG(int lvl, h2_task *task, apr_bucket_brigade *bb, @@ -217,14 +216,18 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f, apr_status_t status = APR_SUCCESS; apr_bucket *b, *next; apr_off_t bblen; - apr_size_t rmax; + const int trace1 = APLOGctrace1(f->c); + apr_size_t rmax = ((readbytes <= APR_SIZE_MAX)? + (apr_size_t)readbytes : APR_SIZE_MAX); task = h2_ctx_cget_task(f->c); ap_assert(task); - rmax = ((readbytes <= APR_SIZE_MAX)? (apr_size_t)readbytes : APR_SIZE_MAX); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_slave_in(%s): read, mode=%d, block=%d, readbytes=%ld", - task->id, mode, block, (long)readbytes); + + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + "h2_slave_in(%s): read, mode=%d, block=%d, readbytes=%ld", + task->id, mode, block, (long)readbytes); + } if (mode == AP_MODE_INIT) { return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes); @@ -250,19 +253,23 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f, while (APR_BRIGADE_EMPTY(task->input.bb)) { /* Get more input data for our request. */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_slave_in(%s): get more data from mplx, block=%d, " - "readbytes=%ld", task->id, block, (long)readbytes); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, + "h2_slave_in(%s): get more data from mplx, block=%d, " + "readbytes=%ld", task->id, block, (long)readbytes); + } if (task->input.beam) { status = h2_beam_receive(task->input.beam, task->input.bb, block, - H2MIN(readbytes, 32*1024)); + 128*1024); } else { status = APR_EOF; } - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c, - "h2_slave_in(%s): read returned", task->id); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c, + "h2_slave_in(%s): read returned", task->id); + } if (APR_STATUS_IS_EAGAIN(status) && (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) { /* chunked input handling does not seem to like it if we @@ -276,9 +283,11 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f, else if (status != APR_SUCCESS) { return status; } - - h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, - "input.beam recv raw", task->input.bb); + + if (trace1) { + h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, + "input.beam recv raw", task->input.bb); + } if (h2_task_logio_add_bytes_in) { apr_brigade_length(bb, 0, &bblen); h2_task_logio_add_bytes_in(f->c, bblen); @@ -292,12 +301,16 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f, return (mode == AP_MODE_SPECULATIVE)? APR_EAGAIN : APR_EOF; } - h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, - "task_input.bb", task->input.bb); + if (trace1) { + h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, + "task_input.bb", task->input.bb); + } if (APR_BRIGADE_EMPTY(task->input.bb)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_slave_in(%s): no data", task->id); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + "h2_slave_in(%s): no data", task->id); + } return (block == APR_NONBLOCK_READ)? APR_EAGAIN : APR_EOF; } @@ -322,9 +335,11 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f, apr_size_t len = sizeof(buffer)-1; apr_brigade_flatten(bb, buffer, &len); buffer[len] = 0; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_slave_in(%s): getline: %s", - task->id, buffer); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, + "h2_slave_in(%s): getline: %s", + task->id, buffer); + } } } else { @@ -337,7 +352,7 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f, status = APR_ENOTIMPL; } - if (APLOGctrace1(f->c)) { + if (trace1) { apr_brigade_length(bb, 0, &bblen); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_slave_in(%s): %ld data bytes", task->id, (long)bblen); @@ -481,42 +496,44 @@ static int h2_task_pre_conn(conn_rec* c, void *arg) return OK; } -h2_task *h2_task_create(h2_stream *stream, conn_rec *slave) +h2_task *h2_task_create(conn_rec *slave, int stream_id, + const h2_request *req, h2_mplx *m, + h2_bucket_beam *input, + apr_interval_time_t timeout, + apr_size_t output_max_mem) { apr_pool_t *pool; h2_task *task; ap_assert(slave); - ap_assert(stream); - ap_assert(stream->request); + ap_assert(req); apr_pool_create(&pool, slave->pool); task = apr_pcalloc(pool, sizeof(h2_task)); if (task == NULL) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave, - H2_STRM_LOG(APLOGNO(02941), stream, "create task")); return NULL; } - task->id = apr_psprintf(pool, "%ld-%d", - stream->session->id, stream->id); - task->stream_id = stream->id; + task->id = "000"; + task->stream_id = stream_id; task->c = slave; - task->mplx = stream->session->mplx; - task->c->keepalives = slave->master->keepalives; + task->mplx = m; task->pool = pool; - task->request = stream->request; - task->input.beam = stream->input; - task->output.beam = stream->output; - task->timeout = stream->session->s->timeout; - - h2_beam_send_from(stream->output, task->pool); - h2_ctx_create_for(slave, task); - + task->request = req; + task->timeout = timeout; + task->input.beam = input; + task->output.max_buffer = output_max_mem; + return task; } void h2_task_destroy(h2_task *task) { + if (task->output.beam) { + h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "task_destroy"); + h2_beam_destroy(task->output.beam); + task->output.beam = NULL; + } + if (task->eor) { apr_bucket_destroy(task->eor); } @@ -527,9 +544,14 @@ void h2_task_destroy(h2_task *task) apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id) { + conn_rec *c; + ap_assert(task); - - if (task->c->master) { + c = task->c; + task->worker_started = 1; + task->started_at = apr_time_now(); + + if (c->master) { /* Each conn_rec->id is supposed to be unique at a point in time. Since * some modules (and maybe external code) uses this id as an identifier * for the request_rec they handle, it needs to be unique for slave @@ -547,6 +569,8 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id) */ int slave_id, free_bits; + task->id = apr_psprintf(task->pool, "%ld-%d", c->master->id, + task->stream_id); if (sizeof(unsigned long) >= 8) { free_bits = 32; slave_id = task->stream_id; @@ -558,12 +582,31 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id) free_bits = 8; slave_id = worker_id; } - task->c->id = (task->c->master->id << free_bits)^slave_id; + task->c->id = (c->master->id << free_bits)^slave_id; + c->keepalive = AP_CONN_KEEPALIVE; } + + h2_beam_create(&task->output.beam, c->pool, task->stream_id, "output", + H2_BEAM_OWNER_SEND, 0, task->timeout); + if (!task->output.beam) { + return APR_ENOMEM; + } + + h2_beam_buffer_size_set(task->output.beam, task->output.max_buffer); + h2_beam_send_from(task->output.beam, task->pool); - task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc); + h2_ctx_create_for(c, task); + apr_table_setn(c->notes, H2_TASK_ID_NOTE, task->id); + + if (task->input.beam) { + h2_beam_mutex_enable(task->input.beam); + } + + h2_slave_run_pre_connection(c, ap_get_conn_socket(c)); + + task->input.bb = apr_brigade_create(task->pool, c->bucket_alloc); if (task->request->serialize) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s): serialize request %s %s", task->id, task->request->method, task->request->path); apr_brigade_printf(task->input.bb, NULL, @@ -573,20 +616,21 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id) apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n"); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s): process connection", task->id); + task->c->current_thread = thread; - ap_run_process_connection(task->c); + ap_run_process_connection(c); if (task->frozen) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s): process_conn returned frozen task", task->id); /* cleanup delayed */ return APR_EAGAIN; } else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s): processing done", task->id); return output_finish(task); } diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index b2aaf80777..a0875574ec 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -73,6 +73,7 @@ struct h2_task { unsigned int copy_files : 1; struct h2_response_parser *rparser; apr_bucket_brigade *bb; + apr_size_t max_buffer; } output; struct h2_mplx *mplx; @@ -91,7 +92,11 @@ struct h2_task { struct h2_req_engine *assigned; /* engine that task has been assigned to */ }; -h2_task *h2_task_create(struct h2_stream *stream, conn_rec *slave); +h2_task *h2_task_create(conn_rec *slave, int stream_id, + const h2_request *req, struct h2_mplx *m, + struct h2_bucket_beam *input, + apr_interval_time_t timeout, + apr_size_t output_max_mem); void h2_task_destroy(h2_task *task); diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 171c195ca1..0389193e88 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -15,6 +15,8 @@ #include #include +#include +#include #include #include @@ -603,6 +605,294 @@ int h2_iq_contains(h2_iqueue *q, int sid) return 0; } +/******************************************************************************* + * FIFO queue + ******************************************************************************/ + +struct h2_fifo { + void **elems; + int nelems; + int head; + int count; + int aborted; + apr_thread_mutex_t *lock; + apr_thread_cond_t *not_empty; + apr_thread_cond_t *not_full; +}; + +static int nth_index(h2_fifo *fifo, int n) +{ + return (fifo->head + n) % fifo->nelems; +} + +static apr_status_t fifo_destroy(void *data) +{ + h2_fifo *fifo = data; + + apr_thread_cond_destroy(fifo->not_empty); + apr_thread_cond_destroy(fifo->not_full); + apr_thread_mutex_destroy(fifo->lock); + + return APR_SUCCESS; +} + +apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity) +{ + apr_status_t rv; + h2_fifo *fifo; + + fifo = apr_pcalloc(pool, sizeof(*fifo)); + if (fifo == NULL) { + return APR_ENOMEM; + } + + rv = apr_thread_mutex_create(&fifo->lock, + APR_THREAD_MUTEX_UNNESTED, pool); + if (rv != APR_SUCCESS) { + return rv; + } + + rv = apr_thread_cond_create(&fifo->not_empty, pool); + if (rv != APR_SUCCESS) { + return rv; + } + + rv = apr_thread_cond_create(&fifo->not_full, pool); + if (rv != APR_SUCCESS) { + return rv; + } + + fifo->elems = apr_pcalloc(pool, capacity * sizeof(void*)); + if (fifo->elems == NULL) { + return APR_ENOMEM; + } + fifo->nelems = capacity; + + *pfifo = fifo; + apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null); + + return APR_SUCCESS; +} + +apr_status_t h2_fifo_term(h2_fifo *fifo) +{ + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + fifo->aborted = 1; + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_fifo_interrupt(h2_fifo *fifo) +{ + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + apr_thread_cond_broadcast(fifo->not_empty); + apr_thread_cond_broadcast(fifo->not_full); + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +int h2_fifo_count(h2_fifo *fifo) +{ + return fifo->count; +} + +static apr_status_t check_not_empty(h2_fifo *fifo, int block) +{ + if (fifo->count == 0) { + if (!block) { + return APR_EAGAIN; + } + while (fifo->count == 0) { + if (fifo->aborted) { + return APR_EOF; + } + apr_thread_cond_wait(fifo->not_empty, fifo->lock); + } + } + return APR_SUCCESS; +} + +static apr_status_t fifo_push(h2_fifo *fifo, void *elem, int block) +{ + apr_status_t rv; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + if (fifo->count == fifo->nelems) { + if (block) { + while (fifo->count == fifo->nelems) { + if (fifo->aborted) { + apr_thread_mutex_unlock(fifo->lock); + return APR_EOF; + } + apr_thread_cond_wait(fifo->not_full, fifo->lock); + } + } + else { + apr_thread_mutex_unlock(fifo->lock); + return APR_EAGAIN; + } + } + + ap_assert(fifo->count < fifo->nelems); + fifo->elems[nth_index(fifo, fifo->count)] = elem; + ++fifo->count; + if (fifo->count == 1) { + apr_thread_cond_broadcast(fifo->not_empty); + } + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem) +{ + return fifo_push(fifo, elem, 1); +} + +apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem) +{ + return fifo_push(fifo, elem, 0); +} + +static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block) +{ + apr_status_t rv; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) { + apr_thread_mutex_unlock(fifo->lock); + *pelem = NULL; + return rv; + } + + ap_assert(fifo->count > 0); + *pelem = fifo->elems[fifo->head]; + --fifo->count; + if (fifo->count > 0) { + fifo->head = nth_index(fifo, 1); + if (fifo->count+1 == fifo->nelems) { + apr_thread_cond_broadcast(fifo->not_full); + } + } + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_fifo_pull(h2_fifo *fifo, void **pelem) +{ + return fifo_pull(fifo, pelem, 1); +} + +apr_status_t h2_fifo_try_pull(h2_fifo *fifo, void **pelem) +{ + return fifo_pull(fifo, pelem, 0); +} + +static apr_status_t fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx, int block) +{ + apr_status_t rv; + void *elem; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) { + apr_thread_mutex_unlock(fifo->lock); + return rv; + } + + ap_assert(fifo->count > 0); + elem = fifo->elems[fifo->head]; + + switch (fn(elem, ctx)) { + case H2_FIFO_OP_PULL: + --fifo->count; + if (fifo->count > 0) { + fifo->head = nth_index(fifo, 1); + if (fifo->count+1 == fifo->nelems) { + apr_thread_cond_broadcast(fifo->not_full); + } + } + break; + case H2_FIFO_OP_REPUSH: + if (fifo->count > 1) { + fifo->head = nth_index(fifo, 1); + if (fifo->count < fifo->nelems) { + fifo->elems[nth_index(fifo, fifo->count-1)] = elem; + } + } + break; + } + + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx) +{ + return fifo_peek(fifo, fn, ctx, 1); +} + +apr_status_t h2_fifo_try_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx) +{ + return fifo_peek(fifo, fn, ctx, 0); +} + +apr_status_t h2_fifo_remove(h2_fifo *fifo, void *elem) +{ + apr_status_t rv; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + int i, rc; + void *e; + + rc = 0; + for (i = 0; i < fifo->count; ++i) { + e = fifo->elems[nth_index(fifo, i)]; + if (e == elem) { + ++rc; + } + else if (rc) { + fifo->elems[nth_index(fifo, i-rc)] = e; + } + } + if (rc) { + fifo->count -= rc; + if (fifo->count + rc == fifo->nelems) { + apr_thread_cond_broadcast(fifo->not_full); + } + rv = APR_SUCCESS; + } + else { + rv = APR_EAGAIN; + } + + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + + /******************************************************************************* * h2_util for apt_table_t ******************************************************************************/ @@ -701,17 +991,16 @@ apr_status_t h2_brigade_concat_length(apr_bucket_brigade *dest, apr_bucket_brigade *src, apr_off_t length) { - apr_bucket *b, *next; + apr_bucket *b; apr_off_t remain = length; apr_status_t status = APR_SUCCESS; - for (b = APR_BRIGADE_FIRST(src); - b != APR_BRIGADE_SENTINEL(src); - b = next) { - next = APR_BUCKET_NEXT(b); + while (!APR_BRIGADE_EMPTY(src)) { + b = APR_BRIGADE_FIRST(src); if (APR_BUCKET_IS_METADATA(b)) { - /* fall through */ + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(dest, b); } else { if (remain == b->length) { @@ -734,10 +1023,10 @@ apr_status_t h2_brigade_concat_length(apr_bucket_brigade *dest, apr_bucket_split(b, remain); } } + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(dest, b); + remain -= b->length; } - APR_BUCKET_REMOVE(b); - APR_BRIGADE_INSERT_TAIL(dest, b); - remain -= b->length; } return status; } @@ -925,55 +1214,14 @@ apr_size_t h2_util_bucket_print(char *buffer, apr_size_t bmax, if (bmax <= off) { return off; } - if (APR_BUCKET_IS_METADATA(b)) { - if (APR_BUCKET_IS_EOS(b)) { - off += apr_snprintf(buffer+off, bmax-off, "eos"); - } - else if (APR_BUCKET_IS_FLUSH(b)) { - off += apr_snprintf(buffer+off, bmax-off, "flush"); - } - else if (AP_BUCKET_IS_EOR(b)) { - off += apr_snprintf(buffer+off, bmax-off, "eor"); - } - else { - off += apr_snprintf(buffer+off, bmax-off, "%s", b->type->name); - } + else if (APR_BUCKET_IS_METADATA(b)) { + off += apr_snprintf(buffer+off, bmax-off, "%s", b->type->name); } - else { - const char *btype = b->type->name; - if (APR_BUCKET_IS_FILE(b)) { - btype = "file"; - } - else if (APR_BUCKET_IS_PIPE(b)) { - btype = "pipe"; - } - else if (APR_BUCKET_IS_SOCKET(b)) { - btype = "socket"; - } - else if (APR_BUCKET_IS_HEAP(b)) { - btype = "heap"; - } - else if (APR_BUCKET_IS_TRANSIENT(b)) { - btype = "transient"; - } - else if (APR_BUCKET_IS_IMMORTAL(b)) { - btype = "immortal"; - } -#if APR_HAS_MMAP - else if (APR_BUCKET_IS_MMAP(b)) { - btype = "mmap"; - } -#endif - else if (APR_BUCKET_IS_POOL(b)) { - btype = "pool"; - } - - if (bmax > off) { - off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]", - btype, - (long)(b->length == ((apr_size_t)-1)? - -1 : b->length)); - } + else if (bmax > off) { + off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]", + b->type->name, + (long)(b->length == ((apr_size_t)-1)? + -1 : b->length)); } return off; } diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index 7b92553445..f6a4b9a43d 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -183,6 +183,57 @@ size_t h2_iq_mshift(h2_iqueue *q, int *pint, size_t max); */ int h2_iq_contains(h2_iqueue *q, int sid); +/******************************************************************************* + * FIFO queue + ******************************************************************************/ + +/** + * A thread-safe FIFO queue with some extra bells and whistles, if you + * do not need anything special, better use 'apr_queue'. + */ +typedef struct h2_fifo h2_fifo; + +apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity); +apr_status_t h2_fifo_term(h2_fifo *fifo); +apr_status_t h2_fifo_interrupt(h2_fifo *fifo); + +int h2_fifo_count(h2_fifo *fifo); + +apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem); +apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem); + +apr_status_t h2_fifo_pull(h2_fifo *fifo, void **pelem); +apr_status_t h2_fifo_try_pull(h2_fifo *fifo, void **pelem); + +typedef enum { + H2_FIFO_OP_PULL, /* pull the element from the queue, ie discard it */ + H2_FIFO_OP_REPUSH, /* pull and immediatley re-push it */ +} h2_fifo_op_t; + +typedef h2_fifo_op_t h2_fifo_peek_fn(void *head, void *ctx); + +/** + * Call given function on the head of the queue, once it exists, and + * perform the returned operation on it. The queue will hold its lock during + * this time, so no other operations on the queue are possible. + * @param fifo the queue to peek at + * @param fn the function to call on the head, once available + * @param ctx context to pass in call to function + */ +apr_status_t h2_fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx); + +/** + * Non-blocking version of h2_fifo_peek. + */ +apr_status_t h2_fifo_try_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx); + +/** + * Remove the elem from the queue, will remove multiple appearances. + * @param elem the element to remove + * @return APR_SUCCESS iff > 0 elems were removed, APR_EAGAIN otherwise. + */ +apr_status_t h2_fifo_remove(h2_fifo *fifo, void *elem); + /******************************************************************************* * common helpers ******************************************************************************/ @@ -379,8 +430,8 @@ do { \ const char *line = "(null)"; \ apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \ len = h2_util_bb_print(buffer, bmax, (tag), "", (bb)); \ - ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%s): %s", \ - (c)->log_id, (len? buffer : line)); \ + ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%ld): %s", \ + ((c)->master? (c)->master->id : (c)->id), (len? buffer : line)); \ } while(0) diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index 6dcee88135..e6765e5a03 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.9.3" +#define MOD_HTTP2_VERSION "1.10.0" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010903 +#define MOD_HTTP2_VERSION_NUM 0x010a00 #endif /* mod_h2_h2_version_h */ diff --git a/modules/http2/h2_worker.c b/modules/http2/h2_worker.c deleted file mode 100644 index 84e8f989eb..0000000000 --- a/modules/http2/h2_worker.c +++ /dev/null @@ -1,103 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include - -#include -#include -#include -#include - -#include "h2.h" -#include "h2_private.h" -#include "h2_conn.h" -#include "h2_ctx.h" -#include "h2_h2.h" -#include "h2_mplx.h" -#include "h2_task.h" -#include "h2_worker.h" - -static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) -{ - h2_worker *worker = (h2_worker *)wctx; - int sticky; - - while (!worker->aborted) { - h2_task *task; - - /* Get a h2_task from the main workers queue. */ - worker->get_next(worker, worker->ctx, &task, &sticky); - while (task) { - - h2_task_do(task, thread, worker->id); - /* report the task done and maybe get another one from the same - * mplx (= master connection), if we can be sticky. - */ - if (sticky && !worker->aborted) { - h2_mplx_task_done(task->mplx, task, &task); - } - else { - h2_mplx_task_done(task->mplx, task, NULL); - task = NULL; - } - } - } - - worker->worker_done(worker, worker->ctx); - return NULL; -} - -h2_worker *h2_worker_create(int id, - apr_pool_t *pool, - apr_threadattr_t *attr, - h2_worker_mplx_next_fn *get_next, - h2_worker_done_fn *worker_done, - void *ctx) -{ - h2_worker *w = apr_pcalloc(pool, sizeof(h2_worker)); - if (w) { - w->id = id; - APR_RING_ELEM_INIT(w, link); - w->get_next = get_next; - w->worker_done = worker_done; - w->ctx = ctx; - apr_thread_create(&w->thread, attr, execute, w, pool); - } - return w; -} - -apr_status_t h2_worker_destroy(h2_worker *worker) -{ - if (worker->thread) { - apr_status_t status; - apr_thread_join(&status, worker->thread); - worker->thread = NULL; - } - return APR_SUCCESS; -} - -void h2_worker_abort(h2_worker *worker) -{ - worker->aborted = 1; -} - -int h2_worker_is_aborted(h2_worker *worker) -{ - return worker->aborted; -} - - diff --git a/modules/http2/h2_worker.h b/modules/http2/h2_worker.h deleted file mode 100644 index 04ff570361..0000000000 --- a/modules/http2/h2_worker.h +++ /dev/null @@ -1,135 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __mod_h2__h2_worker__ -#define __mod_h2__h2_worker__ - -struct h2_mplx; -struct h2_request; -struct h2_task; - -/* h2_worker is a basically a apr_thread_t that reads fromt he h2_workers - * task queue and runs h2_tasks it is given. - */ -typedef struct h2_worker h2_worker; - -/* Invoked when the worker wants a new task to process. Will block - * until a h2_mplx becomes available or the worker itself - * gets aborted (idle timeout, for example). */ -typedef apr_status_t h2_worker_mplx_next_fn(h2_worker *worker, - void *ctx, - struct h2_task **ptask, - int *psticky); - -/* Invoked just before the worker thread exits. */ -typedef void h2_worker_done_fn(h2_worker *worker, void *ctx); - - -struct h2_worker { - int id; - /** Links to the rest of the workers */ - APR_RING_ENTRY(h2_worker) link; - apr_thread_t *thread; - h2_worker_mplx_next_fn *get_next; - h2_worker_done_fn *worker_done; - void *ctx; - int aborted; -}; - -/** - * The magic pointer value that indicates the head of a h2_worker list - * @param b The worker list - * @return The magic pointer value - */ -#define H2_WORKER_LIST_SENTINEL(b) APR_RING_SENTINEL((b), h2_worker, link) - -/** - * Determine if the worker list is empty - * @param b The list to check - * @return true or false - */ -#define H2_WORKER_LIST_EMPTY(b) APR_RING_EMPTY((b), h2_worker, link) - -/** - * Return the first worker in a list - * @param b The list to query - * @return The first worker in the list - */ -#define H2_WORKER_LIST_FIRST(b) APR_RING_FIRST(b) - -/** - * Return the last worker in a list - * @param b The list to query - * @return The last worker int he list - */ -#define H2_WORKER_LIST_LAST(b) APR_RING_LAST(b) - -/** - * Insert a single worker at the front of a list - * @param b The list to add to - * @param e The worker to insert - */ -#define H2_WORKER_LIST_INSERT_HEAD(b, e) do { \ - h2_worker *ap__b = (e); \ - APR_RING_INSERT_HEAD((b), ap__b, h2_worker, link); \ - } while (0) - -/** - * Insert a single worker at the end of a list - * @param b The list to add to - * @param e The worker to insert - */ -#define H2_WORKER_LIST_INSERT_TAIL(b, e) do { \ - h2_worker *ap__b = (e); \ - APR_RING_INSERT_TAIL((b), ap__b, h2_worker, link); \ - } while (0) - -/** - * Get the next worker in the list - * @param e The current worker - * @return The next worker - */ -#define H2_WORKER_NEXT(e) APR_RING_NEXT((e), link) -/** - * Get the previous worker in the list - * @param e The current worker - * @return The previous worker - */ -#define H2_WORKER_PREV(e) APR_RING_PREV((e), link) - -/** - * Remove a worker from its list - * @param e The worker to remove - */ -#define H2_WORKER_REMOVE(e) APR_RING_REMOVE((e), link) - - -/* Create a new worker with given id, pool and attributes, callbacks - * callback parameter. - */ -h2_worker *h2_worker_create(int id, - apr_pool_t *pool, - apr_threadattr_t *attr, - h2_worker_mplx_next_fn *get_next, - h2_worker_done_fn *worker_done, - void *ctx); - -apr_status_t h2_worker_destroy(h2_worker *worker); - -void h2_worker_abort(h2_worker *worker); - -int h2_worker_is_aborted(h2_worker *worker); - -#endif /* defined(__mod_h2__h2_worker__) */ diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index 1dcfb2fcd7..fa395255e9 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -27,221 +27,248 @@ #include "h2_private.h" #include "h2_mplx.h" #include "h2_task.h" -#include "h2_worker.h" #include "h2_workers.h" +#include "h2_util.h" +typedef struct h2_slot h2_slot; +struct h2_slot { + int id; + h2_slot *next; + h2_workers *workers; + int aborted; + int sticks; + h2_task *task; + apr_thread_t *thread; + apr_thread_cond_t *not_idle; +}; -static int in_list(h2_workers *workers, h2_mplx *m) +static h2_slot *pop_slot(h2_slot **phead) { - h2_mplx *e; - for (e = H2_MPLX_LIST_FIRST(&workers->mplxs); - e != H2_MPLX_LIST_SENTINEL(&workers->mplxs); - e = H2_MPLX_NEXT(e)) { - if (e == m) { - return 1; + /* Atomically pop a slot from the list */ + for (;;) { + h2_slot *first = *phead; + if (first == NULL) { + return NULL; + } + if (apr_atomic_casptr((void*)phead, first->next, first) == first) { + first->next = NULL; + return first; } } - return 0; } -static void cleanup_zombies(h2_workers *workers, int lock) +static void push_slot(h2_slot **phead, h2_slot *slot) { - if (lock) { - apr_thread_mutex_lock(workers->lock); + /* Atomically push a slot to the list */ + ap_assert(!slot->next); + for (;;) { + h2_slot *next = slot->next = *phead; + if (apr_atomic_casptr((void*)phead, slot, next) == next) { + return; + } + } +} + +static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx); + +static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) +{ + apr_status_t status; + + slot->workers = workers; + slot->aborted = 0; + slot->task = NULL; + if (!slot->not_idle) { + status = apr_thread_cond_create(&slot->not_idle, workers->pool); + if (status != APR_SUCCESS) { + push_slot(&workers->free, slot); + return status; + } } - while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) { - h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies); - H2_WORKER_REMOVE(zombie); - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_workers: cleanup zombie %d", zombie->id); - h2_worker_destroy(zombie); + + /* thread will either immediately start work or add itself + * to the idle queue */ + apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot, + workers->pool); + if (!slot->thread) { + push_slot(&workers->free, slot); + return APR_ENOMEM; + } + + ++workers->worker_count; + return APR_SUCCESS; +} + +static apr_status_t add_worker(h2_workers *workers) +{ + h2_slot *slot = pop_slot(&workers->free); + if (slot) { + return activate_slot(workers, slot); } - if (lock) { + return APR_EAGAIN; +} + +static void wake_idle_worker(h2_workers *workers) +{ + h2_slot *slot = pop_slot(&workers->idle); + if (slot) { + apr_thread_mutex_lock(workers->lock); + apr_thread_cond_signal(slot->not_idle); apr_thread_mutex_unlock(workers->lock); } + else if (workers->dynamic) { + add_worker(workers); + } } -static h2_task *next_task(h2_workers *workers) +static void cleanup_zombies(h2_workers *workers) { - h2_task *task = NULL; - h2_mplx *last = NULL; - int has_more; - - /* Get the next h2_mplx to process that has a task to hand out. - * If it does, place it at the end of the queu and return the - * task to the worker. - * If it (currently) has no tasks, remove it so that it needs - * to register again for scheduling. - * If we run out of h2_mplx in the queue, we need to wait for - * new mplx to arrive. Depending on how many workers do exist, - * we do a timed wait or block indefinitely. - */ - while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) { - h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs); - - if (last == m) { - break; - } - H2_MPLX_REMOVE(m); - --workers->mplx_count; - - task = h2_mplx_pop_task(m, &has_more); - if (has_more) { - H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m); - ++workers->mplx_count; - if (!last) { - last = m; - } + h2_slot *slot; + while ((slot = pop_slot(&workers->zombies))) { + if (slot->thread) { + apr_status_t status; + apr_thread_join(&status, slot->thread); + slot->thread = NULL; } + --workers->worker_count; + push_slot(&workers->free, slot); } - return task; +} + +static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m) +{ + int has_more; + slot->task = h2_mplx_pop_task(m, &has_more); + if (slot->task) { + /* Ok, we got something to give back to the worker for execution. + * If we still have idle workers, we let the worker be sticky, + * e.g. making it poll the task's h2_mplx instance for more work + * before asking back here. */ + slot->sticks = slot->workers->max_workers; + return has_more? APR_EAGAIN : APR_SUCCESS; + } + slot->sticks = 0; + return APR_EOF; +} + +static h2_fifo_op_t mplx_peek(void *head, void *ctx) +{ + h2_mplx *m = head; + h2_slot *slot = ctx; + + if (slot_pull_task(slot, m) == APR_EAGAIN) { + wake_idle_worker(slot->workers); + return H2_FIFO_OP_REPUSH; + } + return H2_FIFO_OP_PULL; } /** * Get the next task for the given worker. Will block until a task arrives * or the max_wait timer expires and more than min workers exist. */ -static apr_status_t get_mplx_next(h2_worker *worker, void *ctx, - h2_task **ptask, int *psticky) +static apr_status_t get_next(h2_slot *slot) { + h2_workers *workers = slot->workers; apr_status_t status; - apr_time_t wait_until = 0, now; - h2_workers *workers = ctx; - h2_task *task = NULL; - *ptask = NULL; - *psticky = 0; - - status = apr_thread_mutex_lock(workers->lock); - if (status == APR_SUCCESS) { - ++workers->idle_workers; - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_worker(%d): looking for work", worker->id); - - while (!h2_worker_is_aborted(worker) && !workers->aborted - && !(task = next_task(workers))) { - - /* Need to wait for a new tasks to arrive. If we are above - * minimum workers, we do a timed wait. When timeout occurs - * and we have still more workers, we shut down one after - * the other. */ - cleanup_zombies(workers, 0); - if (workers->worker_count > workers->min_workers) { - now = apr_time_now(); - if (now >= wait_until) { - wait_until = now + apr_time_from_sec(workers->max_idle_secs); - } - - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_worker(%d): waiting signal, " - "workers=%d, idle=%d", worker->id, - (int)workers->worker_count, - workers->idle_workers); - status = apr_thread_cond_timedwait(workers->mplx_added, - workers->lock, - wait_until - now); - if (status == APR_TIMEUP - && workers->worker_count > workers->min_workers) { - /* waited long enough without getting a task and - * we are above min workers, abort this one. */ - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, - workers->s, - "h2_workers: aborting idle worker"); - h2_worker_abort(worker); - break; - } - } - else { - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_worker(%d): waiting signal (eternal), " - "worker_count=%d, idle=%d", worker->id, - (int)workers->worker_count, - workers->idle_workers); - apr_thread_cond_wait(workers->mplx_added, workers->lock); + slot->task = NULL; + while (!slot->aborted) { + if (!slot->task) { + status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot); + if (status == APR_EOF) { + return status; } } - /* Here, we either have gotten task or decided to shut down - * the calling worker. - */ - if (task) { - /* Ok, we got something to give back to the worker for execution. - * If we have more idle workers than h2_mplx in our queue, then - * we let the worker be sticky, e.g. making it poll the task's - * h2_mplx instance for more work before asking back here. - * This avoids entering our global lock as long as enough idle - * workers remain. Stickiness of a worker ends when the connection - * has no new tasks to process, so the worker will get back here - * eventually. - */ - *ptask = task; - *psticky = (workers->max_workers >= workers->mplx_count); - - if (workers->mplx_count && workers->idle_workers > 1) { - apr_thread_cond_signal(workers->mplx_added); - } + if (slot->task) { + return APR_SUCCESS; } + apr_thread_mutex_lock(workers->lock); + cleanup_zombies(workers); + + ++workers->idle_workers; + push_slot(&workers->idle, slot); + apr_thread_cond_wait(slot->not_idle, workers->lock); --workers->idle_workers; + apr_thread_mutex_unlock(workers->lock); } - - return *ptask? APR_SUCCESS : APR_EOF; + return APR_EOF; } -static void worker_done(h2_worker *worker, void *ctx) +static void slot_done(h2_slot *slot) { - h2_workers *workers = ctx; - apr_status_t status = apr_thread_mutex_lock(workers->lock); - if (status == APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_worker(%d): done", worker->id); - H2_WORKER_REMOVE(worker); - --workers->worker_count; - H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker); - - apr_thread_mutex_unlock(workers->lock); - } + push_slot(&(slot->workers->zombies), slot); } -static apr_status_t add_worker(h2_workers *workers) + +static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx) { - h2_worker *w = h2_worker_create(workers->next_worker_id++, - workers->pool, workers->thread_attr, - get_mplx_next, worker_done, workers); - if (!w) { - return APR_ENOMEM; + h2_slot *slot = wctx; + + while (!slot->aborted) { + + /* Get a h2_task from the mplxs queue. */ + get_next(slot); + while (slot->task) { + + h2_task_do(slot->task, thread, slot->id); + + /* Report the task as done. If stickyness is left, offer the + * mplx the opportunity to give us back a new task right away. + */ + if (!slot->aborted && (--slot->sticks > 0)) { + h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task); + } + else { + h2_mplx_task_done(slot->task->mplx, slot->task, NULL); + slot->task = NULL; + } + } } - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_workers: adding worker(%d)", w->id); - ++workers->worker_count; - H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w); - return APR_SUCCESS; + + slot_done(slot); + return NULL; } -static apr_status_t h2_workers_start(h2_workers *workers) +static apr_status_t workers_pool_cleanup(void *data) { - apr_status_t status = apr_thread_mutex_lock(workers->lock); - if (status == APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_workers: starting"); - - while (workers->worker_count < workers->min_workers - && status == APR_SUCCESS) { - status = add_worker(workers); + h2_workers *workers = data; + h2_slot *slot; + + if (!workers->aborted) { + apr_thread_mutex_lock(workers->lock); + workers->aborted = 1; + /* before we go, cleanup any zombies and abort the rest */ + cleanup_zombies(workers); + for (;;) { + slot = pop_slot(&workers->idle); + if (slot) { + slot->aborted = 1; + apr_thread_cond_signal(slot->not_idle); + } + else { + break; + } } apr_thread_mutex_unlock(workers->lock); + + h2_fifo_term(workers->mplxs); + h2_fifo_interrupt(workers->mplxs); } - return status; + return APR_SUCCESS; } h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, int min_workers, int max_workers, - apr_size_t max_tx_handles) + int idle_secs) { apr_status_t status; h2_workers *workers; apr_pool_t *pool; + int i, n; ap_assert(s); ap_assert(server_pool); @@ -254,163 +281,77 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, apr_pool_create(&pool, server_pool); apr_pool_tag(pool, "h2_workers"); workers = apr_pcalloc(pool, sizeof(h2_workers)); - if (workers) { - workers->s = s; - workers->pool = pool; - workers->min_workers = min_workers; - workers->max_workers = max_workers; - workers->max_idle_secs = 10; - - workers->max_tx_handles = max_tx_handles; - workers->spare_tx_handles = workers->max_tx_handles; - - apr_threadattr_create(&workers->thread_attr, workers->pool); - if (ap_thread_stacksize != 0) { - apr_threadattr_stacksize_set(workers->thread_attr, - ap_thread_stacksize); - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s, - "h2_workers: using stacksize=%ld", - (long)ap_thread_stacksize); - } - - APR_RING_INIT(&workers->workers, h2_worker, link); - APR_RING_INIT(&workers->zombies, h2_worker, link); - APR_RING_INIT(&workers->mplxs, h2_mplx, link); - - status = apr_thread_mutex_create(&workers->lock, - APR_THREAD_MUTEX_DEFAULT, - workers->pool); - if (status == APR_SUCCESS) { - status = apr_thread_cond_create(&workers->mplx_added, workers->pool); - } - - if (status == APR_SUCCESS) { - status = apr_thread_mutex_create(&workers->tx_lock, - APR_THREAD_MUTEX_DEFAULT, - workers->pool); - } - - if (status == APR_SUCCESS) { - status = h2_workers_start(workers); - } - - if (status != APR_SUCCESS) { - h2_workers_destroy(workers); - workers = NULL; - } + if (!workers) { + return NULL; } - return workers; -} - -void h2_workers_destroy(h2_workers *workers) -{ - /* before we go, cleanup any zombie workers that may have accumulated */ - cleanup_zombies(workers, 1); - if (workers->mplx_added) { - apr_thread_cond_destroy(workers->mplx_added); - workers->mplx_added = NULL; - } - if (workers->lock) { - apr_thread_mutex_destroy(workers->lock); - workers->lock = NULL; - } - while (!H2_MPLX_LIST_EMPTY(&workers->mplxs)) { - h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs); - H2_MPLX_REMOVE(m); + workers->s = s; + workers->pool = pool; + workers->min_workers = min_workers; + workers->max_workers = max_workers; + workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10; + + status = h2_fifo_create(&workers->mplxs, pool, 2 * workers->max_workers); + if (status != APR_SUCCESS) { + return NULL; } - while (!H2_WORKER_LIST_EMPTY(&workers->workers)) { - h2_worker *w = H2_WORKER_LIST_FIRST(&workers->workers); - H2_WORKER_REMOVE(w); + + status = apr_threadattr_create(&workers->thread_attr, workers->pool); + if (status != APR_SUCCESS) { + return NULL; } - if (workers->pool) { - apr_pool_destroy(workers->pool); - /* workers is gone */ + + if (ap_thread_stacksize != 0) { + apr_threadattr_stacksize_set(workers->thread_attr, + ap_thread_stacksize); + ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s, + "h2_workers: using stacksize=%ld", + (long)ap_thread_stacksize); } -} - -apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m) -{ - apr_status_t status = apr_thread_mutex_lock(workers->lock); - if (status == APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s, - "h2_workers: register mplx(%ld), idle=%d", - m->id, workers->idle_workers); - if (in_list(workers, m)) { - status = APR_EAGAIN; - } - else { - H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m); - ++workers->mplx_count; - status = APR_SUCCESS; - } - - if (workers->idle_workers > 0) { - apr_thread_cond_signal(workers->mplx_added); + + status = apr_thread_mutex_create(&workers->lock, + APR_THREAD_MUTEX_DEFAULT, + workers->pool); + if (status == APR_SUCCESS) { + n = workers->nslots = workers->max_workers; + workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot)); + if (workers->slots == NULL) { + workers->nslots = 0; + status = APR_ENOMEM; } - else if (status == APR_SUCCESS - && workers->worker_count < workers->max_workers) { - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_workers: got %d worker, adding 1", - workers->worker_count); - add_worker(workers); + for (i = 0; i < n; ++i) { + workers->slots[i].id = i; } - apr_thread_mutex_unlock(workers->lock); } - return status; -} - -apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m) -{ - apr_status_t status = apr_thread_mutex_lock(workers->lock); if (status == APR_SUCCESS) { - status = APR_EAGAIN; - if (in_list(workers, m)) { - H2_MPLX_REMOVE(m); - status = APR_SUCCESS; + /* we activate all for now, TODO: support min_workers again. + * do this in reverse for vanity reasons so slot 0 will most + * likely be at head of idle queue. */ + n = workers->max_workers; + for (i = n-1; i >= 0; --i) { + status = activate_slot(workers, &workers->slots[i]); } - apr_thread_mutex_unlock(workers->lock); + /* the rest of the slots go on the free list */ + for(i = n; i < workers->nslots; ++i) { + push_slot(&workers->free, &workers->slots[i]); + } + workers->dynamic = (workers->worker_count < workers->max_workers); } - return status; -} - -void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs) -{ - if (idle_secs <= 0) { - ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s, - APLOGNO(02962) "h2_workers: max_worker_idle_sec value of %d" - " is not valid, ignored.", idle_secs); - return; + if (status == APR_SUCCESS) { + apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup); + return workers; } - workers->max_idle_secs = idle_secs; + return NULL; } -apr_size_t h2_workers_tx_reserve(h2_workers *workers, apr_size_t count) +apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m) { - apr_status_t status = apr_thread_mutex_lock(workers->tx_lock); - if (status == APR_SUCCESS) { - count = H2MIN(workers->spare_tx_handles, count); - workers->spare_tx_handles -= count; - ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s, - "h2_workers: reserved %d tx handles, %d/%d left", - (int)count, (int)workers->spare_tx_handles, - (int)workers->max_tx_handles); - apr_thread_mutex_unlock(workers->tx_lock); - return count; - } - return 0; + apr_status_t status = h2_fifo_push(workers->mplxs, m); + wake_idle_worker(workers); + return status; } -void h2_workers_tx_free(h2_workers *workers, apr_size_t count) +apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m) { - apr_status_t status = apr_thread_mutex_lock(workers->tx_lock); - if (status == APR_SUCCESS) { - workers->spare_tx_handles += count; - ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s, - "h2_workers: freed %d tx handles, %d/%d left", - (int)count, (int)workers->spare_tx_handles, - (int)workers->max_tx_handles); - apr_thread_mutex_unlock(workers->tx_lock); - } + return h2_fifo_remove(workers->mplxs, m); } - diff --git a/modules/http2/h2_workers.h b/modules/http2/h2_workers.h index ae7b4d8969..30a7514cd0 100644 --- a/modules/http2/h2_workers.h +++ b/modules/http2/h2_workers.h @@ -27,6 +27,9 @@ struct apr_thread_cond_t; struct h2_mplx; struct h2_request; struct h2_task; +struct h2_fifo; + +struct h2_slot; typedef struct h2_workers h2_workers; @@ -41,22 +44,20 @@ struct h2_workers { int idle_workers; int max_idle_secs; - apr_size_t max_tx_handles; - apr_size_t spare_tx_handles; - - unsigned int aborted : 1; + int aborted; + int dynamic; apr_threadattr_t *thread_attr; + int nslots; + struct h2_slot *slots; + + struct h2_slot *free; + struct h2_slot *idle; + struct h2_slot *zombies; - APR_RING_HEAD(h2_worker_list, h2_worker) workers; - APR_RING_HEAD(h2_worker_zombies, h2_worker) zombies; - APR_RING_HEAD(h2_mplx_list, h2_mplx) mplxs; - int mplx_count; + struct h2_fifo *mplxs; struct apr_thread_mutex_t *lock; - struct apr_thread_cond_t *mplx_added; - - struct apr_thread_mutex_t *tx_lock; }; @@ -64,12 +65,7 @@ struct h2_workers { * threads. */ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool, - int min_size, int max_size, - apr_size_t max_tx_handles); - -/* Destroy the worker pool and all its threads. - */ -void h2_workers_destroy(h2_workers *workers); + int min_size, int max_size, int idle_secs); /** * Registers a h2_mplx for task scheduling. If this h2_mplx runs @@ -83,38 +79,4 @@ apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m); */ apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m); -/** - * Set the amount of seconds a h2_worker should wait for new tasks - * before shutting down (if there are more than the minimum number of - * workers). - */ -void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs); - -/** - * Reservation of file handles available for transfer between workers - * and master connections. - * - * When handling output from request processing, file handles are often - * encountered when static files are served. The most efficient way is then - * to forward the handle itself to the master connection where it can be - * read or sendfile'd to the client. But file handles are a scarce resource, - * so there needs to be a limit on how many handles are transferred this way. - * - * h2_workers keeps track of the number of reserved handles and observes a - * configurable maximum value. - * - * @param workers the workers instance - * @param count how many handles the caller wishes to reserve - * @return the number of reserved handles, may be 0. - */ -apr_size_t h2_workers_tx_reserve(h2_workers *workers, apr_size_t count); - -/** - * Return a number of reserved file handles back to the pool. The number - * overall may not exceed the numbers reserved. - * @param workers the workers instance - * @param count how many handles are returned to the pool - */ -void h2_workers_tx_free(h2_workers *workers, apr_size_t count); - #endif /* defined(__mod_h2__h2_workers__) */ diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c index e0a4b90883..ea399c91a2 100644 --- a/modules/http2/mod_http2.c +++ b/modules/http2/mod_http2.c @@ -61,6 +61,7 @@ typedef struct { unsigned int change_prio : 1; unsigned int sha256 : 1; unsigned int inv_headers : 1; + unsigned int dyn_windows : 1; } features; static features myfeats; @@ -96,6 +97,9 @@ static int h2_post_config(apr_pool_t *p, apr_pool_t *plog, #ifdef H2_NG2_INVALID_HEADER_CB myfeats.inv_headers = 1; #endif +#ifdef H2_NG2_LOCAL_WIN_SIZE + myfeats.dyn_windows = 1; +#endif apr_pool_userdata_get(&data, mod_h2_init_key, s->process->pool); if ( data == NULL ) { @@ -108,11 +112,12 @@ static int h2_post_config(apr_pool_t *p, apr_pool_t *plog, ngh2 = nghttp2_version(0); ap_log_error( APLOG_MARK, APLOG_INFO, 0, s, APLOGNO(03090) - "mod_http2 (v%s, feats=%s%s%s, nghttp2 %s), initializing...", + "mod_http2 (v%s, feats=%s%s%s%s, nghttp2 %s), initializing...", MOD_HTTP2_VERSION, myfeats.change_prio? "CHPRIO" : "", myfeats.sha256? "+SHA256" : "", myfeats.inv_headers? "+INVHD" : "", + myfeats.dyn_windows? "+DWINS" : "", ngh2? ngh2->version_str : "unknown"); switch (h2_conn_mpm_type()) {