From 71a201e538552b857992e7a425e336451e6b673a Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Wed, 16 Mar 2016 15:16:00 +0000 Subject: [PATCH] backport of mod_http2 v1.4.2 git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1735239 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES | 3 + modules/http2/h2_bucket_eoc.h | 1 + modules/http2/h2_bucket_eos.h | 1 + modules/http2/h2_conn.c | 15 ++- modules/http2/h2_conn_io.c | 214 +++++++++++++++++++++----------- modules/http2/h2_conn_io.h | 3 +- modules/http2/h2_io.c | 182 +++++++++++++++------------ modules/http2/h2_io.h | 20 ++- modules/http2/h2_mplx.c | 219 +++++++++++++++++++-------------- modules/http2/h2_mplx.h | 22 ++-- modules/http2/h2_ngn_shed.c | 80 +++++++----- modules/http2/h2_ngn_shed.h | 17 ++- modules/http2/h2_session.c | 55 ++++++--- modules/http2/h2_session.h | 3 + modules/http2/h2_stream.c | 69 ++--------- modules/http2/h2_stream.h | 12 +- modules/http2/h2_task.c | 31 +---- modules/http2/h2_task.h | 6 +- modules/http2/h2_task_output.c | 104 ++++++---------- modules/http2/h2_task_output.h | 3 - modules/http2/h2_util.c | 37 +++--- modules/http2/h2_util.h | 10 +- modules/http2/h2_version.h | 4 +- modules/http2/mod_http2.c | 2 +- modules/http2/mod_http2.h | 18 +-- 25 files changed, 606 insertions(+), 525 deletions(-) diff --git a/CHANGES b/CHANGES index 6a0115224e..a76dcaf545 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,9 @@ Changes with Apache 2.4.19 + *) mod_http2: fixes problem with wrong lifetime of file buckets on main + connection. [Stefan Eissing] + *) mod_http2: fixes incorrect denial of requests without :authority header. [Stefan Eissing] diff --git a/modules/http2/h2_bucket_eoc.h b/modules/http2/h2_bucket_eoc.h index f1cd6f8135..2d46691995 100644 --- a/modules/http2/h2_bucket_eoc.h +++ b/modules/http2/h2_bucket_eoc.h @@ -21,6 +21,7 @@ struct h2_session; /** End Of HTTP/2 SESSION (H2EOC) bucket */ extern const apr_bucket_type_t h2_bucket_type_eoc; +#define H2_BUCKET_IS_H2EOC(e) (e->type == &h2_bucket_type_eoc) apr_bucket * h2_bucket_eoc_make(apr_bucket *b, struct h2_session *session); diff --git a/modules/http2/h2_bucket_eos.h b/modules/http2/h2_bucket_eos.h index bd3360db5a..27b501dad3 100644 --- a/modules/http2/h2_bucket_eos.h +++ b/modules/http2/h2_bucket_eos.h @@ -21,6 +21,7 @@ struct h2_stream; /** End Of HTTP/2 STREAM (H2EOS) bucket */ extern const apr_bucket_type_t h2_bucket_type_eos; +#define H2_BUCKET_IS_H2EOS(e) (e->type == &h2_bucket_type_eos) apr_bucket *h2_bucket_eos_make(apr_bucket *b, struct h2_stream *stream); diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index a0cd54e6ac..60e209492c 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -261,7 +261,7 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, } apr_pool_create_ex(&pool, parent, NULL, allocator); apr_pool_tag(pool, "h2_slave_conn"); - apr_allocator_owner_set(allocator, parent); + apr_allocator_owner_set(allocator, pool); c = (conn_rec *) apr_palloc(pool, sizeof(conn_rec)); if (c == NULL) { @@ -309,15 +309,18 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator) { + apr_pool_t *parent; apr_allocator_t *allocator = apr_pool_allocator_get(slave->pool); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave, "h2_slave_conn(%ld): destroy (task=%s)", slave->id, apr_table_get(slave->notes, H2_TASK_ID_NOTE)); - apr_pool_destroy(slave->pool); - if (pallocator) { + /* Attache the allocator to the parent pool and 
return it for + * reuse, otherwise the own is still the slave pool and it will + * get destroyed with it. */ + parent = apr_pool_parent_get(slave->pool); + if (pallocator && parent) { + apr_allocator_owner_set(allocator, parent); *pallocator = allocator; } - else { - apr_allocator_destroy(allocator); - } + apr_pool_destroy(slave->pool); } diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index 56d01e6732..59561ecd61 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -14,16 +14,18 @@ */ #include - +#include #include #include #include #include #include +#include #include "h2_private.h" #include "h2_bucket_eoc.h" +#include "h2_bucket_eos.h" #include "h2_config.h" #include "h2_conn_io.h" #include "h2_h2.h" @@ -43,18 +45,96 @@ * which seems to create less TCP packets overall */ #define WRITE_SIZE_MAX (TLS_DATA_MAX - 100) - #define WRITE_BUFFER_SIZE (5*WRITE_SIZE_MAX) + +static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level, + const char *tag, apr_bucket_brigade *bb) +{ + char buffer[16 * 1024]; + const char *line = "(null)"; + apr_size_t bmax = sizeof(buffer)/sizeof(buffer[0]); + int off = 0; + apr_bucket *b; + + if (bb) { + memset(buffer, 0, bmax--); + for (b = APR_BRIGADE_FIRST(bb); + bmax && (b != APR_BRIGADE_SENTINEL(bb)); + b = APR_BUCKET_NEXT(b)) { + + if (APR_BUCKET_IS_METADATA(b)) { + if (APR_BUCKET_IS_EOS(b)) { + off += apr_snprintf(buffer+off, bmax-off, "eos "); + } + else if (APR_BUCKET_IS_FLUSH(b)) { + off += apr_snprintf(buffer+off, bmax-off, "flush "); + } + else if (AP_BUCKET_IS_EOR(b)) { + off += apr_snprintf(buffer+off, bmax-off, "eor "); + } + else if (H2_BUCKET_IS_H2EOC(b)) { + off += apr_snprintf(buffer+off, bmax-off, "h2eoc "); + } + else if (H2_BUCKET_IS_H2EOS(b)) { + off += apr_snprintf(buffer+off, bmax-off, "h2eos "); + } + else { + off += apr_snprintf(buffer+off, bmax-off, "meta(unknown) "); + } + } + else { + const char *btype = "data"; + if (APR_BUCKET_IS_FILE(b)) { + btype = "file"; + } + else if (APR_BUCKET_IS_PIPE(b)) { + btype = "pipe"; + } + else if (APR_BUCKET_IS_SOCKET(b)) { + btype = "socket"; + } + else if (APR_BUCKET_IS_HEAP(b)) { + btype = "heap"; + } + else if (APR_BUCKET_IS_TRANSIENT(b)) { + btype = "transient"; + } + else if (APR_BUCKET_IS_IMMORTAL(b)) { + btype = "immortal"; + } +#if APR_HAS_MMAP + else if (APR_BUCKET_IS_MMAP(b)) { + btype = "mmap"; + } +#endif + else if (APR_BUCKET_IS_POOL(b)) { + btype = "pool"; + } + + off += apr_snprintf(buffer+off, bmax-off, "%s[%ld] ", + btype, + (long)(b->length == ((apr_size_t)-1)? + -1 : b->length)); + } + } + line = *buffer? 
buffer : "(empty)"; + } + /* Intentional no APLOGNO */ + ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s", + c->id, stream_id, tag, line); + +} + apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, const h2_config *cfg, apr_pool_t *pool) { - io->connection = c; - io->output = apr_brigade_create(pool, c->bucket_alloc); - io->buflen = 0; - io->is_tls = h2_h2_is_tls(c); - io->buffer_output = io->is_tls; + io->c = c; + io->output = apr_brigade_create(pool, c->bucket_alloc); + io->buflen = 0; + io->is_tls = h2_h2_is_tls(c); + io->buffer_output = io->is_tls; if (io->buffer_output) { io->bufsize = WRITE_BUFFER_SIZE; @@ -65,8 +145,9 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, } if (io->is_tls) { - /* That is where we start with, - * see https://issues.apache.org/jira/browse/TS-2503 */ + /* This is what we start with, + * see https://issues.apache.org/jira/browse/TS-2503 + */ io->warmup_size = h2_config_geti64(cfg, H2_CONF_TLS_WARMUP_SIZE); io->cooldown_usecs = (h2_config_geti(cfg, H2_CONF_TLS_COOLDOWN_SECS) * APR_USEC_PER_SEC); @@ -79,9 +160,10 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, } if (APLOGctrace1(c)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection, - "h2_conn_io(%ld): init, buffering=%d, warmup_size=%ld, cd_secs=%f", - io->connection->id, io->buffer_output, (long)io->warmup_size, + ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, + "h2_conn_io(%ld): init, buffering=%d, warmup_size=%ld, " + "cd_secs=%f", io->c->id, io->buffer_output, + (long)io->warmup_size, ((float)io->cooldown_usecs/APR_USEC_PER_SEC)); } @@ -110,16 +192,17 @@ static apr_status_t pass_out(apr_bucket_brigade *bb, void *ctx) } ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_WRITE, c); - status = apr_brigade_length(bb, 0, &bblen); - if (status == APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03044) + apr_brigade_length(bb, 0, &bblen); + h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", bb); + status = ap_pass_brigade(c->output_filters, bb); + if (status == APR_SUCCESS && pctx->io) { + pctx->io->bytes_written += (apr_size_t)bblen; + pctx->io->last_write = apr_time_now(); + } + if (status != APR_SUCCESS) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044) "h2_conn_io(%ld): pass_out brigade %ld bytes", c->id, (long)bblen); - status = ap_pass_brigade(c->output_filters, bb); - if (status == APR_SUCCESS && pctx->io) { - pctx->io->bytes_written += (apr_size_t)bblen; - pctx->io->last_write = apr_time_now(); - } } apr_brigade_cleanup(bb); return status; @@ -141,17 +224,17 @@ static apr_status_t bucketeer_buffer(h2_conn_io *io) /* long time not written, reset write size */ io->write_size = WRITE_SIZE_INITIAL; io->bytes_written = 0; - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection, + ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io(%ld): timeout write size reset to %ld", - (long)io->connection->id, (long)io->write_size); + (long)io->c->id, (long)io->write_size); } else if (io->write_size < WRITE_SIZE_MAX && io->bytes_written >= io->warmup_size) { /* connection is hot, use max size */ io->write_size = WRITE_SIZE_MAX; - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection, + ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io(%ld): threshold reached, write size now %ld", - (long)io->connection->id, (long)io->write_size); + (long)io->c->id, (long)io->write_size); } bcount = (int)(remaining / io->write_size); @@ -177,50 +260,41 @@ apr_status_t h2_conn_io_writeb(h2_conn_io *io, 
apr_bucket *b) return APR_SUCCESS; } -static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int force, int eoc) +static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc) { - if (io->buflen > 0 || !APR_BRIGADE_EMPTY(io->output)) { - pass_out_ctx ctx; - - if (io->buflen > 0) { - /* something in the buffer, put it in the output brigade */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection, - "h2_conn_io: flush, flushing %ld bytes", (long)io->buflen); - bucketeer_buffer(io); - } - - if (force) { - APR_BRIGADE_INSERT_TAIL(io->output, - apr_bucket_flush_create(io->output->bucket_alloc)); - } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection, - "h2_conn_io: flush"); - /* Send it out */ - io->buflen = 0; - ctx.c = io->connection; - ctx.io = eoc? NULL : io; + pass_out_ctx ctx; + apr_bucket *b; + + if (io->buflen == 0 && APR_BRIGADE_EMPTY(io->output)) { + return APR_SUCCESS; + } - return pass_out(io->output, &ctx); - /* no more access after this, as we might have flushed an EOC bucket - * that de-allocated us all. */ + if (io->buflen > 0) { + /* something in the buffer, put it in the output brigade */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, + "h2_conn_io: flush, flushing %ld bytes", + (long)io->buflen); + bucketeer_buffer(io); } - return APR_SUCCESS; -} - -apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush) -{ - return h2_conn_io_flush_int(io, flush, 0); + + if (flush) { + b = apr_bucket_flush_create(io->c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(io->output, b); + } + + ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush"); + io->buflen = 0; + ctx.c = io->c; + ctx.io = eoc? NULL : io; + + return pass_out(io->output, &ctx); + /* no more access after this, as we might have flushed an EOC bucket + * that de-allocated us all. */ } apr_status_t h2_conn_io_flush(h2_conn_io *io) { - /* make sure we always write a flush, even if our buffers are empty. - * We want to flush not only our buffers, but alse ones further down - * the connection filters. 
*/ - apr_bucket *b = apr_bucket_flush_create(io->connection->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(io->output, b); - return h2_conn_io_flush_int(io, 0, 0); + return h2_conn_io_flush_int(io, 1, 0); } apr_status_t h2_conn_io_consider_pass(h2_conn_io *io) @@ -228,20 +302,18 @@ apr_status_t h2_conn_io_consider_pass(h2_conn_io *io) apr_off_t len = 0; if (!APR_BRIGADE_EMPTY(io->output)) { - apr_brigade_length(io->output, 0, &len); + len = h2_brigade_mem_size(io->output); } len += io->buflen; if (len >= WRITE_BUFFER_SIZE) { - return h2_conn_io_pass(io, 0); + return h2_conn_io_flush_int(io, 1, 0); } return APR_SUCCESS; } apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, h2_session *session) { - apr_bucket *b = h2_bucket_eoc_create(io->connection->bucket_alloc, session); - APR_BRIGADE_INSERT_TAIL(io->output, b); - b = apr_bucket_flush_create(io->connection->bucket_alloc); + apr_bucket *b = h2_bucket_eoc_create(io->c->bucket_alloc, session); APR_BRIGADE_INSERT_TAIL(io->output, b); return h2_conn_io_flush_int(io, 0, 1); } @@ -252,20 +324,20 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, apr_status_t status = APR_SUCCESS; pass_out_ctx ctx; - ctx.c = io->connection; + ctx.c = io->c; ctx.io = io; if (io->bufsize > 0) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection, + ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: buffering %ld bytes", (long)length); if (!APR_BRIGADE_EMPTY(io->output)) { - status = h2_conn_io_pass(io, 0); + status = h2_conn_io_flush_int(io, 0, 0); } while (length > 0 && (status == APR_SUCCESS)) { apr_size_t avail = io->bufsize - io->buflen; if (avail <= 0) { - h2_conn_io_pass(io, 0); + status = h2_conn_io_flush_int(io, 0, 0); } else if (length > avail) { memcpy(io->buffer + io->buflen, buf, avail); @@ -283,7 +355,7 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, } else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, status, io->connection, + ap_log_cerror(APLOG_MARK, APLOG_TRACE4, status, io->c, "h2_conn_io: writing %ld bytes to brigade", (long)length); status = apr_brigade_write(io->output, pass_out, &ctx, buf, length); } diff --git a/modules/http2/h2_conn_io.h b/modules/http2/h2_conn_io.h index 8d71fffcd7..b8be671d38 100644 --- a/modules/http2/h2_conn_io.h +++ b/modules/http2/h2_conn_io.h @@ -26,7 +26,7 @@ struct h2_session; * directly without copying. 
*/ typedef struct { - conn_rec *connection; + conn_rec *c; apr_bucket_brigade *output; int is_tls; @@ -77,7 +77,6 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, struct h2_session *session); * @param io the connection io * @param flush if a flush bucket should be appended to any output */ -apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush); apr_status_t h2_conn_io_flush(h2_conn_io *io); /** diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index 3f82c60f10..0beb85606d 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -23,6 +23,7 @@ #include #include #include +#include #include "h2_private.h" #include "h2_h2.h" @@ -33,18 +34,46 @@ #include "h2_task.h" #include "h2_util.h" -h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request) +h2_io *h2_io_create(int id, apr_pool_t *pool, + apr_bucket_alloc_t *bucket_alloc, + const h2_request *request) { h2_io *io = apr_pcalloc(pool, sizeof(*io)); if (io) { io->id = id; io->pool = pool; - io->bucket_alloc = apr_bucket_alloc_create(pool); + io->bucket_alloc = bucket_alloc; io->request = h2_request_clone(pool, request); } return io; } +static void check_bbin(h2_io *io) +{ + if (!io->bbin) { + io->bbin = apr_brigade_create(io->pool, io->bucket_alloc); + } +} + +static void check_bbout(h2_io *io) +{ + if (!io->bbout) { + io->bbout = apr_brigade_create(io->pool, io->bucket_alloc); + } +} + +static void check_bbtmp(h2_io *io) +{ + if (!io->bbtmp) { + io->bbtmp = apr_brigade_create(io->pool, io->bucket_alloc); + } +} + +static void append_eos(h2_io *io, apr_bucket_brigade *bb) +{ + APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(io->bucket_alloc)); +} + void h2_io_redo(h2_io *io) { io->worker_started = 0; @@ -56,8 +85,8 @@ void h2_io_redo(h2_io *io) if (io->bbout) { apr_brigade_cleanup(io->bbout); } - if (io->tmp) { - apr_brigade_cleanup(io->tmp); + if (io->bbtmp) { + apr_brigade_cleanup(io->bbtmp); } io->started_at = io->done_at = 0; } @@ -85,23 +114,12 @@ void h2_io_set_response(h2_io *io, h2_response *response) } } - void h2_io_rst(h2_io *io, int error) { io->rst_error = error; io->eos_in = 1; } -int h2_io_in_has_eos_for(h2_io *io) -{ - return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1)); -} - -int h2_io_in_has_data(h2_io *io) -{ - return io->bbin && h2_util_bb_has_data_or_eos(io->bbin); -} - int h2_io_out_has_data(h2_io *io) { return io->bbout && h2_util_bb_has_data_or_eos(io->bbout); @@ -199,8 +217,8 @@ static int add_trailer(void *ctx, const char *key, const char *value) return (status == APR_SUCCESS); } -static apr_status_t append_eos(h2_io *io, apr_bucket_brigade *bb, - apr_table_t *trailers) +static apr_status_t in_append_eos(h2_io *io, apr_bucket_brigade *bb, + apr_table_t *trailers) { apr_status_t status = APR_SUCCESS; apr_table_t *t = io->request->trailers; @@ -222,7 +240,7 @@ static apr_status_t append_eos(h2_io *io, apr_bucket_brigade *bb, status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n"); } } - APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(io->bucket_alloc)); + append_eos(io, bb); return status; } @@ -239,7 +257,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, if (!io->bbin || APR_BRIGADE_EMPTY(io->bbin)) { if (io->eos_in) { if (!io->eos_in_written) { - status = append_eos(io, bb, trailers); + status = in_append_eos(io, bb, trailers); io->eos_in_written = 1; return status; } @@ -250,26 +268,27 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, if (io->request->chunked) { /* the reader expects HTTP/1.1 chunked encoding */ - status = 
h2_util_move(io->tmp, io->bbin, maxlen, NULL, "h2_io_in_read_chunk"); + check_bbtmp(io); + status = h2_util_move(io->bbtmp, io->bbin, maxlen, NULL, "h2_io_in_read_chunk"); if (status == APR_SUCCESS) { apr_off_t tmp_len = 0; - apr_brigade_length(io->tmp, 1, &tmp_len); + apr_brigade_length(io->bbtmp, 1, &tmp_len); if (tmp_len > 0) { io->input_consumed += tmp_len; status = apr_brigade_printf(bb, NULL, NULL, "%lx\r\n", (unsigned long)tmp_len); if (status == APR_SUCCESS) { - status = h2_util_move(bb, io->tmp, -1, NULL, "h2_io_in_read_tmp1"); + status = h2_util_move(bb, io->bbtmp, -1, NULL, "h2_io_in_read_tmp1"); if (status == APR_SUCCESS) { status = apr_brigade_puts(bb, NULL, NULL, "\r\n"); } } } else { - status = h2_util_move(bb, io->tmp, -1, NULL, "h2_io_in_read_tmp2"); + status = h2_util_move(bb, io->bbtmp, -1, NULL, "h2_io_in_read_tmp2"); } - apr_brigade_cleanup(io->tmp); + apr_brigade_cleanup(io->bbtmp); } } else { @@ -286,7 +305,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, if (status == APR_SUCCESS && (!io->bbin || APR_BRIGADE_EMPTY(io->bbin))) { if (io->eos_in) { if (!io->eos_in_written) { - status = append_eos(io, bb, trailers); + status = in_append_eos(io, bb, trailers); io->eos_in_written = 1; } } @@ -298,7 +317,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, return status; } -apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb) +apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos) { if (io->rst_error) { return APR_ECONNABORTED; @@ -307,13 +326,12 @@ apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb) if (io->eos_in) { return APR_EOF; } - io->eos_in = h2_util_has_eos(bb, -1); - if (!APR_BRIGADE_EMPTY(bb)) { - if (!io->bbin) { - io->bbin = apr_brigade_create(io->pool, io->bucket_alloc); - io->tmp = apr_brigade_create(io->pool, io->bucket_alloc); - } - return h2_util_move(io->bbin, bb, -1, NULL, "h2_io_in_write"); + if (eos) { + io->eos_in = 1; + } + if (len > 0) { + check_bbin(io); + return apr_brigade_write(io->bbin, NULL, NULL, d, len); } return APR_SUCCESS; } @@ -328,27 +346,36 @@ apr_status_t h2_io_in_close(h2_io *io) return APR_SUCCESS; } -apr_status_t h2_io_out_readx(h2_io *io, - h2_io_data_cb *cb, void *ctx, - apr_off_t *plen, int *peos) +static int is_out_readable(h2_io *io, apr_off_t *plen, int *peos, + apr_status_t *ps) { - apr_status_t status; - if (io->rst_error) { - return APR_ECONNABORTED; + *ps = APR_ECONNABORTED; + return 0; } - if (io->eos_out_read) { *plen = 0; *peos = 1; - return APR_SUCCESS; + *ps = APR_SUCCESS; + return 0; } else if (!io->bbout) { *plen = 0; *peos = 0; - return APR_EAGAIN; + *ps = APR_EAGAIN; + return 0; + } + return 1; +} + +apr_status_t h2_io_out_readx(h2_io *io, + h2_io_data_cb *cb, void *ctx, + apr_off_t *plen, int *peos) +{ + apr_status_t status; + if (!is_out_readable(io, plen, peos, &status)) { + return status; } - if (cb == NULL) { /* just checking length available */ status = h2_util_bb_avail(io->bbout, plen, peos); @@ -360,7 +387,6 @@ apr_status_t h2_io_out_readx(h2_io *io, io->output_consumed += *plen; } } - return status; } @@ -368,24 +394,13 @@ apr_status_t h2_io_out_read_to(h2_io *io, apr_bucket_brigade *bb, apr_off_t *plen, int *peos) { apr_status_t status; - - if (io->rst_error) { - return APR_ECONNABORTED; - } - - if (io->eos_out_read) { - *plen = 0; - *peos = 1; - return APR_SUCCESS; - } - else if (!io->bbout) { - *plen = 0; - *peos = 0; - return APR_EAGAIN; + if (!is_out_readable(io, plen, peos, &status)) { + return status; } - - 
io->eos_out_read = *peos = h2_util_has_eos(io->bbout, *plen); status = h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to"); + if (status == APR_SUCCESS && io->eos_out && APR_BRIGADE_EMPTY(io->bbout)) { + io->eos_out_read = *peos = 1; + } io->output_consumed += *plen; return status; } @@ -403,6 +418,7 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, apr_size_t *pfile_buckets_allowed) { apr_status_t status; + apr_bucket *b; int start_allowed; if (io->rst_error) { @@ -410,27 +426,33 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, } if (io->eos_out) { - apr_off_t len; + apr_off_t len = 0; /* We have already delivered an EOS bucket to a reader, no * sense in storing anything more here. */ - status = apr_brigade_length(bb, 1, &len); - if (status == APR_SUCCESS) { - if (len > 0) { - /* someone tries to write real data after EOS, that - * does not look right. */ - status = APR_EOF; - } - /* cleanup, as if we had moved the data */ - apr_brigade_cleanup(bb); + apr_brigade_length(bb, 0, &len); + apr_brigade_cleanup(bb); + return (len > 0)? APR_EOF : APR_SUCCESS; + } + + /* Filter the EOR bucket and set it aside. We prefer to tear down + * the request when the whole h2 stream is done */ + for (b = APR_BRIGADE_FIRST(bb); + b != APR_BRIGADE_SENTINEL(bb); + b = APR_BUCKET_NEXT(b)) + { + if (AP_BUCKET_IS_EOR(b)) { + APR_BUCKET_REMOVE(b); + io->eor = b; + break; } - return status; - } - + else if (APR_BUCKET_IS_EOS(b)) { + io->eos_out = 1; + break; + } + } + process_trailers(io, trailers); - if (!io->bbout) { - io->bbout = apr_brigade_create(io->pool, io->bucket_alloc); - } /* Let's move the buckets from the request processing in here, so * that the main thread can read them when it has time/capacity. @@ -442,6 +464,7 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, * many open files already buffered. Otherwise we will run out of * file handles. */ + check_bbout(io); start_allowed = *pfile_buckets_allowed; status = h2_util_move(io->bbout, bb, maxlen, pfile_buckets_allowed, "h2_io_out_write"); @@ -460,14 +483,11 @@ apr_status_t h2_io_out_close(h2_io *io, apr_table_t *trailers) } if (!io->eos_out_read) { /* EOS has not been read yet */ process_trailers(io, trailers); - if (!io->bbout) { - io->bbout = apr_brigade_create(io->pool, io->bucket_alloc); - } if (!io->eos_out) { + check_bbout(io); io->eos_out = 1; if (!h2_util_has_eos(io->bbout, -1)) { - APR_BRIGADE_INSERT_TAIL(io->bbout, - apr_bucket_eos_create(io->bucket_alloc)); + append_eos(io, io->bbout); } } } diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index d92b7eb0d4..90d0cde8f2 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -44,9 +44,12 @@ struct h2_io { struct h2_response *response; /* response to request */ int rst_error; /* h2 related stream abort error */ + apr_bucket *eor; /* the EOR bucket, set aside */ + struct h2_task *task; /* the task once started */ + apr_bucket_brigade *bbin; /* input data for stream */ apr_bucket_brigade *bbout; /* output data from stream */ - apr_bucket_brigade *tmp; /* temporary data for chunking */ + apr_bucket_brigade *bbtmp; /* temporary data for chunking */ unsigned int orphaned : 1; /* h2_stream is gone for this io */ unsigned int worker_started : 1; /* h2_worker started processing for this io */ @@ -77,7 +80,9 @@ struct h2_io { /** * Creates a new h2_io for the given stream id. 
*/ -h2_io *h2_io_create(int id, apr_pool_t *pool, const struct h2_request *request); +h2_io *h2_io_create(int id, apr_pool_t *pool, + apr_bucket_alloc_t *bucket_alloc, + const struct h2_request *request); /** * Set the response of this stream. @@ -92,19 +97,10 @@ void h2_io_rst(h2_io *io, int error); int h2_io_is_repeatable(h2_io *io); void h2_io_redo(h2_io *io); -/** - * The input data is completely queued. Blocked reads will return immediately - * and give either data or EOF. - */ -int h2_io_in_has_eos_for(h2_io *io); /** * Output data is available. */ int h2_io_out_has_data(h2_io *io); -/** - * Input data is available. - */ -int h2_io_in_has_data(h2_io *io); void h2_io_signal(h2_io *io, h2_io_op op); void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout, @@ -127,7 +123,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, /** * Appends given bucket to the input. */ -apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb); +apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos); /** * Closes the input. After existing data has been read, APR_EOF will diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 4d7f63bb52..1284c43255 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -195,12 +195,13 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, return NULL; } - status = apr_thread_cond_create(&m->req_added, m->pool); + status = apr_thread_cond_create(&m->task_thawed, m->pool); if (status != APR_SUCCESS) { h2_mplx_destroy(m); return NULL; } + m->bucket_alloc = apr_bucket_alloc_create(m->pool); m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS); m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); m->q = h2_iq_create(m->pool, m->max_streams); @@ -253,7 +254,7 @@ static void workers_register(h2_mplx *m) h2_workers_register(m->workers, m); } -static int io_process_events(h2_mplx *m, h2_io *io) +static int io_in_consumed_signal(h2_mplx *m, h2_io *io) { if (io->input_consumed && m->input_consumed) { m->input_consumed(m->input_consumed_ctx, @@ -264,18 +265,28 @@ static int io_process_events(h2_mplx *m, h2_io *io) return 0; } +static int io_out_consumed_signal(h2_mplx *m, h2_io *io) +{ + if (io->output_consumed && io->task && io->task->assigned) { + h2_req_engine_out_consumed(io->task->assigned, io->task->c, + io->output_consumed); + io->output_consumed = 0; + return 1; + } + return 0; +} + static void io_destroy(h2_mplx *m, h2_io *io, int events) { - apr_pool_t *pool = io->pool; + apr_pool_t *pool; /* cleanup any buffered input */ h2_io_in_shutdown(io); if (events) { /* Process outstanding events before destruction */ - io_process_events(m, io); + io_in_consumed_signal(m, io); } - io->pool = NULL; /* The pool is cleared/destroyed which also closes all * allocated file handles. Give this count back to our * file handle pool. 
*/ @@ -286,8 +297,20 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events) if (m->redo_ios) { h2_io_set_remove(m->redo_ios, io); } - - if (pool) { + + if (io->task) { + if (m->spare_allocator) { + apr_allocator_destroy(m->spare_allocator); + m->spare_allocator = NULL; + } + + h2_slave_destroy(io->task->c, &m->spare_allocator); + io->task = NULL; + } + + pool = io->pool; + io->pool = NULL; + if (0 && pool) { apr_pool_clear(pool); if (m->spare_pool) { apr_pool_destroy(m->spare_pool); @@ -365,7 +388,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) h2_mplx_set_consumed_cb(m, NULL, NULL); h2_iq_clear(m->q); - apr_thread_cond_broadcast(m->req_added); + apr_thread_cond_broadcast(m->task_thawed); while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) { /* iterate until all ios have been orphaned or destroyed */ } @@ -401,7 +424,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) } } h2_mplx_abort(m); - apr_thread_cond_broadcast(m->req_added); + apr_thread_cond_broadcast(m->task_thawed); } } @@ -448,7 +471,7 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) * for processing, e.g. when we received all HEADERs. But when * a stream is cancelled very early, it will not exist. */ if (io) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, "h2_mplx(%ld-%d): marking stream as done.", m->id, stream_id); io_stream_done(m, io, rst_error); @@ -498,7 +521,7 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block, } apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, - apr_bucket_brigade *bb) + const char *data, apr_size_t len, int eos) { apr_status_t status; int acquired; @@ -508,10 +531,10 @@ apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre"); - status = h2_io_in_write(io, bb); + status = h2_io_in_write(io, data, len, eos); H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post"); h2_io_signal(io, H2_IO_READ); - io_process_events(m, io); + io_in_consumed_signal(m, io); } else { status = APR_ECONNABORTED; @@ -533,7 +556,7 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id) status = h2_io_in_close(io); H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close"); h2_io_signal(io, H2_IO_READ); - io_process_events(m, io); + io_in_consumed_signal(m, io); } else { status = APR_ECONNABORTED; @@ -543,6 +566,12 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id) return status; } +void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) +{ + m->input_consumed = cb; + m->input_consumed_ctx = ctx; +} + typedef struct { h2_mplx * m; int streams_updated; @@ -551,18 +580,12 @@ typedef struct { static int update_window(void *ctx, h2_io *io) { update_ctx *uctx = (update_ctx*)ctx; - if (io_process_events(uctx->m, io)) { + if (io_in_consumed_signal(uctx->m, io)) { ++uctx->streams_updated; } return 1; } -void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) -{ - m->input_consumed = cb; - m->input_consumed_ctx = ctx; -} - apr_status_t h2_mplx_in_update_windows(h2_mplx *m) { apr_status_t status; @@ -690,7 +713,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) * shutdown input and send out any events (e.g. window * updates) asap. 
*/ h2_io_in_shutdown(io); - io_process_events(m, io); + io_in_consumed_signal(m, io); } } @@ -717,8 +740,10 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io, && !APR_BRIGADE_EMPTY(bb) && !is_aborted(m, &status)) { - status = h2_io_out_write(io, bb, m->stream_max_mem, trailers, - &m->tx_handles_reserved); + status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX, + trailers, &m->tx_handles_reserved); + io_out_consumed_signal(m, io); + /* Wait for data to drain until there is room again or * stream timeout expires */ h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait); @@ -728,6 +753,9 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io, && (m->stream_max_mem <= h2_io_out_length(io)) && !is_aborted(m, &status)) { if (!blocking) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + "h2_mplx(%ld-%d): incomplete write", + m->id, io->id); return APR_INCOMPLETE; } trailers = NULL; @@ -856,11 +884,13 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id, apr_table_t *trailers) "h2_mplx(%ld-%d): close, no response, no rst", m->id, io->id); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, - "h2_mplx(%ld-%d): close with trailers=%s", - m->id, io->id, trailers? "yes" : "no"); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, + "h2_mplx(%ld-%d): close with eor=%s, trailers=%s", + m->id, io->id, io->eor? "yes" : "no", + trailers? "yes" : "no"); status = h2_io_out_close(io, trailers); H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close"); + io_out_consumed_signal(m, io); have_out_data_for(m, stream_id); } @@ -898,46 +928,6 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error) return status; } -int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id) -{ - int has_eos = 0; - int acquired; - - apr_status_t status; - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - has_eos = h2_io_in_has_eos_for(io); - } - else { - has_eos = 1; - } - leave_mutex(m, acquired); - } - return has_eos; -} - -int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id) -{ - apr_status_t status; - int has_data = 0; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - has_data = h2_io_in_has_data(io); - } - else { - has_data = 0; - } - leave_mutex(m, acquired); - } - return has_data; -} - int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id) { apr_status_t status; @@ -1027,7 +1017,7 @@ static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request) m->spare_pool = NULL; } - io = h2_io_create(stream_id, io_pool, request); + io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request); h2_io_set_add(m->stream_ios, io); return io; @@ -1086,7 +1076,7 @@ static h2_task *pop_task(h2_mplx *m) else if (io) { conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator); m->spare_allocator = NULL; - task = h2_task_create(m->id, io->request, slave, m); + io->task = task = h2_task_create(m->id, io->request, slave, m); apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id); io->worker_started = 1; io->started_at = apr_time_now(); @@ -1123,7 +1113,7 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) return task; } -static void task_done(h2_mplx *m, h2_task *task) +static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) { if (task) { if (task->frozen) { @@ -1135,7 +1125,7 @@ static void 
task_done(h2_mplx *m, h2_task *task) * bodies into the mplx. */ /* FIXME: this implementation is incomplete. */ h2_task_set_io_blocking(task, 0); - apr_thread_cond_broadcast(m->req_added); + apr_thread_cond_broadcast(m->task_thawed); } else { h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); @@ -1149,6 +1139,18 @@ static void task_done(h2_mplx *m, h2_task *task) * other mplx's. Perhaps leave after n requests? */ h2_mplx_out_close(m, task->stream_id, NULL); + if (ngn && io) { + apr_off_t bytes = io->output_consumed + h2_io_out_length(io); + if (bytes > 0) { + /* we need to report consumed and current buffered output + * to the engine. The request will be streamed out or cancelled, + * no more data is coming from it and the engine should update + * its calculations before we destroy this information. */ + h2_req_engine_out_consumed(ngn, task->c, bytes); + io->output_consumed = 0; + } + } + if (task->engine) { if (!h2_req_engine_is_shutdown(task->engine)) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, @@ -1159,14 +1161,6 @@ static void task_done(h2_mplx *m, h2_task *task) h2_ngn_shed_done_ngn(m->ngn_shed, task->engine); } - if (m->spare_allocator) { - apr_allocator_destroy(m->spare_allocator); - m->spare_allocator = NULL; - } - - h2_slave_destroy(task->c, &m->spare_allocator); - task = NULL; - if (io) { apr_time_t now = apr_time_now(); if (!io->orphaned && m->redo_ios @@ -1208,9 +1202,14 @@ static void task_done(h2_mplx *m, h2_task *task) } } else { - /* hang around until the stream deregisteres */ + /* hang around until the stream deregisters */ } } + else { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + "h2_mplx(%ld): task %s without corresp. h2_io", + m->id, task->id); + } } } } @@ -1220,7 +1219,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) int acquired; if (enter_mutex(m, &acquired) == APR_SUCCESS) { - task_done(m, task); + task_done(m, task, NULL); --m->workers_busy; if (ptask) { /* caller wants another task */ @@ -1373,8 +1372,37 @@ apr_status_t h2_mplx_idle(h2_mplx *m) * HTTP/2 request engines ******************************************************************************/ +typedef struct { + h2_mplx * m; + h2_req_engine *ngn; + int streams_updated; +} ngn_update_ctx; + +static int ngn_update_window(void *ctx, h2_io *io) +{ + ngn_update_ctx *uctx = ctx; + if (io && io->task && io->task->assigned == uctx->ngn + && io_out_consumed_signal(uctx->m, io)) { + ++uctx->streams_updated; + } + return 1; +} + +static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn) +{ + ngn_update_ctx ctx; + + ctx.m = m; + ctx.ngn = ngn; + ctx.streams_updated = 0; + h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx); + + return ctx.streams_updated? 
APR_SUCCESS : APR_EAGAIN; +} + apr_status_t h2_mplx_req_engine_push(const char *ngn_type, - request_rec *r, h2_req_engine_init *einit) + request_rec *r, + http2_req_engine_init *einit) { apr_status_t status; h2_mplx *m; @@ -1386,6 +1414,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, return APR_ECONNABORTED; } m = task->mplx; + task->r = r; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); @@ -1393,8 +1422,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, status = APR_ECONNABORTED; } else { - status = h2_ngn_shed_push_req(m->ngn_shed, ngn_type, - task, r, einit); + status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit); } leave_mutex(m, acquired); } @@ -1409,30 +1437,37 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn, h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn); h2_mplx *m = h2_ngn_shed_get_ctx(shed); apr_status_t status; + h2_task *task = NULL; int acquired; - *pr = NULL; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { int want_shutdown = (block == APR_BLOCK_READ); + + /* Take this opportunity to update output consummation + * for this engine */ + ngn_out_update_windows(m, ngn); + if (want_shutdown && !h2_iq_empty(m->q)) { /* For a blocking read, check first if requests are to be * had and, if not, wait a short while before doing the * blocking, and if unsuccessful, terminating read. */ - status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr); + status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task); if (APR_STATUS_IS_EAGAIN(status)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): start block engine pull", m->id); - apr_thread_cond_timedwait(m->req_added, m->lock, + apr_thread_cond_timedwait(m->task_thawed, m->lock, apr_time_from_msec(20)); - status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr); + status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task); } } else { - status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr); + status = h2_ngn_shed_pull_task(shed, ngn, capacity, + want_shutdown, &task); } leave_mutex(m, acquired); } + *pr = task? task->r : NULL; return status; } @@ -1445,14 +1480,16 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn) int acquired; if (enter_mutex(m, &acquired) == APR_SUCCESS) { + ngn_out_update_windows(m, ngn); h2_ngn_shed_done_task(m->ngn_shed, ngn, task); if (task->engine) { /* cannot report that as done until engine returns */ } else { - h2_task_output_close(task->output); - task_done(m, task); + task_done(m, task, ngn); } + /* Take this opportunity to update output consummation + * for this engine */ leave_mutex(m, acquired); } } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index a61a63891a..840f34b464 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -67,6 +67,7 @@ struct h2_mplx { volatile int refs; conn_rec *c; apr_pool_t *pool; + apr_bucket_alloc_t *bucket_alloc; unsigned int aborted : 1; unsigned int need_registration : 1; @@ -89,7 +90,7 @@ struct h2_mplx { apr_thread_mutex_t *lock; struct apr_thread_cond_t *added_output; - struct apr_thread_cond_t *req_added; + struct apr_thread_cond_t *task_thawed; struct apr_thread_cond_t *join_wait; apr_size_t stream_max_mem; @@ -171,10 +172,6 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error); */ int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id); -/* Return != 0 iff the multiplexer has input data for the given stream. 
- */ -int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id); - /** * Waits on output data from any stream in this session to become available. * Returns APR_TIMEUP if no data arrived in the given time. @@ -238,20 +235,14 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block, * Appends data to the input of the given stream. Storage of input data is * not subject to flow control. */ -apr_status_t h2_mplx_in_write(h2_mplx *mplx, int stream_id, - apr_bucket_brigade *bb); +apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, + const char *data, apr_size_t len, int eos); /** * Closes the input for the given stream_id. */ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id); -/** - * Returns != 0 iff the input for the given stream has been closed. There - * could still be data queued, but it can be read without blocking. - */ -int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id); - /** * Invoke the consumed callback for all streams that had bytes read since the * last call to this function. If no stream had input data consumed, the @@ -414,12 +405,15 @@ apr_status_t h2_mplx_idle(h2_mplx *m); * h2_req_engine handling ******************************************************************************/ +typedef void h2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed); typedef apr_status_t h2_mplx_req_engine_init(struct h2_req_engine *engine, const char *id, const char *type, apr_pool_t *pool, apr_uint32_t req_buffer_size, - request_rec *r); + request_rec *r, + h2_output_consumed **pconsumed, + void **pbaton); apr_status_t h2_mplx_req_engine_push(const char *ngn_type, request_rec *r, diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c index 5b97cf914d..3e8667aa23 100644 --- a/modules/http2/h2_ngn_shed.c +++ b/modules/http2/h2_ngn_shed.c @@ -34,6 +34,7 @@ #include "h2_ctx.h" #include "h2_h2.h" #include "h2_int_queue.h" +#include "h2_mplx.h" #include "h2_response.h" #include "h2_request.h" #include "h2_task.h" @@ -46,7 +47,6 @@ typedef struct h2_ngn_entry h2_ngn_entry; struct h2_ngn_entry { APR_RING_ENTRY(h2_ngn_entry) link; h2_task *task; - request_rec *r; }; #define H2_NGN_ENTRY_NEXT(e) APR_RING_NEXT((e), link) @@ -84,6 +84,9 @@ struct h2_req_engine { apr_uint32_t no_assigned; /* # of assigned requests */ apr_uint32_t no_live; /* # of live */ apr_uint32_t no_finished; /* # of finished */ + + h2_output_consumed *out_consumed; + void *out_consumed_ctx; }; const char *h2_req_engine_get_id(h2_req_engine *engine) @@ -96,6 +99,14 @@ int h2_req_engine_is_shutdown(h2_req_engine *engine) return engine->shutdown; } +void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c, + apr_off_t bytes) +{ + if (engine->out_consumed) { + engine->out_consumed(engine->out_consumed_ctx, c, bytes); + } +} + h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c, apr_uint32_t default_capacity, apr_uint32_t req_buffer_size) @@ -132,26 +143,25 @@ void h2_ngn_shed_abort(h2_ngn_shed *shed) shed->aborted = 1; } -static void ngn_add_req(h2_req_engine *ngn, h2_task *task, request_rec *r) +static void ngn_add_task(h2_req_engine *ngn, h2_task *task) { h2_ngn_entry *entry = apr_pcalloc(task->c->pool, sizeof(*entry)); APR_RING_ELEM_INIT(entry, link); entry->task = task; - entry->r = r; H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry); } -apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, - h2_task *task, request_rec *r, - h2_req_engine_init *einit){ +apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, + h2_task *task, 
http2_req_engine_init *einit) +{ h2_req_engine *ngn; AP_DEBUG_ASSERT(shed); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, "h2_ngn_shed(%ld): PUSHing request (task=%s)", shed->c->id, - apr_table_get(r->connection->notes, H2_TASK_ID_NOTE)); + task->id); if (task->ser_headers) { /* Max compatibility, deny processing of this */ return APR_EOF; @@ -165,10 +175,10 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, "h2_ngn_shed(%ld): pushing request %s to %s", shed->c->id, task->id, ngn->id); if (!h2_task_is_detached(task)) { - h2_task_freeze(task, r); + h2_task_freeze(task); } /* FIXME: sometimes ngn is garbage, probly alread freed */ - ngn_add_req(ngn, task, r); + ngn_add_task(ngn, task); ngn->no_assigned++; return APR_SUCCESS; } @@ -191,7 +201,8 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, APR_RING_INIT(&newngn->entries, h2_ngn_entry, link); status = einit(newngn, newngn->id, newngn->type, newngn->pool, - shed->req_buffer_size, r); + shed->req_buffer_size, task->r, + &newngn->out_consumed, &newngn->out_consumed_ctx); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, "h2_ngn_shed(%ld): create engine %s (%s)", shed->c->id, newngn->id, newngn->type); @@ -199,6 +210,7 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, AP_DEBUG_ASSERT(task->engine == NULL); newngn->task = task; task->engine = newngn; + task->assigned = newngn; apr_hash_set(shed->ngns, newngn->type, APR_HASH_KEY_STRING, newngn); } return status; @@ -206,13 +218,17 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, return APR_EOF; } -static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn) +static h2_ngn_entry *pop_detached(h2_req_engine *ngn) { h2_ngn_entry *entry; for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries); entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries); entry = H2_NGN_ENTRY_NEXT(entry)) { - if (!entry->task->frozen) { + if (h2_task_is_detached(entry->task) + || (entry->task->engine == ngn)) { + /* The task hosting this engine can always be pulled by it. + * For other task, they need to become detached, e.g. no longer + * assigned to another worker. */ H2_NGN_ENTRY_REMOVE(entry); return entry; } @@ -220,16 +236,19 @@ static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn) return NULL; } -apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, - h2_req_engine *ngn, - apr_uint32_t capacity, - int want_shutdown, - request_rec **pr) +apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, + h2_req_engine *ngn, + apr_uint32_t capacity, + int want_shutdown, + h2_task **ptask) { h2_ngn_entry *entry; AP_DEBUG_ASSERT(ngn); - *pr = NULL; + *ptask = NULL; + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, + "h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d", + shed->c->id, ngn->id, want_shutdown); if (shed->aborted) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, shed->c, "h2_ngn_shed(%ld): abort while pulling requests %s", @@ -249,20 +268,27 @@ apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, return ngn->shutdown? 
APR_EOF : APR_EAGAIN; } - if ((entry = pop_non_frozen(ngn))) { + if ((entry = pop_detached(ngn))) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, entry->task->c, "h2_ngn_shed(%ld): pulled request %s for engine %s", shed->c->id, entry->task->id, ngn->id); ngn->no_live++; - *pr = entry->r; + *ptask = entry->task; + entry->task->assigned = ngn; return APR_SUCCESS; } + + if (1) { + h2_ngn_entry *entry = H2_REQ_ENTRIES_FIRST(&ngn->entries); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, + "h2_ngn_shed(%ld): pull task, nothing, first task %s", + shed->c->id, entry->task->id); + } return APR_EAGAIN; } static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, - h2_task *task, int waslive, int aborted, - int close) + h2_task *task, int waslive, int aborted) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, "h2_ngn_shed(%ld): task %s %s by %s", @@ -271,16 +297,13 @@ static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, if (waslive) ngn->no_live--; ngn->no_assigned--; - if (close) { - h2_task_output_close(task->output); - } return APR_SUCCESS; } apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, struct h2_req_engine *ngn, h2_task *task) { - return ngn_done_task(shed, ngn, task, 1, 0, 0); + return ngn_done_task(shed, ngn, task, 1, 0); } void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn) @@ -302,13 +325,12 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn) for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries); entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries); entry = H2_NGN_ENTRY_NEXT(entry)) { - request_rec *r = entry->r; - h2_task *task = h2_ctx_rget_task(r); + h2_task *task = entry->task; ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c, "h2_ngn_shed(%ld): engine %s has queued task %s, " "frozen=%d, aborting", shed->c->id, ngn->id, task->id, task->frozen); - ngn_done_task(shed, ngn, task, 0, 1, 1); + ngn_done_task(shed, ngn, task, 0, 1); } } if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) { diff --git a/modules/http2/h2_ngn_shed.h b/modules/http2/h2_ngn_shed.h index 3dc9e375ef..832dbd3a8e 100644 --- a/modules/http2/h2_ngn_shed.h +++ b/modules/http2/h2_ngn_shed.h @@ -35,12 +35,17 @@ struct h2_ngn_shed { const char *h2_req_engine_get_id(h2_req_engine *engine); int h2_req_engine_is_shutdown(h2_req_engine *engine); +void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c, + apr_off_t bytes); + typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine, const char *id, const char *type, apr_pool_t *pool, apr_uint32_t req_buffer_size, - request_rec *r); + request_rec *r, + h2_output_consumed **pconsumed, + void **pbaton); h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c, apr_uint32_t default_capactiy, @@ -53,13 +58,13 @@ h2_ngn_shed *h2_ngn_shed_get_shed(struct h2_req_engine *ngn); void h2_ngn_shed_abort(h2_ngn_shed *shed); -apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, - struct h2_task *task, request_rec *r, +apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, + struct h2_task *task, h2_shed_ngn_init *init_cb); -apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, h2_req_engine *pub_ngn, - apr_uint32_t capacity, - int want_shutdown, request_rec **pr); +apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, h2_req_engine *pub_ngn, + apr_uint32_t capacity, + int want_shutdown, struct h2_task **ptask); apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, struct h2_req_engine *ngn, diff --git a/modules/http2/h2_session.c 
b/modules/http2/h2_session.c index d99573850d..4372353c13 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -235,8 +235,11 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, } return 0; } - - status = h2_stream_write_data(stream, (const char *)data, len); + + /* FIXME: enabling setting EOS this way seems to break input handling + * in mod_proxy_http2. why? */ + status = h2_stream_write_data(stream, (const char *)data, len, + 0 /*flags & NGHTTP2_FLAG_END_STREAM*/); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes", session->id, stream_id, (long)len); @@ -683,7 +686,9 @@ static apr_status_t h2_session_shutdown(h2_session *session, int reason, h2_mplx_get_max_stream_started(session->mplx), reason, (uint8_t*)err, err? strlen(err):0); status = nghttp2_session_send(session->ngh2); - h2_conn_io_pass(&session->io, 1); + if (status == APR_SUCCESS) { + status = h2_conn_io_flush(&session->io); + } ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03069) "session(%ld): sent GOAWAY, err=%d, msg=%s", session->id, reason, err? err : ""); @@ -1015,7 +1020,6 @@ static apr_status_t h2_session_start(h2_session *session, int *rv) } } - h2_conn_io_pass(&session->io, 1); return status; } @@ -1430,6 +1434,9 @@ apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream) { apr_pool_t *pool = h2_stream_detach_pool(stream); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_stream(%ld-%d): cleanup by EOS bucket destroy", + session->id, stream->id); /* this may be called while the session has already freed * some internal structures or even when the mplx is locked. */ if (session->mplx) { @@ -1702,6 +1709,7 @@ static void h2_session_ev_init(h2_session *session, int arg, const char *msg) static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg) { + session->local_shutdown = 1; switch (session->state) { case H2_SESSION_ST_LOCAL_SHUTDOWN: /* already did that? */ @@ -1954,15 +1962,20 @@ static const int MAX_WAIT_MICROS = 200 * 1000; static void update_child_status(h2_session *session, int status, const char *msg) { - apr_snprintf(session->status, sizeof(session->status), - "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)", - msg? msg : "-", - (int)h2_ihash_count(session->streams), - (int)session->requests_received, - (int)session->responses_submitted, - (int)session->pushes_submitted, - (int)session->pushes_reset + session->streams_reset); - ap_update_child_status_descr(session->c->sbh, status, session->status); + /* Assume that we also change code/msg when something really happened and + * avoid updating the scoreboard in between */ + if (session->last_status_code != status + || session->last_status_msg != msg) { + apr_snprintf(session->status, sizeof(session->status), + "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)", + msg? msg : "-", + (int)h2_ihash_count(session->streams), + (int)session->requests_received, + (int)session->responses_submitted, + (int)session->pushes_submitted, + (int)session->pushes_reset + session->streams_reset); + ap_update_child_status_descr(session->c->sbh, status, session->status); + } } apr_status_t h2_session_process(h2_session *session, int async) @@ -2016,7 +2029,6 @@ apr_status_t h2_session_process(h2_session *session, int async) update_child_status(session, (no_streams? 
SERVER_BUSY_KEEPALIVE : SERVER_BUSY_READ), "idle"); /* make certain, the client receives everything before we idle */ - h2_conn_io_flush(&session->io); if (!session->keep_sync_until && async && no_streams && !session->r && session->requests_received) { ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c, @@ -2093,10 +2105,12 @@ apr_status_t h2_session_process(h2_session *session, int async) case H2_SESSION_ST_LOCAL_SHUTDOWN: case H2_SESSION_ST_REMOTE_SHUTDOWN: if (nghttp2_session_want_read(session->ngh2)) { + ap_update_child_status(session->c->sbh, SERVER_BUSY_READ, NULL); h2_filter_cin_timeout_set(session->cin, session->s->timeout); status = h2_session_read(session, 0); if (status == APR_SUCCESS) { have_read = 1; + update_child_status(session, SERVER_BUSY_READ, "busy"); dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL); } else if (status == APR_EAGAIN) { @@ -2133,7 +2147,7 @@ apr_status_t h2_session_process(h2_session *session, int async) } } - while (nghttp2_session_want_write(session->ngh2)) { + if (nghttp2_session_want_write(session->ngh2)) { ap_update_child_status(session->c->sbh, SERVER_BUSY_WRITE, NULL); status = h2_session_send(session); if (status == APR_SUCCESS) { @@ -2149,7 +2163,6 @@ apr_status_t h2_session_process(h2_session *session, int async) if (have_read || have_written) { if (session->wait_us) { session->wait_us = 0; - update_child_status(session, SERVER_BUSY_READ, "busy"); } } else if (!nghttp2_session_want_write(session->ngh2)) { @@ -2180,8 +2193,6 @@ apr_status_t h2_session_process(h2_session *session, int async) "h2_session: wait for data, %ld micros", (long)session->wait_us); } - /* make certain, the client receives everything before we idle */ - h2_conn_io_flush(&session->io); status = h2_mplx_out_trywait(session->mplx, session->wait_us, session->iowait); if (status == APR_SUCCESS) { @@ -2190,7 +2201,8 @@ apr_status_t h2_session_process(h2_session *session, int async) } else if (status == APR_TIMEUP) { /* go back to checking all inputs again */ - transit(session, "wait cycle", H2_SESSION_ST_BUSY); + transit(session, "wait cycle", session->local_shutdown? + H2_SESSION_ST_LOCAL_SHUTDOWN : H2_SESSION_ST_BUSY); } else { ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c, @@ -2214,7 +2226,10 @@ apr_status_t h2_session_process(h2_session *session, int async) break; } - h2_conn_io_pass(&session->io, 1); + status = h2_conn_io_flush(&session->io); + if (status != APR_SUCCESS) { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); + } if (!nghttp2_session_want_read(session->ngh2) && !nghttp2_session_want_write(session->ngh2)) { dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL); diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index fa98bf9186..566e79dee2 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -85,6 +85,7 @@ typedef struct h2_session { unsigned int reprioritize : 1; /* scheduled streams priority changed */ unsigned int eoc_written : 1; /* h2 eoc bucket written */ unsigned int flush : 1; /* flushing output necessary */ + unsigned int local_shutdown: 1; /* GOAWAY has been sent by us */ apr_interval_time_t wait_us; /* timout during BUSY_WAIT state, micro secs */ int unsent_submits; /* number of submitted, but not yet written responses. 
*/ @@ -130,6 +131,8 @@ typedef struct h2_session { struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */ char status[64]; /* status message for scoreboard */ + int last_status_code; /* the one already reported */ + const char *last_status_msg; /* the one already reported */ } h2_session; diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 29df7afd82..2b368b67cf 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -41,13 +41,6 @@ #include "h2_util.h" -#define H2_STREAM_IN(lvl,s,msg) \ - do { \ - if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \ - h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbin); \ - } while(0) - - static int state_transition[][7] = { /* ID OP RL RR CI CO CL */ /*ID*/{ 1, 0, 0, 0, 0, 0, 0 }, @@ -144,19 +137,13 @@ static int output_open(h2_stream *stream) static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response); -h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session) +h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session) { h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream)); stream->id = id; stream->state = H2_STREAM_ST_IDLE; stream->pool = pool; stream->session = session; - return stream; -} - -h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session) -{ - h2_stream *stream = h2_stream_create(id, pool, session); set_state(stream, H2_STREAM_ST_OPEN); stream->request = h2_request_create(id, pool, h2_config_geti(session->config, H2_CONF_SER_HEADERS)); @@ -296,8 +283,6 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled, if (status == APR_SUCCESS) { if (!eos) { stream->request->body = 1; - stream->bbin = apr_brigade_create(stream->pool, - stream->session->c->bucket_alloc); } stream->input_remaining = stream->request->content_length; @@ -328,33 +313,6 @@ int h2_stream_is_scheduled(const h2_stream *stream) return stream->scheduled; } -static apr_status_t h2_stream_input_flush(h2_stream *stream) -{ - apr_status_t status = APR_SUCCESS; - if (stream->bbin && !APR_BRIGADE_EMPTY(stream->bbin)) { - - status = h2_mplx_in_write(stream->session->mplx, stream->id, stream->bbin); - if (status != APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->mplx->c, - "h2_stream(%ld-%d): flushing input data", - stream->session->id, stream->id); - } - } - return status; -} - -static apr_status_t input_flush(apr_bucket_brigade *bb, void *ctx) -{ - (void)bb; - return h2_stream_input_flush(ctx); -} - -static apr_status_t input_add_data(h2_stream *stream, - const char *data, size_t len) -{ - return apr_brigade_write(stream->bbin, input_flush, stream, data, len); -} - apr_status_t h2_stream_close_input(h2_stream *stream) { apr_status_t status = APR_SUCCESS; @@ -368,28 +326,23 @@ apr_status_t h2_stream_close_input(h2_stream *stream) return APR_ECONNRESET; } - H2_STREAM_IN(APLOG_TRACE2, stream, "close_pre"); - if (close_input(stream) && stream->bbin) { - status = h2_stream_input_flush(stream); - if (status == APR_SUCCESS) { - status = h2_mplx_in_close(stream->session->mplx, stream->id); - } + if (close_input(stream)) { + status = h2_mplx_in_close(stream->session->mplx, stream->id); } - H2_STREAM_IN(APLOG_TRACE2, stream, "close_post"); return status; } apr_status_t h2_stream_write_data(h2_stream *stream, - const char *data, size_t len) + const char *data, size_t len, int eos) { apr_status_t status = APR_SUCCESS; AP_DEBUG_ASSERT(stream); - if (input_closed(stream) || !stream->request->eoh || !stream->bbin) { + if 
(input_closed(stream) || !stream->request->eoh) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, - "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d, bbin=%d", + "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d", stream->session->id, stream->id, input_closed(stream), - stream->request->eoh, !!stream->bbin); + stream->request->eoh); return APR_EINVAL; } @@ -397,7 +350,6 @@ apr_status_t h2_stream_write_data(h2_stream *stream, "h2_stream(%ld-%d): add %ld input bytes", stream->session->id, stream->id, (long)len); - H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_pre"); if (!stream->request->chunked) { stream->input_remaining -= len; if (stream->input_remaining < 0) { @@ -413,11 +365,10 @@ apr_status_t h2_stream_write_data(h2_stream *stream, } } - status = input_add_data(stream, data, len); - if (status == APR_SUCCESS) { - status = h2_stream_input_flush(stream); + status = h2_mplx_in_write(stream->session->mplx, stream->id, data, len, eos); + if (eos) { + close_input(stream); } - H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_post"); return status; } diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index 7d724259fa..b7df632502 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -57,7 +57,6 @@ struct h2_stream { unsigned int submitted : 1; /* response HEADER has been sent */ apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */ - apr_bucket_brigade *bbin; /* input DATA */ struct h2_sos *sos; /* stream output source, e.g. to read output from */ apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */ @@ -66,15 +65,6 @@ struct h2_stream { #define H2_STREAM_RST(s, def) (s->rst_error? s->rst_error : (def)) -/** - * Create a stream in IDLE state. - * @param id the stream identifier - * @param pool the memory pool to use for this stream - * @param session the session this stream belongs to - * @return the newly created IDLE stream - */ -h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_session *session); - /** * Create a stream in OPEN state. * @param id the stream identifier @@ -155,7 +145,7 @@ apr_status_t h2_stream_close_input(h2_stream *stream); * @param len the number of bytes to write */ apr_status_t h2_stream_write_data(h2_stream *stream, - const char *data, size_t len); + const char *data, size_t len, int eos); /** * Reset the stream. Stream write/reads will return errors afterwards. 
diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 7b1aa8df67..b722f5281e 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -86,27 +86,6 @@ static apr_status_t h2_filter_read_response(ap_filter_t* f, return h2_from_h1_read_response(task->output->from_h1, f, bb); } -static apr_status_t h2_response_freeze_filter(ap_filter_t* f, - apr_bucket_brigade* bb) -{ - h2_task *task = f->ctx; - AP_DEBUG_ASSERT(task); - - if (task->frozen) { - ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r, - "h2_response_freeze_filter, saving"); - return ap_save_brigade(f, &task->output->frozen_bb, &bb, task->c->pool); - } - - if (APR_BRIGADE_EMPTY(bb)) { - return APR_SUCCESS; - } - - ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r, - "h2_response_freeze_filter, passing"); - return ap_pass_brigade(f->next, bb); -} - /******************************************************************************* * Register various hooks */ @@ -141,8 +120,6 @@ void h2_task_register_hooks(void) NULL, AP_FTYPE_PROTOCOL); ap_register_output_filter("H2_TRAILERS", h2_response_trailers_filter, NULL, AP_FTYPE_PROTOCOL); - ap_register_output_filter("H2_RESPONSE_FREEZE", h2_response_freeze_filter, - NULL, AP_FTYPE_RESOURCE); } /* post config init */ @@ -314,15 +291,11 @@ static int h2_task_process_conn(conn_rec* c) return DECLINED; } -apr_status_t h2_task_freeze(h2_task *task, request_rec *r) +apr_status_t h2_task_freeze(h2_task *task) { if (!task->frozen) { - conn_rec *c = task->c; - task->frozen = 1; - task->output->frozen_bb = apr_brigade_create(c->pool, c->bucket_alloc); - ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, "h2_task(%s), frozen", task->id); } return APR_SUCCESS; diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index c4c1c13d1d..fd3e8c9b39 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -66,7 +66,9 @@ struct h2_task { struct h2_task_output *output; struct apr_thread_cond_t *io; /* used to wait for events on */ - struct h2_req_engine *engine; + struct h2_req_engine *engine; /* engine hosted by this task */ + struct h2_req_engine *assigned; /* engine that task has been assigned to */ + request_rec *r; /* request being processed in this task */ }; h2_task *h2_task_create(long session_id, const struct h2_request *req, @@ -83,7 +85,7 @@ apr_status_t h2_task_init(apr_pool_t *pool, server_rec *s); extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_in) *h2_task_logio_add_bytes_in; extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out; -apr_status_t h2_task_freeze(h2_task *task, request_rec *r); +apr_status_t h2_task_freeze(h2_task *task); apr_status_t h2_task_thaw(h2_task *task); int h2_task_is_detached(h2_task *task); diff --git a/modules/http2/h2_task_output.c b/modules/http2/h2_task_output.c index 025c139873..1ff08484c9 100644 --- a/modules/http2/h2_task_output.c +++ b/modules/http2/h2_task_output.c @@ -42,9 +42,6 @@ h2_task_output *h2_task_output_create(h2_task *task, conn_rec *c) output->task = task; output->state = H2_TASK_OUT_INIT; output->from_h1 = h2_from_h1_create(task->stream_id, c->pool); - if (!output->from_h1) { - return NULL; - } } return output; } @@ -66,47 +63,43 @@ static apr_table_t *get_trailers(h2_task_output *output) return NULL; } -static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f, - apr_bucket_brigade *bb, const char *caller) +static apr_status_t open_response(h2_task_output 
*output, ap_filter_t *f, + apr_bucket_brigade *bb, const char *caller) { - if (output->state == H2_TASK_OUT_INIT) { - h2_response *response; - output->state = H2_TASK_OUT_STARTED; - response = h2_from_h1_get_response(output->from_h1); - if (!response) { - if (f) { - /* This happens currently when ap_die(status, r) is invoked - * by a read request filter. */ - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204) - "h2_task_output(%s): write without response by %s " - "for %s %s %s", - output->task->id, caller, - output->task->request->method, - output->task->request->authority, - output->task->request->path); - output->c->aborted = 1; - } - if (output->task->io) { - apr_thread_cond_broadcast(output->task->io); - } - return APR_ECONNABORTED; + h2_response *response; + response = h2_from_h1_get_response(output->from_h1); + if (!response) { + if (f) { + /* This happens currently when ap_die(status, r) is invoked + * by a read request filter. */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204) + "h2_task_output(%s): write without response by %s " + "for %s %s %s", + output->task->id, caller, + output->task->request->method, + output->task->request->authority, + output->task->request->path); + output->c->aborted = 1; } - - if (h2_task_logio_add_bytes_out) { - /* counter headers as if we'd do a HTTP/1.1 serialization */ - output->written = h2_util_table_bytes(response->headers, 3)+1; - h2_task_logio_add_bytes_out(output->c, output->written); + if (output->task->io) { + apr_thread_cond_broadcast(output->task->io); } - get_trailers(output); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348) - "h2_task(%s): open response to %s %s %s", - output->task->id, output->task->request->method, - output->task->request->authority, - output->task->request->path); - return h2_mplx_out_open(output->task->mplx, output->task->stream_id, - response, f, bb, output->task->io); + return APR_ECONNABORTED; + } + + if (h2_task_logio_add_bytes_out) { + /* count headers as if we'd do a HTTP/1.1 serialization */ + output->written = h2_util_table_bytes(response->headers, 3)+1; + h2_task_logio_add_bytes_out(output->c, output->written); } - return APR_SUCCESS; + get_trailers(output); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348) + "h2_task(%s): open response to %s %s %s", + output->task->id, output->task->request->method, + output->task->request->authority, + output->task->request->path); + return h2_mplx_out_open(output->task->mplx, output->task->stream_id, + response, f, bb, output->task->io); } static apr_status_t write_brigade_raw(h2_task_output *output, @@ -145,7 +138,7 @@ static apr_status_t write_brigade_raw(h2_task_output *output, apr_status_t h2_task_output_write(h2_task_output *output, ap_filter_t* f, apr_bucket_brigade* bb) { - apr_status_t status; + apr_status_t status = APR_SUCCESS; if (APR_BRIGADE_EMPTY(bb)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c, @@ -155,15 +148,17 @@ apr_status_t h2_task_output_write(h2_task_output *output, if (output->task->frozen) { h2_util_bb_log(output->c, output->task->stream_id, APLOG_TRACE2, - "frozen task output write", bb); - return ap_save_brigade(f, &output->frozen_bb, &bb, output->c->pool); + "frozen task output write, ignored", bb); + return APR_SUCCESS; } - status = open_if_needed(output, f, bb, "write"); + if (output->state == H2_TASK_OUT_INIT) { + status = open_response(output, f, bb, "write"); + output->state = H2_TASK_OUT_STARTED; + } /* Attempt to write saved brigade first */ - if 
(status == APR_SUCCESS && output->bb - && !APR_BRIGADE_EMPTY(output->bb)) { + if (status == APR_SUCCESS && output->bb && !APR_BRIGADE_EMPTY(output->bb)) { status = write_brigade_raw(output, f, output->bb); } @@ -188,20 +183,3 @@ apr_status_t h2_task_output_write(h2_task_output *output, return status; } -void h2_task_output_close(h2_task_output *output) -{ - if (output->task->frozen) { - return; - } - open_if_needed(output, NULL, NULL, "close"); - if (output->state != H2_TASK_OUT_DONE) { - if (output->frozen_bb && !APR_BRIGADE_EMPTY(output->frozen_bb)) { - h2_mplx_out_write(output->task->mplx, output->task->stream_id, - NULL, 1, output->frozen_bb, NULL, NULL); - } - output->state = H2_TASK_OUT_DONE; - h2_mplx_out_close(output->task->mplx, output->task->stream_id, - get_trailers(output)); - } -} - diff --git a/modules/http2/h2_task_output.h b/modules/http2/h2_task_output.h index 26326f0908..7670582072 100644 --- a/modules/http2/h2_task_output.h +++ b/modules/http2/h2_task_output.h @@ -44,7 +44,6 @@ struct h2_task_output { apr_off_t written; apr_bucket_brigade *bb; - apr_bucket_brigade *frozen_bb; }; h2_task_output *h2_task_output_create(struct h2_task *task, conn_rec *c); @@ -53,8 +52,6 @@ apr_status_t h2_task_output_write(h2_task_output *output, ap_filter_t* filter, apr_bucket_brigade* brigade); -void h2_task_output_close(h2_task_output *output); - apr_status_t h2_task_output_freeze(h2_task_output *output); apr_status_t h2_task_output_thaw(h2_task_output *output); diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 904349658c..e84a4aa72f 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -14,7 +14,6 @@ */ #include - #include #include @@ -537,6 +536,7 @@ apr_status_t h2_util_move(apr_bucket_brigade *to, apr_bucket_brigade *from, else { const char *data; apr_size_t len; + status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); if (status == APR_SUCCESS && len > 0) { status = apr_brigade_write(to, NULL, NULL, data, len); @@ -635,20 +635,6 @@ apr_status_t h2_util_copy(apr_bucket_brigade *to, apr_bucket_brigade *from, return status; } -int h2_util_has_flush_or_eos(apr_bucket_brigade *bb) -{ - apr_bucket *b; - for (b = APR_BRIGADE_FIRST(bb); - b != APR_BRIGADE_SENTINEL(bb); - b = APR_BUCKET_NEXT(b)) - { - if (APR_BUCKET_IS_EOS(b) || APR_BUCKET_IS_FLUSH(b)) { - return 1; - } - } - return 0; -} - int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len) { apr_bucket *b, *end; @@ -949,6 +935,27 @@ apr_status_t h2_transfer_brigade(apr_bucket_brigade *to, return APR_SUCCESS; } +apr_off_t h2_brigade_mem_size(apr_bucket_brigade *bb) +{ + apr_bucket *b; + apr_off_t total = 0; + + for (b = APR_BRIGADE_FIRST(bb); + b != APR_BRIGADE_SENTINEL(bb); + b = APR_BUCKET_NEXT(b)) + { + total += sizeof(*b); + if (b->length > 0) { + if (APR_BUCKET_IS_HEAP(b) + || APR_BUCKET_IS_POOL(b)) { + total += b->length; + } + } + } + return total; +} + + /******************************************************************************* * h2_ngheader ******************************************************************************/ diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index 4fffabb959..a83f362ffc 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -196,7 +196,6 @@ apr_status_t h2_util_copy(apr_bucket_brigade *to, apr_bucket_brigade *from, * @param bb the brigade to check on * @return != 0 iff brigade holds FLUSH or EOS bucket (or both) */ -int h2_util_has_flush_or_eos(apr_bucket_brigade *bb); int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len); 
int h2_util_bb_has_data(apr_bucket_brigade *bb); int h2_util_bb_has_data_or_eos(apr_bucket_brigade *bb); @@ -257,4 +256,13 @@ apr_status_t h2_transfer_brigade(apr_bucket_brigade *to, apr_off_t *plen, int *peos); +/** + * Get an approximation of the memory footprint of the given + * brigade. This differs from apr_brigade_length in that + * - no buckets are ever read + * - only buckets known to allocate memory (HEAP+POOL) are counted + * - the bucket struct itself is counted + */ +apr_off_t h2_brigade_mem_size(apr_bucket_brigade *bb); + #endif /* defined(__mod_h2__h2_util__) */ diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index ddb5f3de56..70f8c790ef 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.4.1" +#define MOD_HTTP2_VERSION "1.4.2" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010401 +#define MOD_HTTP2_VERSION_NUM 0x010402 #endif /* mod_h2_h2_version_h */ diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c index c3b01733a9..6450eb9ea0 100644 --- a/modules/http2/mod_http2.c +++ b/modules/http2/mod_http2.c @@ -130,7 +130,7 @@ static int http2_is_h2(conn_rec *); static apr_status_t http2_req_engine_push(const char *ngn_type, request_rec *r, - h2_req_engine_init *einit) + http2_req_engine_init *einit) { return h2_mplx_req_engine_push(ngn_type, r, einit); } diff --git a/modules/http2/mod_http2.h b/modules/http2/mod_http2.h index c5cfe704e3..3073579282 100644 --- a/modules/http2/mod_http2.h +++ b/modules/http2/mod_http2.h @@ -36,6 +36,8 @@ struct apr_thread_cond_t; typedef struct h2_req_engine h2_req_engine; +typedef void http2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed); + /** * Initialize a h2_req_engine. The structure will be passed in but * only the name and master are set. The function should initialize @@ -43,12 +45,14 @@ typedef struct h2_req_engine h2_req_engine; * @param engine the allocated, partially filled structure * @param r the first request to process, or NULL */ -typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, - const char *id, - const char *type, - apr_pool_t *pool, - apr_uint32_t req_buffer_size, - request_rec *r); +typedef apr_status_t http2_req_engine_init(h2_req_engine *engine, + const char *id, + const char *type, + apr_pool_t *pool, + apr_uint32_t req_buffer_size, + request_rec *r, + http2_output_consumed **pconsumed, + void **pbaton); /** * Push a request to an engine with the specified name for further processing. * @@ -66,7 +70,7 @@ typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, APR_DECLARE_OPTIONAL_FN(apr_status_t, http2_req_engine_push, (const char *engine_type, request_rec *r, - h2_req_engine_init *einit)); + http2_req_engine_init *einit)); /** * Get a new request for processing in this engine. -- 2.40.0
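
Note (not part of the patch): h2_util.c above introduces h2_brigade_mem_size(), which estimates how much memory a brigade pins by counting each bucket struct plus the payload of HEAP and POOL buckets only, without ever reading a bucket. A minimal sketch of how a caller might use it to cap buffered data; the helper may_buffer_more() and its limit parameter are assumptions made for illustration and do not appear in mod_http2:

    #include <apr_buckets.h>
    #include "h2_util.h"

    /* Illustration only: decide whether a brigade may buffer more data.
     * FILE and MMAP buckets add (almost) nothing to the estimate, since
     * their payload is not held in heap memory by the brigade. */
    static int may_buffer_more(apr_bucket_brigade *bb, apr_off_t limit)
    {
        return h2_brigade_mem_size(bb) < limit;
    }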
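
Note (not part of the patch): the mod_http2.h hunk renames the engine init callback type to http2_req_engine_init and adds the output parameters pconsumed and pbaton, apparently so that the module implementing an engine can hand back a callback that mod_http2 invokes as the engine's output gets consumed. A rough sketch of a callback matching the new signature; everything prefixed my_ is hypothetical and only illustrates the typedefs:

    #include <httpd.h>
    #include "mod_http2.h"

    /* Hypothetical: presumably invoked by mod_http2, with the baton as ctx,
     * once 'consumed' bytes of this engine's output have been passed on. */
    static void my_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed)
    {
        (void)ctx; (void)c; (void)consumed;
    }

    /* Hypothetical engine init matching the http2_req_engine_init typedef. */
    static apr_status_t my_engine_init(h2_req_engine *engine, const char *id,
                                       const char *type, apr_pool_t *pool,
                                       apr_uint32_t req_buffer_size,
                                       request_rec *r,
                                       http2_output_consumed **pconsumed,
                                       void **pbaton)
    {
        (void)engine; (void)id; (void)type; (void)pool;
        (void)req_buffer_size; (void)r;
        *pconsumed = my_output_consumed; /* report consumption of our output */
        *pbaton = NULL;                  /* per-engine context, none in this sketch */
        return APR_SUCCESS;
    }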