From: Stefan Eissing Date: Wed, 2 Dec 2015 14:42:42 +0000 (+0000) Subject: merge of 1717639, fixing WINDOW_UPDATEs on small stream inputs X-Git-Tag: 2.4.18~35 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=8815c47fd185cc68e2acf90e14a02335b2e359c1;p=apache merge of 1717639, fixing WINDOW_UPDATEs on small stream inputs git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1717641 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index 6bd2b83739..98ba60e6ab 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -85,6 +85,17 @@ apr_off_t h2_io_out_length(h2_io *io) return 0; } +apr_status_t h2_io_in_shutdown(h2_io *io) +{ + if (io->bbin) { + apr_off_t end_len = 0; + apr_brigade_length(io->bbin, 1, &end_len); + io->input_consumed += end_len; + apr_brigade_cleanup(io->bbin); + } + return h2_io_in_close(io); +} + apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, apr_size_t maxlen) { diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index dcf493539b..08f3aa3dad 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -107,6 +107,12 @@ apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb); */ apr_status_t h2_io_in_close(h2_io *io); +/** + * Shuts all input down. Will close input and mark any data buffered + * as consumed. + */ +apr_status_t h2_io_in_shutdown(h2_io *io); + /******************************************************************************* * Output handling of streams. ******************************************************************************/ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index c4efed6276..333a9b5c6b 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -194,10 +194,28 @@ static void workers_unregister(h2_mplx *m) { h2_workers_unregister(m->workers, m); } -static void io_destroy(h2_mplx *m, h2_io *io) +static int io_process_events(h2_mplx *m, h2_io *io) { + if (io->input_consumed && m->input_consumed) { + m->input_consumed(m->input_consumed_ctx, + io->id, io->input_consumed); + io->input_consumed = 0; + return 1; + } + return 0; +} + + +static void io_destroy(h2_mplx *m, h2_io *io, int events) { apr_pool_t *pool = io->pool; + /* cleanup any buffered input */ + h2_io_in_shutdown(io); + if (events) { + /* Process outstanding events before destruction */ + io_process_events(m, io); + } + io->pool = NULL; /* The pool is cleared/destroyed which also closes all * allocated file handles. Give this count back to our @@ -222,7 +240,7 @@ static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error) h2_io_set_remove(m->ready_ios, io); if (io->task_done || h2_tq_remove(m->q, io->id)) { /* already finished or not even started yet */ - io_destroy(m, io); + io_destroy(m, io, 1); return 0; } else { @@ -310,7 +328,7 @@ void h2_mplx_task_done(h2_mplx *m, int stream_id) if (io) { io->task_done = 1; if (io->orphaned) { - io_destroy(m, io); + io_destroy(m, io, 0); } else { /* hang around until the stream deregisteres */ @@ -371,6 +389,7 @@ apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, if (io->input_arrived) { apr_thread_cond_signal(io->input_arrived); } + io_process_events(m, io); } else { status = APR_EOF; @@ -396,6 +415,7 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id) if (io->input_arrived) { apr_thread_cond_signal(io->input_arrived); } + io_process_events(m, io); } else { status = APR_ECONNABORTED; @@ -406,24 +426,26 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id) } typedef struct { - h2_mplx_consumed_cb *cb; - void *cb_ctx; + h2_mplx * m; int streams_updated; } update_ctx; static int update_window(void *ctx, h2_io *io) { - if (io->input_consumed) { - update_ctx *uctx = (update_ctx*)ctx; - uctx->cb(uctx->cb_ctx, io->id, io->input_consumed); - io->input_consumed = 0; + update_ctx *uctx = (update_ctx*)ctx; + if (io_process_events(uctx->m, io)) { ++uctx->streams_updated; } return 1; } -apr_status_t h2_mplx_in_update_windows(h2_mplx *m, - h2_mplx_consumed_cb *cb, void *cb_ctx) +void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) +{ + m->input_consumed = cb; + m->input_consumed_ctx = ctx; +} + +apr_status_t h2_mplx_in_update_windows(h2_mplx *m) { apr_status_t status; AP_DEBUG_ASSERT(m); @@ -434,8 +456,7 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m, if (APR_SUCCESS == status) { update_ctx ctx; - ctx.cb = cb; - ctx.cb_ctx = cb_ctx; + ctx.m = m; ctx.streams_updated = 0; status = APR_EAGAIN; @@ -549,11 +570,15 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams) m->id, io->id); io->orphaned = 1; if (io->task_done) { - io_destroy(m, io); + io_destroy(m, io, 1); } else { - /* hang around until the h2_task is done */ + /* hang around until the h2_task is done, but + * shutdown input and send out any events (e.g. window + * updates) asap. */ + h2_io_in_shutdown(io); h2_io_rst(io, H2_ERR_STREAM_CLOSED); + io_process_events(m, io); } } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index f145428ff3..f2805be373 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -53,6 +53,12 @@ struct h2_task_queue; typedef struct h2_mplx h2_mplx; +/** + * Callback invoked for every stream that had input data read since + * the last invocation. + */ +typedef void h2_mplx_consumed_cb(void *ctx, int stream_id, apr_off_t consumed); + struct h2_mplx { long id; APR_RING_ENTRY(h2_mplx) link; @@ -75,6 +81,9 @@ struct h2_mplx { apr_pool_t *spare_pool; /* spare pool, ready for next io */ struct h2_workers *workers; int file_handles_allowed; + + h2_mplx_consumed_cb *input_consumed; + void *input_consumed_ctx; }; @@ -173,6 +182,17 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, struct h2_worker *w, int *has_more); +/** + * Register a callback for the amount of input data consumed per stream. The + * will only ever be invoked from the thread creating this h2_mplx, e.g. when + * calls from that thread into this h2_mplx are made. + * + * @param m the multiplexer to register the callback at + * @param cb the function to invoke + * @param ctx user supplied argument to invocation. + */ +void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx); + /******************************************************************************* * Input handling of streams. ******************************************************************************/ @@ -207,20 +227,15 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id); int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id); /** - * Callback invoked for every stream that had input data read since - * the last invocation. - */ -typedef void h2_mplx_consumed_cb(void *ctx, int stream_id, apr_off_t consumed); - -/** - * Invoke the callback for all streams that had bytes read since the last - * call to this function. If no stream had input data consumed, the callback - * is not invoked. + * Invoke the consumed callback for all streams that had bytes read since the + * last call to this function. If no stream had input data consumed, the + * callback is not invoked. + * The consumed callback may also be invoked at other times whenever + * the need arises. * Returns APR_SUCCESS when an update happened, APR_EAGAIN if no update * happened. */ -apr_status_t h2_mplx_in_update_windows(h2_mplx *m, - h2_mplx_consumed_cb *cb, void *ctx); +apr_status_t h2_mplx_in_update_windows(h2_mplx *m); /******************************************************************************* * Output handling of streams. diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index ca1d87824d..cce8ca7d5a 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -57,6 +57,16 @@ static int h2_session_status_from_apr_status(apr_status_t rv) return NGHTTP2_ERR_PROTO; } +static void update_window(void *ctx, int stream_id, apr_off_t bytes_read) +{ + h2_session *session = (h2_session*)ctx; + nghttp2_session_consume(session->ngh2, stream_id, bytes_read); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_session(%ld-%d): consumed %ld bytes", + session->id, stream_id, (long)bytes_read); +} + + h2_stream *h2_session_open_stream(h2_session *session, int stream_id) { h2_stream * stream; @@ -221,6 +231,7 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes", session->id, stream_id, (long)len); if (status != APR_SUCCESS) { + update_window(session, stream_id, len); rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id, H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR)); if (nghttp2_is_fatal(rv)) { @@ -555,11 +566,11 @@ static int on_frame_send_cb(nghttp2_session *ngh2, void *user_data) { h2_session *session = user_data; - if (APLOGctrace1(session->c)) { + if (APLOGcdebug(session->c)) { char buffer[256]; frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, "h2_session(%ld): frame_send %s", session->id, buffer); } @@ -681,6 +692,8 @@ static h2_session *h2_session_create_int(conn_rec *c, session->workers = workers; session->mplx = h2_mplx_create(c, session->pool, config, workers); + h2_mplx_set_consumed_cb(session->mplx, update_window, session); + h2_conn_io_init(&session->io, c, config, session->pool); session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc); @@ -998,12 +1011,6 @@ static int h2_session_resume_streams_with_data(h2_session *session) { return 0; } -static void update_window(void *ctx, int stream_id, apr_off_t bytes_read) -{ - h2_session *session = (h2_session*)ctx; - nghttp2_session_consume(session->ngh2, stream_id, bytes_read); -} - h2_stream *h2_session_get_stream(h2_session *session, int stream_id) { if (!session->last_stream || stream_id != session->last_stream->id) { @@ -1579,9 +1586,13 @@ apr_status_t h2_session_process(h2_session *session) } if (wait_micros > 0) { - ap_log_cerror( APLOG_MARK, APLOG_TRACE3, 0, session->c, - "h2_session: wait for data, %ld micros", (long)(wait_micros)); - h2_conn_io_pass(&session->io); + if (APLOGcdebug(session->c)) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_session: wait for data, %ld micros", + (long)wait_micros); + } + nghttp2_session_send(session->ngh2); + h2_conn_io_flush(&session->io); status = h2_mplx_out_trywait(session->mplx, wait_micros, session->iowait); if (status == APR_TIMEUP) { @@ -1687,16 +1698,14 @@ apr_status_t h2_session_process(h2_session *session) } } - if (h2_stream_set_has_open_input(session->streams)) { - /* Check that any pending window updates are sent. */ - status = h2_mplx_in_update_windows(session->mplx, update_window, session); - if (APR_STATUS_IS_EAGAIN(status)) { - status = APR_SUCCESS; - } - else if (status == APR_SUCCESS) { - /* need to flush window updates onto the connection asap */ - h2_conn_io_flush(&session->io); - } + /* Check that any pending window updates are sent. */ + status = h2_mplx_in_update_windows(session->mplx); + if (APR_STATUS_IS_EAGAIN(status)) { + status = APR_SUCCESS; + } + else if (status == APR_SUCCESS) { + /* need to flush window updates onto the connection asap */ + h2_conn_io_flush(&session->io); } }