From: Stefan Eissing Date: Wed, 16 Mar 2016 14:01:53 +0000 (+0000) Subject: mod_http2: fix for bucket lifetime on master conn, mod_proxy_http2: flow control... X-Git-Tag: 2.5.0-alpha~1886 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=ba1fbba02466f3280110afef14541089433351de;p=apache mod_http2: fix for bucket lifetime on master conn, mod_proxy_http2: flow control from front- to backend h2 connection git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1735230 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/CHANGES b/CHANGES index 7b472699b5..550739c2e5 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,13 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.0 + *) mod_proxy_http2: using HTTP/2 flow control for backend streams by + observing data actually send out on the frontend h2 connection. + [Stefan Eissing] + + *) mod_http2: fixes problem with wrong lifetime of file buckets on main + connection. [Stefan Eissing] + *) mpm: Generalise the ap_mpm_register_socket functions to accept pipes or sockets. [Graham Leggett] diff --git a/modules/http2/h2_bucket_eoc.h b/modules/http2/h2_bucket_eoc.h index f1cd6f8135..2d46691995 100644 --- a/modules/http2/h2_bucket_eoc.h +++ b/modules/http2/h2_bucket_eoc.h @@ -21,6 +21,7 @@ struct h2_session; /** End Of HTTP/2 SESSION (H2EOC) bucket */ extern const apr_bucket_type_t h2_bucket_type_eoc; +#define H2_BUCKET_IS_H2EOC(e) (e->type == &h2_bucket_type_eoc) apr_bucket * h2_bucket_eoc_make(apr_bucket *b, struct h2_session *session); diff --git a/modules/http2/h2_bucket_eos.h b/modules/http2/h2_bucket_eos.h index bd3360db5a..27b501dad3 100644 --- a/modules/http2/h2_bucket_eos.h +++ b/modules/http2/h2_bucket_eos.h @@ -21,6 +21,7 @@ struct h2_stream; /** End Of HTTP/2 STREAM (H2EOS) bucket */ extern const apr_bucket_type_t h2_bucket_type_eos; +#define H2_BUCKET_IS_H2EOS(e) (e->type == &h2_bucket_type_eos) apr_bucket *h2_bucket_eos_make(apr_bucket *b, struct h2_stream *stream); diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index 8a6e66784e..59561ecd61 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -14,16 +14,18 @@ */ #include - +#include #include #include #include #include #include +#include #include "h2_private.h" #include "h2_bucket_eoc.h" +#include "h2_bucket_eos.h" #include "h2_config.h" #include "h2_conn_io.h" #include "h2_h2.h" @@ -46,6 +48,84 @@ #define WRITE_BUFFER_SIZE (5*WRITE_SIZE_MAX) +static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level, + const char *tag, apr_bucket_brigade *bb) +{ + char buffer[16 * 1024]; + const char *line = "(null)"; + apr_size_t bmax = sizeof(buffer)/sizeof(buffer[0]); + int off = 0; + apr_bucket *b; + + if (bb) { + memset(buffer, 0, bmax--); + for (b = APR_BRIGADE_FIRST(bb); + bmax && (b != APR_BRIGADE_SENTINEL(bb)); + b = APR_BUCKET_NEXT(b)) { + + if (APR_BUCKET_IS_METADATA(b)) { + if (APR_BUCKET_IS_EOS(b)) { + off += apr_snprintf(buffer+off, bmax-off, "eos "); + } + else if (APR_BUCKET_IS_FLUSH(b)) { + off += apr_snprintf(buffer+off, bmax-off, "flush "); + } + else if (AP_BUCKET_IS_EOR(b)) { + off += apr_snprintf(buffer+off, bmax-off, "eor "); + } + else if (H2_BUCKET_IS_H2EOC(b)) { + off += apr_snprintf(buffer+off, bmax-off, "h2eoc "); + } + else if (H2_BUCKET_IS_H2EOS(b)) { + off += apr_snprintf(buffer+off, bmax-off, "h2eos "); + } + else { + off += apr_snprintf(buffer+off, bmax-off, "meta(unknown) "); + } + } + else { + const char *btype = "data"; + if (APR_BUCKET_IS_FILE(b)) { + btype = "file"; + } + else if 
(APR_BUCKET_IS_PIPE(b)) { + btype = "pipe"; + } + else if (APR_BUCKET_IS_SOCKET(b)) { + btype = "socket"; + } + else if (APR_BUCKET_IS_HEAP(b)) { + btype = "heap"; + } + else if (APR_BUCKET_IS_TRANSIENT(b)) { + btype = "transient"; + } + else if (APR_BUCKET_IS_IMMORTAL(b)) { + btype = "immortal"; + } +#if APR_HAS_MMAP + else if (APR_BUCKET_IS_MMAP(b)) { + btype = "mmap"; + } +#endif + else if (APR_BUCKET_IS_POOL(b)) { + btype = "pool"; + } + + off += apr_snprintf(buffer+off, bmax-off, "%s[%ld] ", + btype, + (long)(b->length == ((apr_size_t)-1)? + -1 : b->length)); + } + } + line = *buffer? buffer : "(empty)"; + } + /* Intentional no APLOGNO */ + ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s", + c->id, stream_id, tag, line); + +} + apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, const h2_config *cfg, apr_pool_t *pool) @@ -112,16 +192,17 @@ static apr_status_t pass_out(apr_bucket_brigade *bb, void *ctx) } ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_WRITE, c); - status = apr_brigade_length(bb, 0, &bblen); - if (status == APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03044) + apr_brigade_length(bb, 0, &bblen); + h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", bb); + status = ap_pass_brigade(c->output_filters, bb); + if (status == APR_SUCCESS && pctx->io) { + pctx->io->bytes_written += (apr_size_t)bblen; + pctx->io->last_write = apr_time_now(); + } + if (status != APR_SUCCESS) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044) "h2_conn_io(%ld): pass_out brigade %ld bytes", c->id, (long)bblen); - status = ap_pass_brigade(c->output_filters, bb); - if (status == APR_SUCCESS && pctx->io) { - pctx->io->bytes_written += (apr_size_t)bblen; - pctx->io->last_write = apr_time_now(); - } } apr_brigade_cleanup(bb); return status; @@ -179,9 +260,10 @@ apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b) return APR_SUCCESS; } -static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int force, int eoc) +static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc) { pass_out_ctx ctx; + apr_bucket *b; if (io->buflen == 0 && APR_BRIGADE_EMPTY(io->output)) { return APR_SUCCESS; @@ -195,13 +277,12 @@ static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int force, int eoc) bucketeer_buffer(io); } - if (force) { - apr_bucket *b = apr_bucket_flush_create(io->c->bucket_alloc); + if (flush) { + b = apr_bucket_flush_create(io->c->bucket_alloc); APR_BRIGADE_INSERT_TAIL(io->output, b); } ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush"); - /* Send it out */ io->buflen = 0; ctx.c = io->c; ctx.io = eoc? 
NULL : io; @@ -221,11 +302,11 @@ apr_status_t h2_conn_io_consider_pass(h2_conn_io *io) apr_off_t len = 0; if (!APR_BRIGADE_EMPTY(io->output)) { - apr_brigade_length(io->output, 0, &len); + len = h2_brigade_mem_size(io->output); } len += io->buflen; if (len >= WRITE_BUFFER_SIZE) { - return h2_conn_io_flush_int(io, 0, 0); + return h2_conn_io_flush_int(io, 1, 0); } return APR_SUCCESS; } @@ -256,7 +337,7 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, while (length > 0 && (status == APR_SUCCESS)) { apr_size_t avail = io->bufsize - io->buflen; if (avail <= 0) { - h2_conn_io_flush_int(io, 0, 0); + status = h2_conn_io_flush_int(io, 0, 0); } else if (length > avail) { memcpy(io->buffer + io->buflen, buf, avail); diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index 39ebad3a79..0beb85606d 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -397,8 +397,10 @@ apr_status_t h2_io_out_read_to(h2_io *io, apr_bucket_brigade *bb, if (!is_out_readable(io, plen, peos, &status)) { return status; } - io->eos_out_read = *peos = h2_util_has_eos(io->bbout, *plen); status = h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to"); + if (status == APR_SUCCESS && io->eos_out && APR_BRIGADE_EMPTY(io->bbout)) { + io->eos_out_read = *peos = 1; + } io->output_consumed += *plen; return status; } @@ -423,21 +425,6 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, return APR_ECONNABORTED; } - if (!io->eor) { - /* Filter the EOR bucket and set it aside. We prefer to tear down - * the request when the whole h2 stream is done */ - for (b = APR_BRIGADE_FIRST(bb); - b != APR_BRIGADE_SENTINEL(bb); - b = APR_BUCKET_NEXT(b)) - { - if (AP_BUCKET_IS_EOR(b)) { - APR_BUCKET_REMOVE(b); - io->eor = b; - break; - } - } - } - if (io->eos_out) { apr_off_t len = 0; /* We have already delivered an EOS bucket to a reader, no @@ -448,6 +435,23 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, return (len > 0)? APR_EOF : APR_SUCCESS; } + /* Filter the EOR bucket and set it aside. 
We prefer to tear down + * the request when the whole h2 stream is done */ + for (b = APR_BRIGADE_FIRST(bb); + b != APR_BRIGADE_SENTINEL(bb); + b = APR_BUCKET_NEXT(b)) + { + if (AP_BUCKET_IS_EOR(b)) { + APR_BUCKET_REMOVE(b); + io->eor = b; + break; + } + else if (APR_BUCKET_IS_EOS(b)) { + io->eos_out = 1; + break; + } + } + process_trailers(io, trailers); /* Let's move the buckets from the request processing in here, so diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index f77a404e7a..1284c43255 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -195,7 +195,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, return NULL; } - status = apr_thread_cond_create(&m->req_added, m->pool); + status = apr_thread_cond_create(&m->task_thawed, m->pool); if (status != APR_SUCCESS) { h2_mplx_destroy(m); return NULL; @@ -254,7 +254,7 @@ static void workers_register(h2_mplx *m) h2_workers_register(m->workers, m); } -static int io_process_events(h2_mplx *m, h2_io *io) +static int io_in_consumed_signal(h2_mplx *m, h2_io *io) { if (io->input_consumed && m->input_consumed) { m->input_consumed(m->input_consumed_ctx, @@ -265,6 +265,17 @@ static int io_process_events(h2_mplx *m, h2_io *io) return 0; } +static int io_out_consumed_signal(h2_mplx *m, h2_io *io) +{ + if (io->output_consumed && io->task && io->task->assigned) { + h2_req_engine_out_consumed(io->task->assigned, io->task->c, + io->output_consumed); + io->output_consumed = 0; + return 1; + } + return 0; +} + static void io_destroy(h2_mplx *m, h2_io *io, int events) { apr_pool_t *pool; @@ -273,7 +284,7 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events) h2_io_in_shutdown(io); if (events) { /* Process outstanding events before destruction */ - io_process_events(m, io); + io_in_consumed_signal(m, io); } /* The pool is cleared/destroyed which also closes all @@ -299,7 +310,7 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events) pool = io->pool; io->pool = NULL; - if (pool) { + if (0 && pool) { apr_pool_clear(pool); if (m->spare_pool) { apr_pool_destroy(m->spare_pool); @@ -377,7 +388,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) h2_mplx_set_consumed_cb(m, NULL, NULL); h2_iq_clear(m->q); - apr_thread_cond_broadcast(m->req_added); + apr_thread_cond_broadcast(m->task_thawed); while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) { /* iterate until all ios have been orphaned or destroyed */ } @@ -413,7 +424,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) } } h2_mplx_abort(m); - apr_thread_cond_broadcast(m->req_added); + apr_thread_cond_broadcast(m->task_thawed); } } @@ -460,7 +471,7 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) * for processing, e.g. when we received all HEADERs. But when * a stream is cancelled very early, it will not exist. 
*/ if (io) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, "h2_mplx(%ld-%d): marking stream as done.", m->id, stream_id); io_stream_done(m, io, rst_error); @@ -523,7 +534,7 @@ apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, status = h2_io_in_write(io, data, len, eos); H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post"); h2_io_signal(io, H2_IO_READ); - io_process_events(m, io); + io_in_consumed_signal(m, io); } else { status = APR_ECONNABORTED; @@ -545,7 +556,7 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id) status = h2_io_in_close(io); H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close"); h2_io_signal(io, H2_IO_READ); - io_process_events(m, io); + io_in_consumed_signal(m, io); } else { status = APR_ECONNABORTED; @@ -555,6 +566,12 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id) return status; } +void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) +{ + m->input_consumed = cb; + m->input_consumed_ctx = ctx; +} + typedef struct { h2_mplx * m; int streams_updated; @@ -563,18 +580,12 @@ typedef struct { static int update_window(void *ctx, h2_io *io) { update_ctx *uctx = (update_ctx*)ctx; - if (io_process_events(uctx->m, io)) { + if (io_in_consumed_signal(uctx->m, io)) { ++uctx->streams_updated; } return 1; } -void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) -{ - m->input_consumed = cb; - m->input_consumed_ctx = ctx; -} - apr_status_t h2_mplx_in_update_windows(h2_mplx *m) { apr_status_t status; @@ -702,7 +713,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) * shutdown input and send out any events (e.g. window * updates) asap. */ h2_io_in_shutdown(io); - io_process_events(m, io); + io_in_consumed_signal(m, io); } } @@ -729,8 +740,10 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io, && !APR_BRIGADE_EMPTY(bb) && !is_aborted(m, &status)) { - status = h2_io_out_write(io, bb, m->stream_max_mem, trailers, - &m->tx_handles_reserved); + status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX, + trailers, &m->tx_handles_reserved); + io_out_consumed_signal(m, io); + /* Wait for data to drain until there is room again or * stream timeout expires */ h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait); @@ -740,6 +753,9 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io, && (m->stream_max_mem <= h2_io_out_length(io)) && !is_aborted(m, &status)) { if (!blocking) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + "h2_mplx(%ld-%d): incomplete write", + m->id, io->id); return APR_INCOMPLETE; } trailers = NULL; @@ -874,11 +890,8 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id, apr_table_t *trailers) trailers? 
"yes" : "no"); status = h2_io_out_close(io, trailers); H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close"); + io_out_consumed_signal(m, io); - if (io->eor) { - apr_bucket_delete(io->eor); - io->eor = NULL; - } have_out_data_for(m, stream_id); } else { @@ -1061,7 +1074,7 @@ static h2_task *pop_task(h2_mplx *m) } } else if (io) { - conn_rec *slave = h2_slave_create(m->c, io->pool, m->spare_allocator); + conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator); m->spare_allocator = NULL; io->task = task = h2_task_create(m->id, io->request, slave, m); apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id); @@ -1100,7 +1113,7 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) return task; } -static void task_done(h2_mplx *m, h2_task *task) +static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) { if (task) { if (task->frozen) { @@ -1112,7 +1125,7 @@ static void task_done(h2_mplx *m, h2_task *task) * bodies into the mplx. */ /* FIXME: this implementation is incomplete. */ h2_task_set_io_blocking(task, 0); - apr_thread_cond_broadcast(m->req_added); + apr_thread_cond_broadcast(m->task_thawed); } else { h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); @@ -1126,6 +1139,18 @@ static void task_done(h2_mplx *m, h2_task *task) * other mplx's. Perhaps leave after n requests? */ h2_mplx_out_close(m, task->stream_id, NULL); + if (ngn && io) { + apr_off_t bytes = io->output_consumed + h2_io_out_length(io); + if (bytes > 0) { + /* we need to report consumed and current buffered output + * to the engine. The request will be streamed out or cancelled, + * no more data is coming from it and the engine should update + * its calculations before we destroy this information. */ + h2_req_engine_out_consumed(ngn, task->c, bytes); + io->output_consumed = 0; + } + } + if (task->engine) { if (!h2_req_engine_is_shutdown(task->engine)) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, @@ -1194,7 +1219,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) int acquired; if (enter_mutex(m, &acquired) == APR_SUCCESS) { - task_done(m, task); + task_done(m, task, NULL); --m->workers_busy; if (ptask) { /* caller wants another task */ @@ -1347,8 +1372,37 @@ apr_status_t h2_mplx_idle(h2_mplx *m) * HTTP/2 request engines ******************************************************************************/ +typedef struct { + h2_mplx * m; + h2_req_engine *ngn; + int streams_updated; +} ngn_update_ctx; + +static int ngn_update_window(void *ctx, h2_io *io) +{ + ngn_update_ctx *uctx = ctx; + if (io && io->task && io->task->assigned == uctx->ngn + && io_out_consumed_signal(uctx->m, io)) { + ++uctx->streams_updated; + } + return 1; +} + +static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn) +{ + ngn_update_ctx ctx; + + ctx.m = m; + ctx.ngn = ngn; + ctx.streams_updated = 0; + h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx); + + return ctx.streams_updated? 
APR_SUCCESS : APR_EAGAIN; +} + apr_status_t h2_mplx_req_engine_push(const char *ngn_type, - request_rec *r, h2_req_engine_init *einit) + request_rec *r, + http2_req_engine_init *einit) { apr_status_t status; h2_mplx *m; @@ -1360,6 +1414,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, return APR_ECONNABORTED; } m = task->mplx; + task->r = r; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); @@ -1367,8 +1422,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, status = APR_ECONNABORTED; } else { - status = h2_ngn_shed_push_req(m->ngn_shed, ngn_type, - task, r, einit); + status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit); } leave_mutex(m, acquired); } @@ -1383,30 +1437,37 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn, h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn); h2_mplx *m = h2_ngn_shed_get_ctx(shed); apr_status_t status; + h2_task *task = NULL; int acquired; - *pr = NULL; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { int want_shutdown = (block == APR_BLOCK_READ); + + /* Take this opportunity to update output consummation + * for this engine */ + ngn_out_update_windows(m, ngn); + if (want_shutdown && !h2_iq_empty(m->q)) { /* For a blocking read, check first if requests are to be * had and, if not, wait a short while before doing the * blocking, and if unsuccessful, terminating read. */ - status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr); + status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task); if (APR_STATUS_IS_EAGAIN(status)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): start block engine pull", m->id); - apr_thread_cond_timedwait(m->req_added, m->lock, + apr_thread_cond_timedwait(m->task_thawed, m->lock, apr_time_from_msec(20)); - status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr); + status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task); } } else { - status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr); + status = h2_ngn_shed_pull_task(shed, ngn, capacity, + want_shutdown, &task); } leave_mutex(m, acquired); } + *pr = task? 
task->r : NULL; return status; } @@ -1419,13 +1480,16 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn) int acquired; if (enter_mutex(m, &acquired) == APR_SUCCESS) { + ngn_out_update_windows(m, ngn); h2_ngn_shed_done_task(m->ngn_shed, ngn, task); if (task->engine) { /* cannot report that as done until engine returns */ } else { - task_done(m, task); + task_done(m, task, ngn); } + /* Take this opportunity to update output consummation + * for this engine */ leave_mutex(m, acquired); } } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index f50239c3a1..840f34b464 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -90,7 +90,7 @@ struct h2_mplx { apr_thread_mutex_t *lock; struct apr_thread_cond_t *added_output; - struct apr_thread_cond_t *req_added; + struct apr_thread_cond_t *task_thawed; struct apr_thread_cond_t *join_wait; apr_size_t stream_max_mem; @@ -405,12 +405,15 @@ apr_status_t h2_mplx_idle(h2_mplx *m); * h2_req_engine handling ******************************************************************************/ +typedef void h2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed); typedef apr_status_t h2_mplx_req_engine_init(struct h2_req_engine *engine, const char *id, const char *type, apr_pool_t *pool, apr_uint32_t req_buffer_size, - request_rec *r); + request_rec *r, + h2_output_consumed **pconsumed, + void **pbaton); apr_status_t h2_mplx_req_engine_push(const char *ngn_type, request_rec *r, diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c index 79ca72e846..3e8667aa23 100644 --- a/modules/http2/h2_ngn_shed.c +++ b/modules/http2/h2_ngn_shed.c @@ -34,6 +34,7 @@ #include "h2_ctx.h" #include "h2_h2.h" #include "h2_int_queue.h" +#include "h2_mplx.h" #include "h2_response.h" #include "h2_request.h" #include "h2_task.h" @@ -46,7 +47,6 @@ typedef struct h2_ngn_entry h2_ngn_entry; struct h2_ngn_entry { APR_RING_ENTRY(h2_ngn_entry) link; h2_task *task; - request_rec *r; }; #define H2_NGN_ENTRY_NEXT(e) APR_RING_NEXT((e), link) @@ -84,6 +84,9 @@ struct h2_req_engine { apr_uint32_t no_assigned; /* # of assigned requests */ apr_uint32_t no_live; /* # of live */ apr_uint32_t no_finished; /* # of finished */ + + h2_output_consumed *out_consumed; + void *out_consumed_ctx; }; const char *h2_req_engine_get_id(h2_req_engine *engine) @@ -96,6 +99,14 @@ int h2_req_engine_is_shutdown(h2_req_engine *engine) return engine->shutdown; } +void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c, + apr_off_t bytes) +{ + if (engine->out_consumed) { + engine->out_consumed(engine->out_consumed_ctx, c, bytes); + } +} + h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c, apr_uint32_t default_capacity, apr_uint32_t req_buffer_size) @@ -132,26 +143,25 @@ void h2_ngn_shed_abort(h2_ngn_shed *shed) shed->aborted = 1; } -static void ngn_add_req(h2_req_engine *ngn, h2_task *task, request_rec *r) +static void ngn_add_task(h2_req_engine *ngn, h2_task *task) { h2_ngn_entry *entry = apr_pcalloc(task->c->pool, sizeof(*entry)); APR_RING_ELEM_INIT(entry, link); entry->task = task; - entry->r = r; H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry); } -apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, - h2_task *task, request_rec *r, - h2_req_engine_init *einit){ +apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, + h2_task *task, http2_req_engine_init *einit) +{ h2_req_engine *ngn; AP_DEBUG_ASSERT(shed); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, "h2_ngn_shed(%ld): PUSHing 
request (task=%s)", shed->c->id, - apr_table_get(r->connection->notes, H2_TASK_ID_NOTE)); + task->id); if (task->ser_headers) { /* Max compatibility, deny processing of this */ return APR_EOF; @@ -165,10 +175,10 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, "h2_ngn_shed(%ld): pushing request %s to %s", shed->c->id, task->id, ngn->id); if (!h2_task_is_detached(task)) { - h2_task_freeze(task, r); + h2_task_freeze(task); } /* FIXME: sometimes ngn is garbage, probly alread freed */ - ngn_add_req(ngn, task, r); + ngn_add_task(ngn, task); ngn->no_assigned++; return APR_SUCCESS; } @@ -191,7 +201,8 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, APR_RING_INIT(&newngn->entries, h2_ngn_entry, link); status = einit(newngn, newngn->id, newngn->type, newngn->pool, - shed->req_buffer_size, r); + shed->req_buffer_size, task->r, + &newngn->out_consumed, &newngn->out_consumed_ctx); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, "h2_ngn_shed(%ld): create engine %s (%s)", shed->c->id, newngn->id, newngn->type); @@ -199,6 +210,7 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, AP_DEBUG_ASSERT(task->engine == NULL); newngn->task = task; task->engine = newngn; + task->assigned = newngn; apr_hash_set(shed->ngns, newngn->type, APR_HASH_KEY_STRING, newngn); } return status; @@ -206,13 +218,17 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, return APR_EOF; } -static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn) +static h2_ngn_entry *pop_detached(h2_req_engine *ngn) { h2_ngn_entry *entry; for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries); entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries); entry = H2_NGN_ENTRY_NEXT(entry)) { - if (!entry->task->frozen) { + if (h2_task_is_detached(entry->task) + || (entry->task->engine == ngn)) { + /* The task hosting this engine can always be pulled by it. + * For other task, they need to become detached, e.g. no longer + * assigned to another worker. */ H2_NGN_ENTRY_REMOVE(entry); return entry; } @@ -220,16 +236,19 @@ static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn) return NULL; } -apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, - h2_req_engine *ngn, - apr_uint32_t capacity, - int want_shutdown, - request_rec **pr) +apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, + h2_req_engine *ngn, + apr_uint32_t capacity, + int want_shutdown, + h2_task **ptask) { h2_ngn_entry *entry; AP_DEBUG_ASSERT(ngn); - *pr = NULL; + *ptask = NULL; + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, + "h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d", + shed->c->id, ngn->id, want_shutdown); if (shed->aborted) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, shed->c, "h2_ngn_shed(%ld): abort while pulling requests %s", @@ -249,14 +268,22 @@ apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, return ngn->shutdown? 
APR_EOF : APR_EAGAIN; } - if ((entry = pop_non_frozen(ngn))) { + if ((entry = pop_detached(ngn))) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, entry->task->c, "h2_ngn_shed(%ld): pulled request %s for engine %s", shed->c->id, entry->task->id, ngn->id); ngn->no_live++; - *pr = entry->r; + *ptask = entry->task; + entry->task->assigned = ngn; return APR_SUCCESS; } + + if (1) { + h2_ngn_entry *entry = H2_REQ_ENTRIES_FIRST(&ngn->entries); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, + "h2_ngn_shed(%ld): pull task, nothing, first task %s", + shed->c->id, entry->task->id); + } return APR_EAGAIN; } @@ -298,8 +325,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn) for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries); entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries); entry = H2_NGN_ENTRY_NEXT(entry)) { - request_rec *r = entry->r; - h2_task *task = h2_ctx_rget_task(r); + h2_task *task = entry->task; ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c, "h2_ngn_shed(%ld): engine %s has queued task %s, " "frozen=%d, aborting", diff --git a/modules/http2/h2_ngn_shed.h b/modules/http2/h2_ngn_shed.h index 3dc9e375ef..832dbd3a8e 100644 --- a/modules/http2/h2_ngn_shed.h +++ b/modules/http2/h2_ngn_shed.h @@ -35,12 +35,17 @@ struct h2_ngn_shed { const char *h2_req_engine_get_id(h2_req_engine *engine); int h2_req_engine_is_shutdown(h2_req_engine *engine); +void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c, + apr_off_t bytes); + typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine, const char *id, const char *type, apr_pool_t *pool, apr_uint32_t req_buffer_size, - request_rec *r); + request_rec *r, + h2_output_consumed **pconsumed, + void **pbaton); h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c, apr_uint32_t default_capactiy, @@ -53,13 +58,13 @@ h2_ngn_shed *h2_ngn_shed_get_shed(struct h2_req_engine *ngn); void h2_ngn_shed_abort(h2_ngn_shed *shed); -apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, - struct h2_task *task, request_rec *r, +apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, + struct h2_task *task, h2_shed_ngn_init *init_cb); -apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, h2_req_engine *pub_ngn, - apr_uint32_t capacity, - int want_shutdown, request_rec **pr); +apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, h2_req_engine *pub_ngn, + apr_uint32_t capacity, + int want_shutdown, struct h2_task **ptask); apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, struct h2_req_engine *ngn, diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c index e3bad32917..0fcd8d0c11 100644 --- a/modules/http2/h2_proxy_session.c +++ b/modules/http2/h2_proxy_session.c @@ -372,7 +372,6 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags, stream_id, NGHTTP2_STREAM_CLOSED); return NGHTTP2_ERR_STREAM_CLOSING; } - nghttp2_session_consume(ngh2, stream_id, len); return 0; } @@ -1042,6 +1041,7 @@ static void ev_stream_done(h2_proxy_session *session, int stream_id, ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, "h2_proxy_sesssion(%s): stream(%d) closed", session->id, stream_id); + if (!stream->data_received) { apr_bucket *b; /* if the response had no body, this is the time to flush @@ -1286,7 +1286,8 @@ static int done_iter(void *udata, void *val) { cleanup_iter_ctx *ctx = udata; h2_proxy_stream *stream = val; - int touched = (stream->id <= ctx->session->last_stream_id); + int touched = (!ctx->session->last_stream_id || + stream->id <= 
ctx->session->last_stream_id); ctx->done(ctx->session, stream->r, 0, touched); return 1; } @@ -1306,3 +1307,49 @@ void h2_proxy_session_cleanup(h2_proxy_session *session, } } +typedef struct { + h2_proxy_session *session; + conn_rec *c; + apr_off_t bytes; + int updated; +} win_update_ctx; + +static int win_update_iter(void *udata, void *val) +{ + win_update_ctx *ctx = udata; + h2_proxy_stream *stream = val; + + if (stream->r && stream->r->connection == ctx->c) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, ctx->session->c, + "h2_proxy_session(%s-%d): win_update %ld bytes", + ctx->session->id, (int)stream->id, (long)ctx->bytes); + nghttp2_session_consume(ctx->session->ngh2, stream->id, ctx->bytes); + ctx->updated = 1; + return 0; + } + return 1; +} + + +void h2_proxy_session_update_window(h2_proxy_session *session, + conn_rec *c, apr_off_t bytes) +{ + if (session->streams && !h2_ihash_is_empty(session->streams)) { + win_update_ctx ctx; + ctx.session = session; + ctx.c = c; + ctx.bytes = bytes; + ctx.updated = 0; + h2_ihash_iter(session->streams, win_update_iter, &ctx); + + if (!ctx.updated) { + /* could not find the stream any more, possibly closed, update + * the connection window at least */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_proxy_session(%s): win_update conn %ld bytes", + session->id, (long)bytes); + nghttp2_session_consume_connection(session->ngh2, (size_t)bytes); + } + } +} + diff --git a/modules/http2/h2_proxy_session.h b/modules/http2/h2_proxy_session.h index 284f9c630d..7078981c7a 100644 --- a/modules/http2/h2_proxy_session.h +++ b/modules/http2/h2_proxy_session.h @@ -103,6 +103,9 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *s); void h2_proxy_session_cleanup(h2_proxy_session *s, h2_proxy_request_done *done); +void h2_proxy_session_update_window(h2_proxy_session *s, + conn_rec *c, apr_off_t bytes); + #define H2_PROXY_REQ_URL_NOTE "h2-proxy-req-url" #endif /* h2_proxy_session_h */ diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 08bcd18b0e..4372353c13 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -686,7 +686,9 @@ static apr_status_t h2_session_shutdown(h2_session *session, int reason, h2_mplx_get_max_stream_started(session->mplx), reason, (uint8_t*)err, err? strlen(err):0); status = nghttp2_session_send(session->ngh2); - h2_conn_io_flush(&session->io); + if (status == APR_SUCCESS) { + status = h2_conn_io_flush(&session->io); + } ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03069) "session(%ld): sent GOAWAY, err=%d, msg=%s", session->id, reason, err? err : ""); @@ -1432,6 +1434,9 @@ apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream) { apr_pool_t *pool = h2_stream_detach_pool(stream); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_stream(%ld-%d): cleanup by EOS bucket destroy", + session->id, stream->id); /* this may be called while the session has already freed * some internal structures or even when the mplx is locked. */ if (session->mplx) { @@ -1704,6 +1709,7 @@ static void h2_session_ev_init(h2_session *session, int arg, const char *msg) static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg) { + session->local_shutdown = 1; switch (session->state) { case H2_SESSION_ST_LOCAL_SHUTDOWN: /* already did that? 
*/ @@ -2195,7 +2201,8 @@ apr_status_t h2_session_process(h2_session *session, int async) } else if (status == APR_TIMEUP) { /* go back to checking all inputs again */ - transit(session, "wait cycle", H2_SESSION_ST_BUSY); + transit(session, "wait cycle", session->local_shutdown? + H2_SESSION_ST_LOCAL_SHUTDOWN : H2_SESSION_ST_BUSY); } else { ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c, @@ -2219,7 +2226,10 @@ apr_status_t h2_session_process(h2_session *session, int async) break; } - h2_conn_io_flush(&session->io); + status = h2_conn_io_flush(&session->io); + if (status != APR_SUCCESS) { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); + } if (!nghttp2_session_want_read(session->ngh2) && !nghttp2_session_want_write(session->ngh2)) { dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL); diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index c8e131d114..566e79dee2 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -85,6 +85,7 @@ typedef struct h2_session { unsigned int reprioritize : 1; /* scheduled streams priority changed */ unsigned int eoc_written : 1; /* h2 eoc bucket written */ unsigned int flush : 1; /* flushing output necessary */ + unsigned int local_shutdown: 1; /* GOAWAY has been sent by us */ apr_interval_time_t wait_us; /* timout during BUSY_WAIT state, micro secs */ int unsent_submits; /* number of submitted, but not yet written responses. */ diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 97b23f9281..b722f5281e 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -291,7 +291,7 @@ static int h2_task_process_conn(conn_rec* c) return DECLINED; } -apr_status_t h2_task_freeze(h2_task *task, request_rec *r) +apr_status_t h2_task_freeze(h2_task *task) { if (!task->frozen) { task->frozen = 1; diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index c4c1c13d1d..fd3e8c9b39 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -66,7 +66,9 @@ struct h2_task { struct h2_task_output *output; struct apr_thread_cond_t *io; /* used to wait for events on */ - struct h2_req_engine *engine; + struct h2_req_engine *engine; /* engine hosted by this task */ + struct h2_req_engine *assigned; /* engine that task has been assigned to */ + request_rec *r; /* request being processed in this task */ }; h2_task *h2_task_create(long session_id, const struct h2_request *req, @@ -83,7 +85,7 @@ apr_status_t h2_task_init(apr_pool_t *pool, server_rec *s); extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_in) *h2_task_logio_add_bytes_in; extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out; -apr_status_t h2_task_freeze(h2_task *task, request_rec *r); +apr_status_t h2_task_freeze(h2_task *task); apr_status_t h2_task_thaw(h2_task *task); int h2_task_is_detached(h2_task *task); diff --git a/modules/http2/h2_task_output.c b/modules/http2/h2_task_output.c index 87bbe38c3c..1ff08484c9 100644 --- a/modules/http2/h2_task_output.c +++ b/modules/http2/h2_task_output.c @@ -158,8 +158,7 @@ apr_status_t h2_task_output_write(h2_task_output *output, } /* Attempt to write saved brigade first */ - if (status == APR_SUCCESS && output->bb - && !APR_BRIGADE_EMPTY(output->bb)) { + if (status == APR_SUCCESS && output->bb && !APR_BRIGADE_EMPTY(output->bb)) { status = write_brigade_raw(output, f, output->bb); } diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 8fe2b26f4a..e84a4aa72f 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -14,7 +14,6 
@@ */ #include - #include #include @@ -636,20 +635,6 @@ apr_status_t h2_util_copy(apr_bucket_brigade *to, apr_bucket_brigade *from, return status; } -int h2_util_has_flush_or_eos(apr_bucket_brigade *bb) -{ - apr_bucket *b; - for (b = APR_BRIGADE_FIRST(bb); - b != APR_BRIGADE_SENTINEL(bb); - b = APR_BUCKET_NEXT(b)) - { - if (APR_BUCKET_IS_EOS(b) || APR_BUCKET_IS_FLUSH(b)) { - return 1; - } - } - return 0; -} - int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len) { apr_bucket *b, *end; @@ -950,6 +935,27 @@ apr_status_t h2_transfer_brigade(apr_bucket_brigade *to, return APR_SUCCESS; } +apr_off_t h2_brigade_mem_size(apr_bucket_brigade *bb) +{ + apr_bucket *b; + apr_off_t total = 0; + + for (b = APR_BRIGADE_FIRST(bb); + b != APR_BRIGADE_SENTINEL(bb); + b = APR_BUCKET_NEXT(b)) + { + total += sizeof(*b); + if (b->length > 0) { + if (APR_BUCKET_IS_HEAP(b) + || APR_BUCKET_IS_POOL(b)) { + total += b->length; + } + } + } + return total; +} + + /******************************************************************************* * h2_ngheader ******************************************************************************/ diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index 4fffabb959..a83f362ffc 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -196,7 +196,6 @@ apr_status_t h2_util_copy(apr_bucket_brigade *to, apr_bucket_brigade *from, * @param bb the brigade to check on * @return != 0 iff brigade holds FLUSH or EOS bucket (or both) */ -int h2_util_has_flush_or_eos(apr_bucket_brigade *bb); int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len); int h2_util_bb_has_data(apr_bucket_brigade *bb); int h2_util_bb_has_data_or_eos(apr_bucket_brigade *bb); @@ -257,4 +256,13 @@ apr_status_t h2_transfer_brigade(apr_bucket_brigade *to, apr_off_t *plen, int *peos); +/** + * Get an approximnation of the memory footprint of the given + * brigade. This varies from apr_brigade_length as + * - no buckets are ever read + * - only buckets known to allocate memory (HEAP+POOL) are counted + * - the bucket struct itself is counted + */ +apr_off_t h2_brigade_mem_size(apr_bucket_brigade *bb); + #endif /* defined(__mod_h2__h2_util__) */ diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c index c3b01733a9..6450eb9ea0 100644 --- a/modules/http2/mod_http2.c +++ b/modules/http2/mod_http2.c @@ -130,7 +130,7 @@ static int http2_is_h2(conn_rec *); static apr_status_t http2_req_engine_push(const char *ngn_type, request_rec *r, - h2_req_engine_init *einit) + http2_req_engine_init *einit) { return h2_mplx_req_engine_push(ngn_type, r, einit); } diff --git a/modules/http2/mod_http2.h b/modules/http2/mod_http2.h index c5cfe704e3..3073579282 100644 --- a/modules/http2/mod_http2.h +++ b/modules/http2/mod_http2.h @@ -36,6 +36,8 @@ struct apr_thread_cond_t; typedef struct h2_req_engine h2_req_engine; +typedef void http2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed); + /** * Initialize a h2_req_engine. The structure will be passed in but * only the name and master are set. 
The function should initialize @@ -43,12 +45,14 @@ typedef struct h2_req_engine h2_req_engine; * @param engine the allocated, partially filled structure * @param r the first request to process, or NULL */ -typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, - const char *id, - const char *type, - apr_pool_t *pool, - apr_uint32_t req_buffer_size, - request_rec *r); +typedef apr_status_t http2_req_engine_init(h2_req_engine *engine, + const char *id, + const char *type, + apr_pool_t *pool, + apr_uint32_t req_buffer_size, + request_rec *r, + http2_output_consumed **pconsumed, + void **pbaton); /** * Push a request to an engine with the specified name for further processing. @@ -66,7 +70,7 @@ typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, APR_DECLARE_OPTIONAL_FN(apr_status_t, http2_req_engine_push, (const char *engine_type, request_rec *r, - h2_req_engine_init *einit)); + http2_req_engine_init *einit)); /** * Get a new request for processing in this engine. diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c index c98db1a7bb..e5dd0436e0 100644 --- a/modules/http2/mod_proxy_http2.c +++ b/modules/http2/mod_proxy_http2.c @@ -42,7 +42,7 @@ AP_DECLARE_MODULE(proxy_http2) = { /* Optional functions from mod_http2 */ static int (*is_h2)(conn_rec *c); static apr_status_t (*req_engine_push)(const char *name, request_rec *r, - h2_req_engine_init *einit); + http2_req_engine_init *einit); static apr_status_t (*req_engine_pull)(h2_req_engine *engine, apr_read_type_e block, apr_uint32_t capacity, @@ -71,7 +71,8 @@ typedef struct h2_proxy_ctx { unsigned is_ssl : 1; unsigned flushall : 1; - apr_status_t r_status; /* status of our first request work */ + apr_status_t r_status; /* status of our first request work */ + h2_proxy_session *session; /* current http2 session against backend */ } h2_proxy_ctx; static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog, @@ -196,12 +197,23 @@ static int proxy_http2_canon(request_rec *r, char *url) return OK; } +static void out_consumed(void *baton, conn_rec *c, apr_off_t bytes) +{ + h2_proxy_ctx *ctx = baton; + + if (ctx->session) { + h2_proxy_session_update_window(ctx->session, c, bytes); + } +} + static apr_status_t proxy_engine_init(h2_req_engine *engine, const char *id, const char *type, apr_pool_t *pool, apr_uint32_t req_buffer_size, - request_rec *r) + request_rec *r, + http2_output_consumed **pconsumed, + void **pctx) { h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config, &proxy_http2_module); @@ -212,6 +224,8 @@ static apr_status_t proxy_engine_init(h2_req_engine *engine, ctx->engine_pool = pool; ctx->req_buffer_size = req_buffer_size; ctx->capacity = 100; + *pconsumed = out_consumed; + *pctx = ctx; return APR_SUCCESS; } ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, @@ -250,7 +264,7 @@ static void request_done(h2_proxy_session *session, request_rec *r, if (req_engine_push && is_h2 && is_h2(ctx->owner)) { if (req_engine_push(ctx->engine_type, r, NULL) == APR_SUCCESS) { /* push to engine */ - ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, r->connection, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, "h2_proxy_session(%s): rescheduled request %s", ctx->engine_id, task_id); return; @@ -287,8 +301,8 @@ static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave) } else if (req_engine_pull && ctx->engine) { apr_status_t status; - status = req_engine_pull(ctx->engine, - before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ, + status = req_engine_pull(ctx->engine, before_leave? 
+ APR_BLOCK_READ: APR_NONBLOCK_READ, ctx->capacity, &ctx->next); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, ctx->owner, "h2_proxy_engine(%s): pulled request %s", @@ -301,40 +315,39 @@ static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave) static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { apr_status_t status = OK; - h2_proxy_session *session; /* Step Four: Send the Request in a new HTTP/2 stream and * loop until we got the response or encounter errors. */ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner, "eng(%s): setup session", ctx->engine_id); - session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf, - 30, h2_log2(ctx->req_buffer_size), - request_done); - if (!session) { + ctx->session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf, + 30, h2_log2(ctx->req_buffer_size), + request_done); + if (!ctx->session) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, "session unavailable"); return HTTP_SERVICE_UNAVAILABLE; } ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, - "eng(%s): run session %s", ctx->engine_id, session->id); - session->user_data = ctx; + "eng(%s): run session %s", ctx->engine_id, ctx->session->id); + ctx->session->user_data = ctx; while (1) { if (ctx->next) { - add_request(session, ctx->next); + add_request(ctx->session, ctx->next); ctx->next = NULL; } - status = h2_proxy_session_process(session); + status = h2_proxy_session_process(ctx->session); if (status == APR_SUCCESS) { apr_status_t s2; /* ongoing processing, call again */ - if (session->remote_max_concurrent > 0 - && session->remote_max_concurrent != ctx->capacity) { - ctx->capacity = session->remote_max_concurrent; + if (ctx->session->remote_max_concurrent > 0 + && ctx->session->remote_max_concurrent != ctx->capacity) { + ctx->capacity = ctx->session->remote_max_concurrent; } s2 = next_request(ctx, 0); if (s2 == APR_ECONNABORTED) { @@ -344,7 +357,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { status = s2; break; } - if (!ctx->next && h2_ihash_is_empty(session->streams)) { + if (!ctx->next && h2_ihash_is_empty(ctx->session->streams)) { break; } } @@ -357,12 +370,13 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { * a) be reopened on the new session iff safe to do so * b) reported as done (failed) otherwise */ - h2_proxy_session_cleanup(session, request_done); + h2_proxy_session_cleanup(ctx->session, request_done); break; } } - session->user_data = NULL; + ctx->session->user_data = NULL; + ctx->session = NULL; return status; } @@ -556,6 +570,8 @@ run_session: /* session and connection still ok */ if (next_request(ctx, 1) == APR_SUCCESS) { /* more requests, run again */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, + "run_session, again"); goto run_session; } /* done */
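
The core idea of the mod_proxy_http2 change above is that the backend HTTP/2 stream window is only replenished after the corresponding bytes have actually been written out on the frontend h2 connection: pass_out() in h2_conn_io counts the bytes passed down the master connection's output filters, h2_mplx reports them through h2_req_engine_out_consumed(), and mod_proxy_http2's out_consumed() callback turns them into nghttp2_session_consume() calls via h2_proxy_session_update_window(). What follows is a minimal, self-contained model of that accounting, not code from this commit; the names in it (stream_model, backend_recv, frontend_sent) are illustrative only, and the real flow uses the functions named above.

/* Simplified model of front-to-back flow control: the backend may only
 * send as much data as the frontend h2 connection has actually drained. */
#include <stdio.h>

typedef struct {
    long window;    /* bytes the backend is still allowed to send */
    long buffered;  /* bytes received from the backend, not yet sent to the client */
} stream_model;

/* Backend delivered n bytes: they consume window and sit in our buffer. */
static void backend_recv(stream_model *s, long n)
{
    if (n > s->window)
        n = s->window;          /* the backend must respect the window */
    s->window -= n;
    s->buffered += n;
}

/* Frontend reports n bytes actually sent on the master connection: only
 * now is window handed back to the backend (this stands in for the
 * nghttp2_session_consume() step in h2_proxy_session_update_window()). */
static void frontend_sent(stream_model *s, long n)
{
    if (n > s->buffered)
        n = s->buffered;
    s->buffered -= n;
    s->window += n;
}

int main(void)
{
    stream_model s = { 65535, 0 };

    backend_recv(&s, 50000);    /* fast backend fills the buffer */
    printf("after recv: window=%ld buffered=%ld\n", s.window, s.buffered);

    frontend_sent(&s, 20000);   /* slow client drains only part of it */
    printf("after send: window=%ld buffered=%ld\n", s.window, s.buffered);

    /* The backend is now throttled to 'window' more bytes until the
     * frontend connection drains more data. */
    return 0;
}

The point of coupling the two windows this way is to keep per-stream buffering in the proxy bounded instead of letting a fast backend outrun a slow client; buffered response data per stream stays within roughly the stream window plus the h2_conn_io write buffer.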