From a082ae1643bf33e21cb84f5dd67a8bd25f2214de Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Thu, 10 Mar 2016 15:51:14 +0000 Subject: [PATCH] mod_http2: some code cleanup of stream request body handling, potential avoid a buffer copy git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1734428 13f79535-47bb-0310-9956-ffa450edef68 --- modules/http2/h2_io.c | 34 +++++++++---------- modules/http2/h2_io.h | 11 +----- modules/http2/h2_mplx.c | 44 ++---------------------- modules/http2/h2_mplx.h | 14 ++------ modules/http2/h2_session.c | 7 ++-- modules/http2/h2_stream.c | 69 ++++++-------------------------------- modules/http2/h2_stream.h | 12 +------ 7 files changed, 36 insertions(+), 155 deletions(-) diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index 3f82c60f10..92f4275173 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -45,6 +45,14 @@ h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request) return io; } +static void check_bbin(h2_io *io) +{ + if (!io->bbin) { + io->bbin = apr_brigade_create(io->pool, io->bucket_alloc); + io->tmp = apr_brigade_create(io->pool, io->bucket_alloc); + } +} + void h2_io_redo(h2_io *io) { io->worker_started = 0; @@ -85,23 +93,12 @@ void h2_io_set_response(h2_io *io, h2_response *response) } } - void h2_io_rst(h2_io *io, int error) { io->rst_error = error; io->eos_in = 1; } -int h2_io_in_has_eos_for(h2_io *io) -{ - return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1)); -} - -int h2_io_in_has_data(h2_io *io) -{ - return io->bbin && h2_util_bb_has_data_or_eos(io->bbin); -} - int h2_io_out_has_data(h2_io *io) { return io->bbout && h2_util_bb_has_data_or_eos(io->bbout); @@ -298,7 +295,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, return status; } -apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb) +apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos) { if (io->rst_error) { return APR_ECONNABORTED; @@ -307,13 +304,12 @@ apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb) if (io->eos_in) { return APR_EOF; } - io->eos_in = h2_util_has_eos(bb, -1); - if (!APR_BRIGADE_EMPTY(bb)) { - if (!io->bbin) { - io->bbin = apr_brigade_create(io->pool, io->bucket_alloc); - io->tmp = apr_brigade_create(io->pool, io->bucket_alloc); - } - return h2_util_move(io->bbin, bb, -1, NULL, "h2_io_in_write"); + if (eos) { + io->eos_in = 1; + } + if (len > 0) { + check_bbin(io); + return apr_brigade_write(io->bbin, NULL, NULL, d, len); } return APR_SUCCESS; } diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index d92b7eb0d4..b9742f99fe 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -92,19 +92,10 @@ void h2_io_rst(h2_io *io, int error); int h2_io_is_repeatable(h2_io *io); void h2_io_redo(h2_io *io); -/** - * The input data is completely queued. Blocked reads will return immediately - * and give either data or EOF. - */ -int h2_io_in_has_eos_for(h2_io *io); /** * Output data is available. */ int h2_io_out_has_data(h2_io *io); -/** - * Input data is available. - */ -int h2_io_in_has_data(h2_io *io); void h2_io_signal(h2_io *io, h2_io_op op); void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout, @@ -127,7 +118,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, /** * Appends given bucket to the input. */ -apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb); +apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos); /** * Closes the input. After existing data has been read, APR_EOF will diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 4d7f63bb52..b60d328959 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -498,7 +498,7 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block, } apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, - apr_bucket_brigade *bb) + const char *data, apr_size_t len, int eos) { apr_status_t status; int acquired; @@ -508,7 +508,7 @@ apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre"); - status = h2_io_in_write(io, bb); + status = h2_io_in_write(io, data, len, eos); H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post"); h2_io_signal(io, H2_IO_READ); io_process_events(m, io); @@ -898,46 +898,6 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error) return status; } -int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id) -{ - int has_eos = 0; - int acquired; - - apr_status_t status; - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - has_eos = h2_io_in_has_eos_for(io); - } - else { - has_eos = 1; - } - leave_mutex(m, acquired); - } - return has_eos; -} - -int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id) -{ - apr_status_t status; - int has_data = 0; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - has_data = h2_io_in_has_data(io); - } - else { - has_data = 0; - } - leave_mutex(m, acquired); - } - return has_data; -} - int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id) { apr_status_t status; diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index a61a63891a..e33d5e5a2a 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -171,10 +171,6 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error); */ int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id); -/* Return != 0 iff the multiplexer has input data for the given stream. - */ -int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id); - /** * Waits on output data from any stream in this session to become available. * Returns APR_TIMEUP if no data arrived in the given time. @@ -238,20 +234,14 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block, * Appends data to the input of the given stream. Storage of input data is * not subject to flow control. */ -apr_status_t h2_mplx_in_write(h2_mplx *mplx, int stream_id, - apr_bucket_brigade *bb); +apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, + const char *data, apr_size_t len, int eos); /** * Closes the input for the given stream_id. */ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id); -/** - * Returns != 0 iff the input for the given stream has been closed. There - * could still be data queued, but it can be read without blocking. - */ -int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id); - /** * Invoke the consumed callback for all streams that had bytes read since the * last call to this function. If no stream had input data consumed, the diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index d99573850d..78b91efccf 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -235,8 +235,11 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, } return 0; } - - status = h2_stream_write_data(stream, (const char *)data, len); + + /* FIXME: enabling setting EOS this way seems to break input handling + * in mod_proxy_http2. why? */ + status = h2_stream_write_data(stream, (const char *)data, len, + 0 /*flags & NGHTTP2_FLAG_END_STREAM*/); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes", session->id, stream_id, (long)len); diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 29df7afd82..2b368b67cf 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -41,13 +41,6 @@ #include "h2_util.h" -#define H2_STREAM_IN(lvl,s,msg) \ - do { \ - if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \ - h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbin); \ - } while(0) - - static int state_transition[][7] = { /* ID OP RL RR CI CO CL */ /*ID*/{ 1, 0, 0, 0, 0, 0, 0 }, @@ -144,19 +137,13 @@ static int output_open(h2_stream *stream) static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response); -h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session) +h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session) { h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream)); stream->id = id; stream->state = H2_STREAM_ST_IDLE; stream->pool = pool; stream->session = session; - return stream; -} - -h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session) -{ - h2_stream *stream = h2_stream_create(id, pool, session); set_state(stream, H2_STREAM_ST_OPEN); stream->request = h2_request_create(id, pool, h2_config_geti(session->config, H2_CONF_SER_HEADERS)); @@ -296,8 +283,6 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled, if (status == APR_SUCCESS) { if (!eos) { stream->request->body = 1; - stream->bbin = apr_brigade_create(stream->pool, - stream->session->c->bucket_alloc); } stream->input_remaining = stream->request->content_length; @@ -328,33 +313,6 @@ int h2_stream_is_scheduled(const h2_stream *stream) return stream->scheduled; } -static apr_status_t h2_stream_input_flush(h2_stream *stream) -{ - apr_status_t status = APR_SUCCESS; - if (stream->bbin && !APR_BRIGADE_EMPTY(stream->bbin)) { - - status = h2_mplx_in_write(stream->session->mplx, stream->id, stream->bbin); - if (status != APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->mplx->c, - "h2_stream(%ld-%d): flushing input data", - stream->session->id, stream->id); - } - } - return status; -} - -static apr_status_t input_flush(apr_bucket_brigade *bb, void *ctx) -{ - (void)bb; - return h2_stream_input_flush(ctx); -} - -static apr_status_t input_add_data(h2_stream *stream, - const char *data, size_t len) -{ - return apr_brigade_write(stream->bbin, input_flush, stream, data, len); -} - apr_status_t h2_stream_close_input(h2_stream *stream) { apr_status_t status = APR_SUCCESS; @@ -368,28 +326,23 @@ apr_status_t h2_stream_close_input(h2_stream *stream) return APR_ECONNRESET; } - H2_STREAM_IN(APLOG_TRACE2, stream, "close_pre"); - if (close_input(stream) && stream->bbin) { - status = h2_stream_input_flush(stream); - if (status == APR_SUCCESS) { - status = h2_mplx_in_close(stream->session->mplx, stream->id); - } + if (close_input(stream)) { + status = h2_mplx_in_close(stream->session->mplx, stream->id); } - H2_STREAM_IN(APLOG_TRACE2, stream, "close_post"); return status; } apr_status_t h2_stream_write_data(h2_stream *stream, - const char *data, size_t len) + const char *data, size_t len, int eos) { apr_status_t status = APR_SUCCESS; AP_DEBUG_ASSERT(stream); - if (input_closed(stream) || !stream->request->eoh || !stream->bbin) { + if (input_closed(stream) || !stream->request->eoh) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, - "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d, bbin=%d", + "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d", stream->session->id, stream->id, input_closed(stream), - stream->request->eoh, !!stream->bbin); + stream->request->eoh); return APR_EINVAL; } @@ -397,7 +350,6 @@ apr_status_t h2_stream_write_data(h2_stream *stream, "h2_stream(%ld-%d): add %ld input bytes", stream->session->id, stream->id, (long)len); - H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_pre"); if (!stream->request->chunked) { stream->input_remaining -= len; if (stream->input_remaining < 0) { @@ -413,11 +365,10 @@ apr_status_t h2_stream_write_data(h2_stream *stream, } } - status = input_add_data(stream, data, len); - if (status == APR_SUCCESS) { - status = h2_stream_input_flush(stream); + status = h2_mplx_in_write(stream->session->mplx, stream->id, data, len, eos); + if (eos) { + close_input(stream); } - H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_post"); return status; } diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index 7d724259fa..b7df632502 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -57,7 +57,6 @@ struct h2_stream { unsigned int submitted : 1; /* response HEADER has been sent */ apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */ - apr_bucket_brigade *bbin; /* input DATA */ struct h2_sos *sos; /* stream output source, e.g. to read output from */ apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */ @@ -66,15 +65,6 @@ struct h2_stream { #define H2_STREAM_RST(s, def) (s->rst_error? s->rst_error : (def)) -/** - * Create a stream in IDLE state. - * @param id the stream identifier - * @param pool the memory pool to use for this stream - * @param session the session this stream belongs to - * @return the newly created IDLE stream - */ -h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_session *session); - /** * Create a stream in OPEN state. * @param id the stream identifier @@ -155,7 +145,7 @@ apr_status_t h2_stream_close_input(h2_stream *stream); * @param len the number of bytes to write */ apr_status_t h2_stream_write_data(h2_stream *stream, - const char *data, size_t len); + const char *data, size_t len, int eos); /** * Reset the stream. Stream write/reads will return errors afterwards. -- 2.50.1