From 0b107b1c464cb6ec66d11ac0c4ced0694954d45e Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Tue, 3 Nov 2015 14:33:11 +0000 Subject: [PATCH] rework of output handling on stream/session close, rework of cleartext (http:) output to pass buckets to core filters, splitting of stream/io memory pools for stability and less sync git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1712300 13f79535-47bb-0310-9956-ffa450edef68 --- modules/http2/config.m4 | 2 + modules/http2/h2_bucket_eoc.c | 108 ++++++++++++++++++ modules/http2/h2_bucket_eoc.h | 31 ++++++ modules/http2/h2_bucket_eos.c | 108 ++++++++++++++++++ modules/http2/h2_bucket_eos.h | 31 ++++++ modules/http2/h2_config.c | 4 +- modules/http2/h2_conn.c | 31 +++--- modules/http2/h2_conn.h | 3 +- modules/http2/h2_conn_io.c | 53 +++++---- modules/http2/h2_conn_io.h | 7 +- modules/http2/h2_ctx.c | 10 +- modules/http2/h2_ctx.h | 8 +- modules/http2/h2_from_h1.c | 6 +- modules/http2/h2_h2.c | 10 +- modules/http2/h2_h2.h | 3 + modules/http2/h2_io.c | 6 +- modules/http2/h2_io.h | 3 + modules/http2/h2_mplx.c | 194 +++++++++++++++------------------ modules/http2/h2_mplx.h | 30 ++--- modules/http2/h2_request.c | 12 +- modules/http2/h2_session.c | 193 ++++++++++++++++---------------- modules/http2/h2_session.h | 17 +++ modules/http2/h2_stream.c | 142 ++++++++++++------------ modules/http2/h2_stream.h | 13 ++- modules/http2/h2_stream_set.c | 8 +- modules/http2/h2_stream_set.h | 2 +- modules/http2/h2_task.c | 170 +++++++++++------------------ modules/http2/h2_task.h | 31 +----- modules/http2/h2_task_input.c | 48 ++++---- modules/http2/h2_task_input.h | 6 +- modules/http2/h2_task_output.c | 30 ++--- modules/http2/h2_task_output.h | 6 +- modules/http2/h2_to_h1.c | 17 +-- modules/http2/h2_to_h1.h | 3 +- modules/http2/h2_util.c | 6 +- 35 files changed, 783 insertions(+), 569 deletions(-) create mode 100644 modules/http2/h2_bucket_eoc.c create mode 100644 modules/http2/h2_bucket_eoc.h create mode 100644 modules/http2/h2_bucket_eos.c create mode 100644 modules/http2/h2_bucket_eos.h diff --git a/modules/http2/config.m4 b/modules/http2/config.m4 index 9c5eb86740..35b25e11a3 100644 --- a/modules/http2/config.m4 +++ b/modules/http2/config.m4 @@ -20,6 +20,8 @@ dnl # list of module object files http2_objs="dnl mod_http2.lo dnl h2_alt_svc.lo dnl +h2_bucket_eoc.lo dnl +h2_bucket_eos.lo dnl h2_config.lo dnl h2_conn.lo dnl h2_conn_io.lo dnl diff --git a/modules/http2/h2_bucket_eoc.c b/modules/http2/h2_bucket_eoc.c new file mode 100644 index 0000000000..130a9ebf34 --- /dev/null +++ b/modules/http2/h2_bucket_eoc.c @@ -0,0 +1,108 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include + +#include "h2_private.h" +#include "h2_mplx.h" +#include "h2_session.h" +#include "h2_bucket_eoc.h" + +typedef struct { + apr_bucket_refcount refcount; + h2_session *session; +} h2_bucket_eoc; + +static apr_status_t bucket_cleanup(void *data) +{ + h2_session **psession = data; + + if (*psession) { + /* + * If bucket_destroy is called after us, this prevents + * bucket_destroy from trying to destroy the pool again. + */ + *psession = NULL; + } + return APR_SUCCESS; +} + +static apr_status_t bucket_read(apr_bucket *b, const char **str, + apr_size_t *len, apr_read_type_e block) +{ + *str = NULL; + *len = 0; + return APR_SUCCESS; +} + +AP_DECLARE(apr_bucket *) h2_bucket_eoc_make(apr_bucket *b, + h2_session *session) +{ + h2_bucket_eoc *h; + + h = apr_bucket_alloc(sizeof(*h), b->list); + h->session = session; + + b = apr_bucket_shared_make(b, h, 0, 0); + b->type = &ap_bucket_type_h2_eoc; + + return b; +} + +AP_DECLARE(apr_bucket *) h2_bucket_eoc_create(apr_bucket_alloc_t *list, + h2_session *session) +{ + apr_bucket *b = apr_bucket_alloc(sizeof(*b), list); + + APR_BUCKET_INIT(b); + b->free = apr_bucket_free; + b->list = list; + b = h2_bucket_eoc_make(b, session); + if (session) { + h2_bucket_eoc *h = b->data; + apr_pool_pre_cleanup_register(session->pool, &h->session, bucket_cleanup); + } + return b; +} + +static void bucket_destroy(void *data) +{ + h2_bucket_eoc *h = data; + + if (apr_bucket_shared_destroy(h)) { + h2_session *session = h->session; + if (session) { + h2_session_cleanup(session); + } + apr_bucket_free(h); + } +} + +AP_DECLARE_DATA const apr_bucket_type_t ap_bucket_type_h2_eoc = { + "H2EOC", 5, APR_BUCKET_METADATA, + bucket_destroy, + bucket_read, + apr_bucket_setaside_noop, + apr_bucket_split_notimpl, + apr_bucket_shared_copy +}; + diff --git a/modules/http2/h2_bucket_eoc.h b/modules/http2/h2_bucket_eoc.h new file mode 100644 index 0000000000..b8a62158a1 --- /dev/null +++ b/modules/http2/h2_bucket_eoc.h @@ -0,0 +1,31 @@ +/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef mod_http2_h2_bucket_eoc_h +#define mod_http2_h2_bucket_eoc_h + +struct h2_session; + +/** End Of HTTP/2 SESSION (H2EOC) bucket */ +AP_DECLARE_DATA extern const apr_bucket_type_t ap_bucket_type_h2_eoc; + + +AP_DECLARE(apr_bucket *) h2_bucket_eoc_make(apr_bucket *b, + struct h2_session *session); + +AP_DECLARE(apr_bucket *) h2_bucket_eoc_create(apr_bucket_alloc_t *list, + struct h2_session *session); + +#endif /* mod_http2_h2_bucket_eoc_h */ diff --git a/modules/http2/h2_bucket_eos.c b/modules/http2/h2_bucket_eos.c new file mode 100644 index 0000000000..b74754059c --- /dev/null +++ b/modules/http2/h2_bucket_eos.c @@ -0,0 +1,108 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include + +#include "h2_private.h" +#include "h2_mplx.h" +#include "h2_stream.h" +#include "h2_bucket_eos.h" + +typedef struct { + apr_bucket_refcount refcount; + h2_stream *stream; +} h2_bucket_eos; + +static apr_status_t bucket_cleanup(void *data) +{ + h2_stream **pstream = data; + + if (*pstream) { + /* + * If bucket_destroy is called after us, this prevents + * bucket_destroy from trying to destroy the pool again. + */ + *pstream = NULL; + } + return APR_SUCCESS; +} + +static apr_status_t bucket_read(apr_bucket *b, const char **str, + apr_size_t *len, apr_read_type_e block) +{ + *str = NULL; + *len = 0; + return APR_SUCCESS; +} + +AP_DECLARE(apr_bucket *) h2_bucket_eos_make(apr_bucket *b, + h2_stream *stream) +{ + h2_bucket_eos *h; + + h = apr_bucket_alloc(sizeof(*h), b->list); + h->stream = stream; + + b = apr_bucket_shared_make(b, h, 0, 0); + b->type = &ap_bucket_type_h2_eos; + + return b; +} + +AP_DECLARE(apr_bucket *) h2_bucket_eos_create(apr_bucket_alloc_t *list, + h2_stream *stream) +{ + apr_bucket *b = apr_bucket_alloc(sizeof(*b), list); + + APR_BUCKET_INIT(b); + b->free = apr_bucket_free; + b->list = list; + b = h2_bucket_eos_make(b, stream); + if (stream) { + h2_bucket_eos *h = b->data; + apr_pool_pre_cleanup_register(stream->pool, &h->stream, bucket_cleanup); + } + return b; +} + +static void bucket_destroy(void *data) +{ + h2_bucket_eos *h = data; + + if (apr_bucket_shared_destroy(h)) { + h2_stream *stream = h->stream; + if (stream) { + h2_stream_cleanup(stream); + } + apr_bucket_free(h); + } +} + +AP_DECLARE_DATA const apr_bucket_type_t ap_bucket_type_h2_eos = { + "H2EOS", 5, APR_BUCKET_METADATA, + bucket_destroy, + bucket_read, + apr_bucket_setaside_noop, + apr_bucket_split_notimpl, + apr_bucket_shared_copy +}; + diff --git a/modules/http2/h2_bucket_eos.h b/modules/http2/h2_bucket_eos.h new file mode 100644 index 0000000000..e7ab871d15 --- /dev/null +++ b/modules/http2/h2_bucket_eos.h @@ -0,0 +1,31 @@ +/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef mod_http2_h2_bucket_stream_eos_h +#define mod_http2_h2_bucket_stream_eos_h + +struct h2_stream; + +/** End Of HTTP/2 STREAM (H2EOS) bucket */ +AP_DECLARE_DATA extern const apr_bucket_type_t ap_bucket_type_h2_eos; + + +AP_DECLARE(apr_bucket *) h2_bucket_eos_make(apr_bucket *b, + struct h2_stream *stream); + +AP_DECLARE(apr_bucket *) h2_bucket_eos_create(apr_bucket_alloc_t *list, + struct h2_stream *stream); + +#endif /* mod_http2_h2_bucket_stream_eos_h */ diff --git a/modules/http2/h2_config.c b/modules/http2/h2_config.c index 2b1cdd9800..25fdd29908 100644 --- a/modules/http2/h2_config.c +++ b/modules/http2/h2_config.c @@ -316,8 +316,8 @@ static const char *h2_conf_set_session_extra_files(cmd_parms *parms, { h2_config *cfg = h2_config_sget(parms->server); apr_int64_t max = (int)apr_atoi64(value); - if (max <= 0) { - return "value must be a positive number"; + if (max < 0) { + return "value must be a non-negative number"; } cfg->session_extra_files = (int)max; (void)arg; diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index b246f87a31..877447e2ff 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -240,7 +240,7 @@ static apr_status_t h2_conn_loop(h2_session *session) session->c->local_addr->port); if (status != APR_SUCCESS) { h2_session_abort(session, status, rv); - h2_session_destroy(session); + h2_session_cleanup(session); return status; } @@ -343,12 +343,9 @@ static apr_status_t h2_conn_loop(h2_session *session) ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, session->c, "h2_session(%ld): done", session->id); + h2_session_close(session); ap_update_child_status_from_conn(session->c->sbh, SERVER_CLOSING, session->c); - - h2_session_close(session); - h2_session_destroy(session); - return DONE; } @@ -411,11 +408,11 @@ conn_rec *h2_conn_create(conn_rec *master, apr_pool_t *pool) return c; } -apr_status_t h2_conn_setup(h2_task_env *env, struct h2_worker *worker) +apr_status_t h2_conn_setup(h2_task *task, struct h2_worker *worker) { - conn_rec *master = env->mplx->c; + conn_rec *master = task->mplx->c; - ap_log_perror(APLOG_MARK, APLOG_TRACE3, 0, env->pool, + ap_log_perror(APLOG_MARK, APLOG_TRACE3, 0, task->pool, "h2_conn(%ld): created from master", master->id); /* Ok, we are just about to start processing the connection and @@ -424,17 +421,17 @@ apr_status_t h2_conn_setup(h2_task_env *env, struct h2_worker *worker) * sub-resources from it, so that we get a nice reuse of * pools. */ - env->c.pool = env->pool; - env->c.bucket_alloc = h2_worker_get_bucket_alloc(worker); - env->c.current_thread = h2_worker_get_thread(worker); + task->c->pool = task->pool; + task->c->bucket_alloc = h2_worker_get_bucket_alloc(worker); + task->c->current_thread = h2_worker_get_thread(worker); - env->c.conn_config = ap_create_conn_config(env->pool); - env->c.notes = apr_table_make(env->pool, 5); + task->c->conn_config = ap_create_conn_config(task->pool); + task->c->notes = apr_table_make(task->pool, 5); /* In order to do this in 2.4.x, we need to add a member to conn_rec */ - env->c.master = master; + task->c->master = master; - ap_set_module_config(env->c.conn_config, &core_module, + ap_set_module_config(task->c->conn_config, &core_module, h2_worker_get_socket(worker)); /* This works for mpm_worker so far. Other mpm modules have @@ -446,7 +443,7 @@ apr_status_t h2_conn_setup(h2_task_env *env, struct h2_worker *worker) /* all fine */ break; case H2_MPM_EVENT: - fix_event_conn(&env->c, master); + fix_event_conn(task->c, master); break; default: /* fingers crossed */ @@ -458,7 +455,7 @@ apr_status_t h2_conn_setup(h2_task_env *env, struct h2_worker *worker) * 400 Bad Request * when names do not match. We prefer a predictable 421 status. */ - env->c.keepalives = 1; + task->c->keepalives = 1; return APR_SUCCESS; } diff --git a/modules/http2/h2_conn.h b/modules/http2/h2_conn.h index 49a70db850..752f28a74c 100644 --- a/modules/http2/h2_conn.h +++ b/modules/http2/h2_conn.h @@ -17,7 +17,6 @@ #define __mod_h2__h2_conn__ struct h2_task; -struct h2_task_env; struct h2_worker; /* Process the connection that is now starting the HTTP/2 @@ -52,7 +51,7 @@ h2_mpm_type_t h2_conn_mpm_type(void); conn_rec *h2_conn_create(conn_rec *master, apr_pool_t *stream_pool); -apr_status_t h2_conn_setup(struct h2_task_env *env, struct h2_worker *worker); +apr_status_t h2_conn_setup(struct h2_task *task, struct h2_worker *worker); apr_status_t h2_conn_post(conn_rec *c, struct h2_worker *worker); apr_status_t h2_conn_process(conn_rec *c, apr_socket_t *socket); diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index 9fa42207a5..0299343063 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -87,12 +87,6 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c) return APR_SUCCESS; } -void h2_conn_io_destroy(h2_conn_io *io) -{ - io->input = NULL; - io->output = NULL; -} - int h2_conn_io_is_buffered(h2_conn_io *io) { return io->bufsize > 0; @@ -277,11 +271,17 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, const char *buf, size_t length) { apr_status_t status = APR_SUCCESS; - io->unflushed = 1; + io->unflushed = 1; if (io->bufsize > 0) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, io->connection, "h2_conn_io: buffering %ld bytes", (long)length); + + if (!APR_BRIGADE_EMPTY(io->output)) { + status = h2_conn_io_flush(io); + io->unflushed = 1; + } + while (length > 0 && (status == APR_SUCCESS)) { apr_size_t avail = io->bufsize - io->buflen; if (avail <= 0) { @@ -304,16 +304,6 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, } } - else if (1) { - apr_bucket *b; - - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, io->connection, - "h2_conn_io: passing %ld transient bytes to output filters", - (long)length); - b = apr_bucket_transient_create(buf,length, io->output->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(io->output, b); - status = pass_out(io->output, io); - } else { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, io->connection, "h2_conn_io: writing %ld bytes to brigade", (long)length); @@ -323,15 +313,38 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, return status; } -apr_status_t h2_conn_io_append(h2_conn_io *io, apr_bucket *b) +apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b) { APR_BRIGADE_INSERT_TAIL(io->output, b); + io->unflushed = 1; return APR_SUCCESS; } -apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb) +apr_status_t h2_conn_io_consider_flush(h2_conn_io *io) { - return h2_util_move(io->output, bb, 0, NULL, "h2_conn_io_pass"); + apr_status_t status = APR_SUCCESS; + int flush_now = 0; + + /* The HTTP/1.1 network output buffer/flush behaviour does not + * give optimal performance in the HTTP/2 case, as the pattern of + * buckets (data/eor/eos) is different. + * As long as we do not have found out the "best" way to deal with + * this, force a flush at least every WRITE_BUFFER_SIZE amount + * of data which seems to work nicely. + */ + if (io->unflushed) { + apr_off_t len = 0; + if (!APR_BRIGADE_EMPTY(io->output)) { + apr_brigade_length(io->output, 0, &len); + } + len += io->buflen; + flush_now = (len >= WRITE_BUFFER_SIZE); + } + + if (flush_now) { + return h2_conn_io_flush(io); + } + return status; } apr_status_t h2_conn_io_flush(h2_conn_io *io) diff --git a/modules/http2/h2_conn_io.h b/modules/http2/h2_conn_io.h index 342f008ed6..c5a861605c 100644 --- a/modules/http2/h2_conn_io.h +++ b/modules/http2/h2_conn_io.h @@ -44,8 +44,6 @@ typedef struct { apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c); -void h2_conn_io_destroy(h2_conn_io *io); - int h2_conn_io_is_buffered(h2_conn_io *io); typedef apr_status_t (*h2_conn_io_on_read_cb)(const char *data, apr_size_t len, @@ -61,8 +59,9 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, const char *buf, size_t length); -apr_status_t h2_conn_io_append(h2_conn_io *io, apr_bucket *b); -apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb); +apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b); + +apr_status_t h2_conn_io_consider_flush(h2_conn_io *io); apr_status_t h2_conn_io_flush(h2_conn_io *io); diff --git a/modules/http2/h2_ctx.c b/modules/http2/h2_ctx.c index 422835c2df..08bdd8612d 100644 --- a/modules/http2/h2_ctx.c +++ b/modules/http2/h2_ctx.c @@ -32,11 +32,11 @@ static h2_ctx *h2_ctx_create(const conn_rec *c) return ctx; } -h2_ctx *h2_ctx_create_for(const conn_rec *c, h2_task_env *env) +h2_ctx *h2_ctx_create_for(const conn_rec *c, h2_task *task) { h2_ctx *ctx = h2_ctx_create(c); if (ctx) { - ctx->task_env = env; + ctx->task = task; } return ctx; } @@ -76,7 +76,7 @@ h2_ctx *h2_ctx_server_set(h2_ctx *ctx, server_rec *s) int h2_ctx_is_task(h2_ctx *ctx) { - return ctx && !!ctx->task_env; + return ctx && !!ctx->task; } int h2_ctx_is_active(h2_ctx *ctx) @@ -84,7 +84,7 @@ int h2_ctx_is_active(h2_ctx *ctx) return ctx && ctx->is_h2; } -struct h2_task_env *h2_ctx_get_task(h2_ctx *ctx) +struct h2_task *h2_ctx_get_task(h2_ctx *ctx) { - return ctx->task_env; + return ctx->task; } diff --git a/modules/http2/h2_ctx.h b/modules/http2/h2_ctx.h index 86c59206ed..e4bc7506ae 100644 --- a/modules/http2/h2_ctx.h +++ b/modules/http2/h2_ctx.h @@ -16,7 +16,7 @@ #ifndef __mod_h2__h2_ctx__ #define __mod_h2__h2_ctx__ -struct h2_task_env; +struct h2_task; struct h2_config; /** @@ -30,7 +30,7 @@ struct h2_config; typedef struct h2_ctx { int is_h2; /* h2 engine is used */ const char *protocol; /* the protocol negotiated */ - struct h2_task_env *task_env; /* the h2_task environment or NULL */ + struct h2_task *task; /* the h2_task executing or NULL */ const char *hostname; /* hostname negotiated via SNI, optional */ server_rec *server; /* httpd server config selected. */ struct h2_config *config; /* effective config in this context */ @@ -38,7 +38,7 @@ typedef struct h2_ctx { h2_ctx *h2_ctx_get(const conn_rec *c); h2_ctx *h2_ctx_rget(const request_rec *r); -h2_ctx *h2_ctx_create_for(const conn_rec *c, struct h2_task_env *env); +h2_ctx *h2_ctx_create_for(const conn_rec *c, struct h2_task *task); /* Set the h2 protocol established on this connection context or @@ -58,6 +58,6 @@ const char *h2_ctx_protocol_get(const conn_rec *c); int h2_ctx_is_task(h2_ctx *ctx); int h2_ctx_is_active(h2_ctx *ctx); -struct h2_task_env *h2_ctx_get_task(h2_ctx *ctx); +struct h2_task *h2_ctx_get_task(h2_ctx *ctx); #endif /* defined(__mod_h2__h2_ctx__) */ diff --git a/modules/http2/h2_from_h1.c b/modules/http2/h2_from_h1.c index 2b77db2f00..c763ca56ea 100644 --- a/modules/http2/h2_from_h1.c +++ b/modules/http2/h2_from_h1.c @@ -492,8 +492,8 @@ static h2_response *create_response(h2_from_h1 *from_h1, request_rec *r) apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb) { - h2_task_env *env = f->ctx; - h2_from_h1 *from_h1 = env->output? env->output->from_h1 : NULL; + h2_task *task = f->ctx; + h2_from_h1 *from_h1 = task->output? task->output->from_h1 : NULL; request_rec *r = f->r; apr_bucket *b; ap_bucket_error *eb = NULL; @@ -503,7 +503,7 @@ apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, "h2_from_h1(%d): output_filter called", from_h1->stream_id); - if (r->header_only && env->output && from_h1->response) { + if (r->header_only && task->output && from_h1->response) { /* throw away any data after we have compiled the response */ apr_brigade_cleanup(bb); return OK; diff --git a/modules/http2/h2_h2.c b/modules/http2/h2_h2.c index 09cc14b488..64ac321db1 100644 --- a/modules/http2/h2_h2.c +++ b/modules/http2/h2_h2.c @@ -662,21 +662,21 @@ int h2_h2_process_conn(conn_rec* c) static int h2_h2_post_read_req(request_rec *r) { h2_ctx *ctx = h2_ctx_rget(r); - struct h2_task_env *env = h2_ctx_get_task(ctx); - if (env) { + struct h2_task *task = h2_ctx_get_task(ctx); + if (task) { /* h2_task connection for a stream, not for h2c */ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "adding h1_to_h2_resp output filter"); - if (env->serialize_headers) { + if (task->serialize_headers) { ap_remove_output_filter_byhandle(r->output_filters, "H1_TO_H2_RESP"); - ap_add_output_filter("H1_TO_H2_RESP", env, r, r->connection); + ap_add_output_filter("H1_TO_H2_RESP", task, r, r->connection); } else { /* replace the core http filter that formats response headers * in HTTP/1 with our own that collects status and headers */ ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER"); ap_remove_output_filter_byhandle(r->output_filters, "H2_RESPONSE"); - ap_add_output_filter("H2_RESPONSE", env, r, r->connection); + ap_add_output_filter("H2_RESPONSE", task, r, r->connection); } } return DECLINED; diff --git a/modules/http2/h2_h2.h b/modules/http2/h2_h2.h index f04cc786e3..4cf2785929 100644 --- a/modules/http2/h2_h2.h +++ b/modules/http2/h2_h2.h @@ -49,6 +49,9 @@ extern const char *H2_MAGIC_TOKEN; #define H2_ERR_INADEQUATE_SECURITY (0x0c) #define H2_ERR_HTTP_1_1_REQUIRED (0x0d) +/* Maximum number of padding bytes in a frame, rfc7540 */ +#define H2_MAX_PADLEN 256 + /** * Provide a user readable description of the HTTP/2 error code- * @param h2_error http/2 error code, as in rfc 7540, ch. 7 diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index 119d480554..e952ad0f6b 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -23,6 +23,7 @@ #include "h2_private.h" #include "h2_io.h" #include "h2_response.h" +#include "h2_task.h" #include "h2_util.h" h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc) @@ -39,7 +40,10 @@ h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc) static void h2_io_cleanup(h2_io *io) { - (void)io; + if (io->task) { + h2_task_destroy(io->task); + io->task = NULL; + } } void h2_io_destroy(h2_io *io) diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index c54c61e2bc..d8769dd118 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -35,7 +35,10 @@ struct h2_io { int eos_in; int task_done; int rst_error; + int zombie; + struct h2_task *task; /* task created for this io */ + apr_size_t input_consumed; /* how many bytes have been read */ struct apr_thread_cond_t *input_arrived; /* block on reading */ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 754ad57645..ad6bd30e32 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -127,7 +127,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, h2_workers *workers) m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS)); m->stream_ios = h2_io_set_create(m->pool); m->ready_ios = h2_io_set_create(m->pool); - m->closed = h2_stream_set_create(m->pool); m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); m->workers = workers; @@ -222,52 +221,18 @@ void h2_mplx_abort(h2_mplx *m) } -h2_stream *h2_mplx_open_io(h2_mplx *m, int stream_id) +static void io_destroy(h2_mplx *m, h2_io *io) { - h2_stream *stream = NULL; - apr_status_t status; - h2_io *io; - - if (m->aborted) { - return NULL; - } - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { - apr_pool_t *stream_pool = m->spare_pool; - - if (!stream_pool) { - apr_pool_create(&stream_pool, m->pool); - } - else { - m->spare_pool = NULL; - } - - stream = h2_stream_create(stream_id, stream_pool, m); - stream->state = H2_STREAM_ST_OPEN; - - io = h2_io_set_get(m->stream_ios, stream_id); - if (!io) { - io = h2_io_create(stream_id, stream_pool, m->bucket_alloc); - h2_io_set_add(m->stream_ios, io); - } - status = io? APR_SUCCESS : APR_ENOMEM; - apr_thread_mutex_unlock(m->lock); - } - return stream; -} - -static void stream_destroy(h2_mplx *m, h2_stream *stream, h2_io *io) -{ - apr_pool_t *pool = h2_stream_detach_pool(stream); - if (pool) { - apr_pool_clear(pool); - if (m->spare_pool) { - apr_pool_destroy(m->spare_pool); - } - m->spare_pool = pool; - } - h2_stream_destroy(stream); if (io) { + apr_pool_t *pool = io->pool; + if (pool) { + io->pool = NULL; + apr_pool_clear(pool); + if (m->spare_pool) { + apr_pool_destroy(m->spare_pool); + } + m->spare_pool = pool; + } /* The pool is cleared/destroyed which also closes all * allocated file handles. Give this count back to our * file handle pool. */ @@ -278,31 +243,36 @@ static void stream_destroy(h2_mplx *m, h2_stream *stream, h2_io *io) } } -apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, h2_stream *stream) +apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) { apr_status_t status; + AP_DEBUG_ASSERT(m); + if (m->aborted) { + return APR_ECONNABORTED; + } status = apr_thread_mutex_lock(m->lock); if (APR_SUCCESS == status) { - h2_io *io = h2_io_set_get(m->stream_ios, stream->id); + h2_io *io = h2_io_set_get(m->stream_ios, stream_id); + if (io) { /* Remove io from ready set, we will never submit it */ h2_io_set_remove(m->ready_ios, io); - if (stream->rst_error) { - /* Forward error code to fail any further attempt to - * write to io */ - h2_io_rst(io, stream->rst_error); + + if (io->task_done) { + io_destroy(m, io); + } + else { + /* cleanup once task is done */ + io->zombie = 1; + if (rst_error) { + /* Forward error code to fail any further attempt to + * write to io */ + h2_io_rst(io, rst_error); + } } } - if (!io || io->task_done) { - /* No more io or task already done -> cleanup immediately */ - stream_destroy(m, stream, io); - } - else { - /* Add stream to closed set for cleanup when task is done */ - h2_stream_set_add(m->closed, stream); - } apr_thread_mutex_unlock(m->lock); } return status; @@ -312,21 +282,17 @@ void h2_mplx_task_done(h2_mplx *m, int stream_id) { apr_status_t status = apr_thread_mutex_lock(m->lock); if (APR_SUCCESS == status) { - h2_stream *stream = h2_stream_set_get(m->closed, stream_id); h2_io *io = h2_io_set_get(m->stream_ios, stream_id); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, "h2_mplx(%ld): task(%d) done", m->id, stream_id); - if (stream) { - /* stream was already closed by main connection and is in - * zombie state. Now that the task is done with it, we - * can free its resources. */ - h2_stream_set_remove(m->closed, stream); - stream_destroy(m, stream, io); - } - else if (io) { - /* main connection has not finished stream. Mark task as done - * so that eventual cleanup can start immediately. */ + if (io) { io->task_done = 1; + if (io->zombie) { + io_destroy(m, io); + } + else { + /* hang around until the stream deregisteres */ + } } apr_thread_mutex_unlock(m->lock); } @@ -506,11 +472,11 @@ apr_status_t h2_mplx_out_read_to(h2_mplx *m, int stream_id, if (APR_SUCCESS == status) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io) { - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_pre"); + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_read_to_pre"); status = h2_io_out_read_to(io, bb, plen, peos); - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_post"); + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_read_to_post"); if (status == APR_SUCCESS && io->output_drained) { apr_thread_cond_signal(io->output_drained); } @@ -614,8 +580,9 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response, if (io) { if (f) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, - "h2_mplx(%ld-%d): open response: %s", - m->id, stream_id, response->status); + "h2_mplx(%ld-%d): open response: %s, rst=%d", + m->id, stream_id, response->status, + response->rst_error); } h2_io_set_response(io, response); @@ -752,11 +719,8 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error) H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst"); have_out_data_for(m, stream_id); - if (m->aborted) { - /* if we were the last output, the whole session might - * have gone down in the meantime. - */ - return APR_SUCCESS; + if (io->output_drained) { + apr_thread_cond_signal(io->output_drained); } } else { @@ -873,8 +837,28 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) return status; } -apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task, - h2_stream_pri_cmp *cmp, void *ctx) +static h2_io *open_io(h2_mplx *m, int stream_id) +{ + apr_pool_t *io_pool = m->spare_pool; + h2_io *io; + + if (!io_pool) { + apr_pool_create(&io_pool, m->pool); + } + else { + m->spare_pool = NULL; + } + + io = h2_io_create(stream_id, io_pool, m->bucket_alloc); + h2_io_set_add(m->stream_ios, io); + + return io; +} + + +apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, + struct h2_request *r, int eos, + h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status; @@ -884,17 +868,32 @@ apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task, } status = apr_thread_mutex_lock(m->lock); if (APR_SUCCESS == status) { + conn_rec *c; + h2_io *io; cmp_ctx x; - x.cmp = cmp; - x.ctx = ctx; - h2_tq_add(m->q, task, task_cmp, &x); + io = open_io(m, stream_id); + c = h2_conn_create(m->c, io->pool); + io->task = h2_task_create(m->id, stream_id, io->pool, m, c); + + status = h2_request_end_headers(r, m, io->task, eos); + if (status == APR_SUCCESS && eos) { + status = h2_io_in_close(io); + } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx: do task(%s)", task->id); + if (status == APR_SUCCESS) { + x.cmp = cmp; + x.ctx = ctx; + h2_tq_add(m->q, io->task, task_cmp, &x); + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, + "h2_mplx(%ld-%d): process", m->c->id, stream_id); apr_thread_mutex_unlock(m->lock); } - workers_register(m); + + if (status == APR_SUCCESS) { + workers_register(m); + } return status; } @@ -910,30 +909,9 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) status = apr_thread_mutex_lock(m->lock); if (APR_SUCCESS == status) { task = h2_tq_shift(m->q); - if (task) { - h2_task_set_started(task); - } *has_more = !h2_tq_empty(m->q); apr_thread_mutex_unlock(m->lock); } return task; } -apr_status_t h2_mplx_create_task(h2_mplx *m, struct h2_stream *stream) -{ - apr_status_t status; - AP_DEBUG_ASSERT(m); - if (m->aborted) { - return APR_ECONNABORTED; - } - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { - conn_rec *c = h2_conn_create(m->c, stream->pool); - stream->task = h2_task_create(m->id, stream->id, - stream->pool, m, c); - - apr_thread_mutex_unlock(m->lock); - } - return status; -} - diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 40c715a386..2bb650535e 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -41,6 +41,7 @@ struct h2_config; struct h2_response; struct h2_task; struct h2_stream; +struct h2_request; struct h2_io_set; struct apr_thread_cond_t; struct h2_workers; @@ -70,8 +71,7 @@ struct h2_mplx { int aborted; apr_size_t stream_max_mem; - apr_pool_t *spare_pool; /* spare pool, ready for next stream */ - struct h2_stream_set *closed; /* streams closed, but task ongoing */ + apr_pool_t *spare_pool; /* spare pool, ready for next io */ struct h2_workers *workers; int file_handles_allowed; }; @@ -120,15 +120,16 @@ void h2_mplx_task_done(h2_mplx *m, int stream_id); /******************************************************************************* * IO lifetime of streams. ******************************************************************************/ -/** - * Prepares the multiplexer to handle in-/output on the given stream id. - */ -struct h2_stream *h2_mplx_open_io(h2_mplx *mplx, int stream_id); /** - * Ends cleanup of a stream in sync with execution thread. + * Notifies mplx that a stream has finished processing. + * + * @param m the mplx itself + * @param stream_id the id of the stream being done + * @param rst_error if != 0, the stream was reset with the error given + * */ -apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, struct h2_stream *stream); +apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error); /* Return != 0 iff the multiplexer has data for the given stream. */ @@ -146,15 +147,18 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, ******************************************************************************/ /** - * Schedule a task for execution. + * Process a stream request. * * @param m the multiplexer - * @param task the task to schedule + * @param stream_id the identifier of the stream + * @param r the request to be processed + * @param eos if input is complete * @param cmp the stream priority compare function * @param ctx context data for the compare function */ -apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task, - h2_stream_pri_cmp *cmp, void *ctx); +apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, + struct h2_request *r, int eos, + h2_stream_pri_cmp *cmp, void *ctx); /** * Stream priorities have changed, reschedule pending tasks. @@ -167,8 +171,6 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more); -apr_status_t h2_mplx_create_task(h2_mplx *mplx, struct h2_stream *stream); - /******************************************************************************* * Input handling of streams. ******************************************************************************/ diff --git a/modules/http2/h2_request.c b/modules/http2/h2_request.c index ca9a362cb5..4a1e19058a 100644 --- a/modules/http2/h2_request.c +++ b/modules/http2/h2_request.c @@ -151,13 +151,21 @@ apr_status_t h2_request_write_data(h2_request *req, apr_status_t h2_request_end_headers(h2_request *req, struct h2_mplx *m, h2_task *task, int eos) { + apr_status_t status; + if (!req->to_h1) { - apr_status_t status = insert_request_line(req, m); + status = insert_request_line(req, m); if (status != APR_SUCCESS) { return status; } } - return h2_to_h1_end_headers(req->to_h1, task, eos); + status = h2_to_h1_end_headers(req->to_h1, eos); + h2_task_set_request(task, req->to_h1->method, + req->to_h1->scheme, + req->to_h1->authority, + req->to_h1->path, + req->to_h1->headers, eos); + return status; } apr_status_t h2_request_close(h2_request *req) diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 93962def20..fc0287042b 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -24,6 +24,8 @@ #include #include "h2_private.h" +#include "h2_bucket_eoc.h" +#include "h2_bucket_eos.h" #include "h2_config.h" #include "h2_h2.h" #include "h2_mplx.h" @@ -56,29 +58,32 @@ static int h2_session_status_from_apr_status(apr_status_t rv) static int stream_open(h2_session *session, int stream_id) { h2_stream * stream; + apr_pool_t *stream_pool; if (session->aborted) { return NGHTTP2_ERR_CALLBACK_FAILURE; } - stream = h2_mplx_open_io(session->mplx, stream_id); - if (stream) { - h2_stream_set_add(session->streams, stream); - if (stream->id > session->max_stream_received) { - session->max_stream_received = stream->id; - } - - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session: stream(%ld-%d): opened", - session->id, stream_id); - - return 0; + if (session->spare) { + stream_pool = session->spare; + session->spare = NULL; + } + else { + apr_pool_create(&stream_pool, session->pool); } - ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, session->c, - APLOGNO(02918) - "h2_session: stream(%ld-%d): unable to create", + stream = h2_stream_create(stream_id, stream_pool, session); + stream->state = H2_STREAM_ST_OPEN; + + h2_stream_set_add(session->streams, stream); + if (stream->id > session->max_stream_received) { + session->max_stream_received = stream->id; + } + + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_session: stream(%ld-%d): opened", session->id, stream_id); - return NGHTTP2_ERR_INVALID_STREAM_ID; + + return 0; } /** @@ -247,8 +252,6 @@ static int before_frame_send_cb(nghttp2_session *ngh2, case NGHTTP2_GOAWAY: session->flush = 1; break; - case NGHTTP2_DATA: - default: break; @@ -317,8 +320,9 @@ static apr_status_t stream_destroy(h2_session *session, h2_stream_rst(stream, error_code); } - h2_stream_set_remove(session->streams, stream); - return h2_mplx_cleanup_stream(session->mplx, stream); + return h2_conn_io_writeb(&session->io, + h2_bucket_eos_create(session->c->bucket_alloc, + stream)); } static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id, @@ -342,9 +346,6 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id, session->id, (int)stream_id, error_code); } - /* always flush on eos */ - session->flush = 1; - return 0; } @@ -522,7 +523,8 @@ static apr_status_t pass_data(void *ctx, return h2_conn_io_write(&((h2_session*)ctx)->io, data, length); } -static char immortal_zeros[256]; + +static char immortal_zeros[H2_MAX_PADLEN]; static int on_send_data_cb(nghttp2_session *ngh2, nghttp2_frame *frame, @@ -575,9 +577,7 @@ static int on_send_data_cb(nghttp2_session *ngh2, if (status == APR_SUCCESS && padlen) { if (padlen) { - char pad[256]; - memset(pad, 0, padlen); - status = h2_conn_io_write(&session->io, pad, padlen); + status = h2_conn_io_write(&session->io, immortal_zeros, padlen); } } } @@ -591,7 +591,7 @@ static int on_send_data_cb(nghttp2_session *ngh2, } b = apr_bucket_pool_create(header, padlen? 10 : 9, stream->pool, session->c->bucket_alloc); - status = h2_conn_io_append(&session->io, b); + status = h2_conn_io_writeb(&session->io, b); if (status == APR_SUCCESS) { apr_size_t len = length; @@ -605,12 +605,14 @@ static int on_send_data_cb(nghttp2_session *ngh2, if (status == APR_SUCCESS && padlen) { b = apr_bucket_immortal_create(immortal_zeros, padlen, session->c->bucket_alloc); - status = h2_conn_io_append(&session->io, b); + status = h2_conn_io_writeb(&session->io, b); } } if (status == APR_SUCCESS) { + stream->data_frames_sent++; + h2_conn_io_consider_flush(&session->io); return 0; } else { @@ -779,13 +781,11 @@ void h2_session_destroy(h2_session *session) nghttp2_session_del(session->ngh2); session->ngh2 = NULL; } - h2_conn_io_destroy(&session->io); - - if (session->iowait) { - apr_thread_cond_destroy(session->iowait); - session->iowait = NULL; - } - +} + +void h2_session_cleanup(h2_session *session) +{ + h2_session_destroy(session); if (session->pool) { apr_pool_destroy(session->pool); } @@ -796,14 +796,18 @@ static apr_status_t h2_session_abort_int(h2_session *session, int reason) AP_DEBUG_ASSERT(session); if (!session->aborted) { session->aborted = 1; - if (session->ngh2) { - - if (!reason) { + + if (session->ngh2) { + if (NGHTTP2_ERR_EOF == reason) { + /* This is our way of indication that the connection is + * gone. No use to send any GOAWAY frames. */ + nghttp2_session_terminate_session(session->ngh2, reason); + } + else if (!reason) { nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, session->max_stream_received, reason, NULL, 0); nghttp2_session_send(session->ngh2); - h2_conn_io_flush(&session->io); } else { const char *err = nghttp2_strerror(reason); @@ -812,22 +816,15 @@ static apr_status_t h2_session_abort_int(h2_session *session, int reason) "session(%ld): aborting session, reason=%d %s", session->id, reason, err); - if (NGHTTP2_ERR_EOF == reason) { - /* This is our way of indication that the connection is - * gone. No use to send any GOAWAY frames. */ - nghttp2_session_terminate_session(session->ngh2, reason); - } - else { - /* The connection might still be there and we shut down - * with GOAWAY and reason information. */ - nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, - session->max_stream_received, - reason, (const uint8_t *)err, - strlen(err)); - nghttp2_session_send(session->ngh2); - h2_conn_io_flush(&session->io); - } + /* The connection might still be there and we shut down + * with GOAWAY and reason information. */ + nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, + session->max_stream_received, + reason, (const uint8_t *)err, + strlen(err)); + nghttp2_session_send(session->ngh2); } + h2_conn_io_flush(&session->io); } h2_mplx_abort(session->mplx); } @@ -973,7 +970,7 @@ static int resume_on_data(void *ctx, h2_stream *stream) { AP_DEBUG_ASSERT(stream); if (h2_stream_is_suspended(stream)) { - if (h2_mplx_out_has_data_for(stream->m, stream->id)) { + if (h2_mplx_out_has_data_for(stream->session->mplx, stream->id)) { int rv; h2_stream_set_suspended(stream, 0); ++rctx->resume_count; @@ -1148,30 +1145,15 @@ apr_status_t h2_session_read(h2_session *session, apr_read_type_e block) apr_status_t h2_session_close(h2_session *session) { AP_DEBUG_ASSERT(session); - return session->aborted? APR_SUCCESS : h2_conn_io_flush(&session->io); -} - -/* The session wants to send more DATA for the given stream. - */ - -typedef struct { - char *buf; - size_t offset; - h2_session *session; - h2_stream *stream; -} cpy_ctx; - -static apr_status_t copy_buffer(void *ctx, const char *data, apr_size_t len) -{ - cpy_ctx *c = ctx; - - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c->session->c, - "h2_stream(%ld-%d): copy %ld bytes for DATA #%ld", - c->session->id, c->stream->id, - (long)len, (long)c->stream->data_frames_sent); - memcpy(c->buf + c->offset, data, len); - c->offset += len; - return APR_SUCCESS; + if (!session->aborted) { + h2_session_abort_int(session, 0); + } + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0,session->c, + "h2_session: closing, writing eoc"); + h2_conn_io_writeb(&session->io, + h2_bucket_eoc_create(session->c->bucket_alloc, + session)); + return h2_conn_io_flush(&session->io); } static ssize_t stream_data_cb(nghttp2_session *ng2s, @@ -1189,12 +1171,21 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, h2_stream *stream; AP_DEBUG_ASSERT(session); + /* The session wants to send more DATA for the stream. We need + * to find out how much of the requested length we can send without + * blocking. + * Indicate EOS when we encounter it or DEFERRED if the stream + * should be suspended. + * TODO: for handling of TRAILERS, the EOF indication needs + * to be aware of that. + */ + (void)ng2s; (void)buf; (void)source; stream = h2_stream_set_get(session->streams, stream_id); if (!stream) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c, + ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c, APLOGNO(02937) "h2_stream(%ld-%d): data requested but stream not found", session->id, (int)stream_id); @@ -1203,25 +1194,9 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, AP_DEBUG_ASSERT(!h2_stream_is_suspended(stream)); - if (1 || h2_conn_io_is_buffered(&session->io)) { - status = h2_stream_prep_read(stream, &nread, &eos); - if (nread) { - *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY; - } - } - else { - cpy_ctx ctx; - ctx.buf = (char *)buf; - ctx.offset = 0; - ctx.session = session; - ctx.stream = stream; - - status = h2_stream_readx(stream, copy_buffer, &ctx, &nread, &eos); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - "h2_stream(%ld-%d): read %ld bytes (DATA #%ld)", - session->id, (int)stream_id, (long)nread, - (long)stream->data_frames_sent); - stream->data_frames_sent++; + status = h2_stream_prep_read(stream, &nread, &eos); + if (nread) { + *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY; } switch (status) { @@ -1332,6 +1307,24 @@ apr_status_t h2_session_handle_response(h2_session *session, h2_stream *stream) return status; } +apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream) +{ + apr_pool_t *pool = h2_stream_detach_pool(stream); + + h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error); + h2_stream_set_remove(session->streams, stream->id); + h2_stream_destroy(stream); + + if (pool) { + apr_pool_clear(pool); + if (session->spare) { + apr_pool_destroy(session->spare); + } + session->spare = pool; + } + return APR_SUCCESS; +} + int h2_session_is_done(h2_session *session) { AP_DEBUG_ASSERT(session); diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index b7c55fc9b9..a1d718a9ca 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -77,6 +77,8 @@ struct h2_session { int max_stream_received; /* highest stream id created */ int max_stream_handled; /* highest stream id handled successfully */ + apr_pool_t *spare; /* spare stream pool */ + struct nghttp2_session *ngh2; /* the nghttp2 session (internal use) */ struct h2_workers *workers; /* for executing stream tasks */ }; @@ -111,6 +113,13 @@ h2_session *h2_session_rcreate(request_rec *r, struct h2_config *cfg, */ void h2_session_destroy(h2_session *session); +/** + * Cleanup the session and all objects it still contains. This will not + * destroy h2_task instances that have not finished yet. + * @param session the session to destroy + */ +void h2_session_cleanup(h2_session *session); + /** * Called once at start of session. * Sets up the session and sends the initial SETTINGS frame. @@ -160,4 +169,12 @@ apr_status_t h2_session_handle_response(h2_session *session, /* Get the h2_stream for the given stream idenrtifier. */ struct h2_stream *h2_session_get_stream(h2_session *session, int stream_id); +/** + * Destroy the stream and release it everywhere. Reclaim all resources. + * @param session the session to which the stream belongs + * @param stream the stream to destroy + */ +apr_status_t h2_session_stream_destroy(h2_session *session, + struct h2_stream *stream); + #endif /* defined(__mod_h2__h2_session__) */ diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index b67a84b129..5a5af6372b 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -16,8 +16,6 @@ #include #include -#define APR_POOL_DEBUG 7 - #include #include #include @@ -30,6 +28,7 @@ #include "h2_mplx.h" #include "h2_request.h" #include "h2_response.h" +#include "h2_session.h" #include "h2_stream.h" #include "h2_task.h" #include "h2_ctx.h" @@ -46,46 +45,47 @@ static void set_state(h2_stream *stream, h2_stream_state_t state) } } -h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_mplx *m) +h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session) { h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream)); if (stream != NULL) { stream->id = id; stream->state = H2_STREAM_ST_IDLE; stream->pool = pool; - stream->m = m; - stream->request = h2_request_create(id, pool, m->c->bucket_alloc); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, - "h2_stream(%ld-%d): created", m->id, stream->id); + stream->session = session; + stream->bbout = apr_brigade_create(stream->pool, + stream->session->c->bucket_alloc); + stream->request = h2_request_create(id, pool, session->c->bucket_alloc); + + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_stream(%ld-%d): created", session->id, stream->id); } return stream; } -static void h2_stream_cleanup(h2_stream *stream) +apr_status_t h2_stream_destroy(h2_stream *stream) { + AP_DEBUG_ASSERT(stream); + + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c, + "h2_stream(%ld-%d): destroy", stream->session->id, stream->id); if (stream->request) { h2_request_destroy(stream->request); stream->request = NULL; } -} - -apr_status_t h2_stream_destroy(h2_stream *stream) -{ - AP_DEBUG_ASSERT(stream); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c, - "h2_stream(%ld-%d): destroy", stream->m->id, stream->id); - h2_stream_cleanup(stream); - if (stream->task) { - h2_task_destroy(stream->task); - stream->task = NULL; - } if (stream->pool) { apr_pool_destroy(stream->pool); } return APR_SUCCESS; } +void h2_stream_cleanup(h2_stream *stream) +{ + h2_session_stream_destroy(stream->session, stream); + /* stream is gone */ +} + apr_pool_t *h2_stream_detach_pool(h2_stream *stream) { apr_pool_t *pool = stream->pool; @@ -96,9 +96,9 @@ apr_pool_t *h2_stream_detach_pool(h2_stream *stream) void h2_stream_rst(h2_stream *stream, int error_code) { stream->rst_error = error_code; - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c, "h2_stream(%ld-%d): reset, error=%d", - stream->m->id, stream->id, error_code); + stream->session->id, stream->id, error_code); } apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, @@ -109,27 +109,20 @@ apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, stream->response = response; if (bb && !APR_BRIGADE_EMPTY(bb)) { int move_all = INT_MAX; - if (!stream->bbout) { - stream->bbout = apr_brigade_create(stream->pool, - stream->m->c->bucket_alloc); - } /* we can move file handles from h2_mplx into this h2_stream as many * as we want, since the lifetimes are the same and we are not freeing * the ones in h2_mplx->io before this stream is done. */ status = h2_util_move(stream->bbout, bb, 16 * 1024, &move_all, "h2_stream_set_response"); } - if (APLOGctrace1(stream->m->c)) { + if (APLOGctrace1(stream->session->c)) { apr_size_t len = 0; int eos = 0; - if (stream->bbout) { - h2_util_bb_avail(stream->bbout, &len, &eos); - } - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->m->c, - "h2_stream(%ld-%d): set_response(%s), brigade=%s, " - "len=%ld, eos=%d", - stream->m->id, stream->id, response->status, - (stream->bbout? "yes" : "no"), (long)len, (int)eos); + h2_util_bb_avail(stream->bbout, &len, &eos); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->c, + "h2_stream(%ld-%d): set_response(%s), len=%ld, eos=%d", + stream->session->id, stream->id, response->status, + (long)len, (int)eos); } return status; } @@ -160,7 +153,7 @@ apr_status_t h2_stream_rwrite(h2_stream *stream, request_rec *r) return APR_ECONNRESET; } set_state(stream, H2_STREAM_ST_OPEN); - status = h2_request_rwrite(stream->request, r, stream->m); + status = h2_request_rwrite(stream->request, r, stream->session->mplx); return status; } @@ -176,32 +169,26 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, /* Seeing the end-of-headers, we have everything we need to * start processing it. */ - status = h2_mplx_create_task(stream->m, stream); - if (status == APR_SUCCESS) { - status = h2_request_end_headers(stream->request, - stream->m, stream->task, eos); - if (status == APR_SUCCESS) { - status = h2_mplx_do_task(stream->m, stream->task, cmp, ctx); - } - if (eos) { - status = h2_stream_write_eos(stream); - } - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->m->c, - "h2_mplx(%ld-%d): start stream, task %s %s (%s)", - stream->m->id, stream->id, - stream->request->method, stream->request->path, - stream->request->authority); - + status = h2_mplx_process(stream->session->mplx, stream->id, + stream->request, eos, cmp, ctx); + if (eos) { + set_closed(stream); } + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->c, + "h2_mplx(%ld-%d): start stream, task %s %s (%s)", + stream->session->id, stream->id, + stream->request->method, stream->request->path, + stream->request->authority); + return status; } apr_status_t h2_stream_write_eos(h2_stream *stream) { AP_DEBUG_ASSERT(stream); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c, "h2_stream(%ld-%d): closing input", - stream->m->id, stream->id); + stream->session->id, stream->id); if (stream->rst_error) { return APR_ECONNRESET; } @@ -229,7 +216,7 @@ apr_status_t h2_stream_write_header(h2_stream *stream, return APR_EINVAL; } return h2_request_write_header(stream->request, name, nlen, - value, vlen, stream->m); + value, vlen, stream->session->mplx); } apr_status_t h2_stream_write_data(h2_stream *stream, @@ -257,7 +244,8 @@ apr_status_t h2_stream_prep_read(h2_stream *stream, if (stream->rst_error) { return APR_ECONNRESET; } - if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) { + + if (!APR_BRIGADE_EMPTY(stream->bbout)) { src = "stream"; status = h2_util_bb_avail(stream->bbout, plen, peos); if (status == APR_SUCCESS && !*peos && !*plen) { @@ -267,15 +255,15 @@ apr_status_t h2_stream_prep_read(h2_stream *stream, } else { src = "mplx"; - status = h2_mplx_out_readx(stream->m, stream->id, + status = h2_mplx_out_readx(stream->session->mplx, stream->id, NULL, NULL, plen, peos); } if (status == APR_SUCCESS && !*peos && !*plen) { status = APR_EAGAIN; } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->m->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c, "h2_stream(%ld-%d): prep_read %s, len=%ld eos=%d", - stream->m->id, stream->id, src, (long)*plen, *peos); + stream->session->id, stream->id, src, (long)*plen, *peos); return status; } @@ -289,24 +277,31 @@ apr_status_t h2_stream_readx(h2_stream *stream, if (stream->rst_error) { return APR_ECONNRESET; } - if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) { + *peos = 0; + if (!APR_BRIGADE_EMPTY(stream->bbout)) { + apr_size_t origlen = *plen; + src = "stream"; status = h2_util_bb_readx(stream->bbout, cb, ctx, plen, peos); if (status == APR_SUCCESS && !*peos && !*plen) { apr_brigade_cleanup(stream->bbout); + *plen = origlen; return h2_stream_readx(stream, cb, ctx, plen, peos); } } else { src = "mplx"; - status = h2_mplx_out_readx(stream->m, stream->id, cb, ctx, plen, peos); + status = h2_mplx_out_readx(stream->session->mplx, stream->id, + cb, ctx, plen, peos); } + if (status == APR_SUCCESS && !*peos && !*plen) { status = APR_EAGAIN; } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->m->c, + + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c, "h2_stream(%ld-%d): readx %s, len=%ld eos=%d", - stream->m->id, stream->id, src, (long)*plen, *peos); + stream->session->id, stream->id, src, (long)*plen, *peos); return status; } @@ -318,18 +313,29 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, if (stream->rst_error) { return APR_ECONNRESET; } - if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) { - status = h2_transfer_brigade(bb, stream->bbout, bb->p, plen, peos); + + if (APR_BRIGADE_EMPTY(stream->bbout)) { + apr_size_t tlen = *plen; + int eos; + status = h2_mplx_out_read_to(stream->session->mplx, stream->id, + stream->bbout, &tlen, &eos); + } + + if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->bbout)) { + status = h2_transfer_brigade(bb, stream->bbout, stream->pool, + plen, peos); } else { - status = h2_mplx_out_read_to(stream->m, stream->id, bb, plen, peos); + *plen = 0; + *peos = 0; } + if (status == APR_SUCCESS && !*peos && !*plen) { status = APR_EAGAIN; } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->m->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c, "h2_stream(%ld-%d): read_to, len=%ld eos=%d", - stream->m->id, stream->id, (long)*plen, *peos); + stream->session->id, stream->id, (long)*plen, *peos); return status; } diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index f4dc05537a..d09d848a66 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -48,6 +48,7 @@ typedef enum { struct h2_mplx; struct h2_request; struct h2_response; +struct h2_session; struct h2_task; typedef struct h2_stream h2_stream; @@ -55,28 +56,30 @@ typedef struct h2_stream h2_stream; struct h2_stream { int id; /* http2 stream id */ h2_stream_state_t state; /* http/2 state of this stream */ - struct h2_mplx *m; /* the multiplexer to work with */ + struct h2_session *session; /* the session this stream belongs to */ int aborted; /* was aborted */ int suspended; /* DATA sending has been suspended */ - apr_size_t data_frames_sent;/* # of DATA frames sent out for this stream */ + int rst_error; /* stream error for RST_STREAM */ apr_pool_t *pool; /* the memory pool for this stream */ struct h2_request *request; /* the request made in this stream */ - struct h2_task *task; /* task created for this stream */ struct h2_response *response; /* the response, once ready */ + apr_bucket_brigade *bbout; /* output DATA */ - int rst_error; /* stream error for RST_STREAM */ + apr_size_t data_frames_sent;/* # of DATA frames sent out for this stream */ }; #define H2_STREAM_RST(s, def) (s->rst_error? s->rst_error : (def)) -h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_mplx *m); +h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_session *session); apr_status_t h2_stream_destroy(h2_stream *stream); +void h2_stream_cleanup(h2_stream *stream); + void h2_stream_rst(h2_stream *streamm, int error_code); apr_pool_t *h2_stream_detach_pool(h2_stream *stream); diff --git a/modules/http2/h2_stream_set.c b/modules/http2/h2_stream_set.c index dddd2e3990..5ef48a13e1 100644 --- a/modules/http2/h2_stream_set.c +++ b/modules/http2/h2_stream_set.c @@ -54,9 +54,7 @@ void h2_stream_set_destroy(h2_stream_set *sp) static int h2_stream_id_cmp(const void *s1, const void *s2) { - h2_stream **pstream1 = (h2_stream **)s1; - h2_stream **pstream2 = (h2_stream **)s2; - return (*pstream1)->id - (*pstream2)->id; + return (*((h2_stream **)s1))->id - (*((h2_stream **)s2))->id; } h2_stream *h2_stream_set_get(h2_stream_set *sp, int stream_id) @@ -101,12 +99,12 @@ apr_status_t h2_stream_set_add(h2_stream_set *sp, h2_stream *stream) return APR_SUCCESS; } -h2_stream *h2_stream_set_remove(h2_stream_set *sp, h2_stream *stream) +h2_stream *h2_stream_set_remove(h2_stream_set *sp, int stream_id) { int i; for (i = 0; i < sp->list->nelts; ++i) { h2_stream *s = H2_STREAM_IDX(sp->list, i); - if (s == stream) { + if (s->id == stream_id) { int n; --sp->list->nelts; n = sp->list->nelts - i; diff --git a/modules/http2/h2_stream_set.h b/modules/http2/h2_stream_set.h index 5607583455..8bc6409223 100644 --- a/modules/http2/h2_stream_set.h +++ b/modules/http2/h2_stream_set.h @@ -35,7 +35,7 @@ apr_status_t h2_stream_set_add(h2_stream_set *sp, h2_stream *stream); h2_stream *h2_stream_set_get(h2_stream_set *sp, int stream_id); -h2_stream *h2_stream_set_remove(h2_stream_set *sp,h2_stream *stream); +h2_stream *h2_stream_set_remove(h2_stream_set *sp, int stream_id); void h2_stream_set_remove_all(h2_stream_set *sp); diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 6b3b3d2b81..ee78f4347e 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -52,35 +52,37 @@ static apr_status_t h2_filter_stream_input(ap_filter_t* filter, ap_input_mode_t mode, apr_read_type_e block, apr_off_t readbytes) { - h2_task_env *env = filter->ctx; - AP_DEBUG_ASSERT(env); - if (!env->input) { + h2_task *task = filter->ctx; + AP_DEBUG_ASSERT(task); + if (!task->input) { return APR_ECONNABORTED; } - return h2_task_input_read(env->input, filter, brigade, + return h2_task_input_read(task->input, filter, brigade, mode, block, readbytes); } static apr_status_t h2_filter_stream_output(ap_filter_t* filter, apr_bucket_brigade* brigade) { - h2_task_env *env = filter->ctx; - AP_DEBUG_ASSERT(env); - if (!env->output) { + h2_task *task = filter->ctx; + AP_DEBUG_ASSERT(task); + if (!task->output) { return APR_ECONNABORTED; } - return h2_task_output_write(env->output, filter, brigade); + return h2_task_output_write(task->output, filter, brigade); } static apr_status_t h2_filter_read_response(ap_filter_t* f, apr_bucket_brigade* bb) { - h2_task_env *env = f->ctx; - AP_DEBUG_ASSERT(env); - if (!env->output || !env->output->from_h1) { + h2_task *task = f->ctx; + AP_DEBUG_ASSERT(task); + if (!task->output || !task->output->from_h1) { return APR_ECONNABORTED; } - return h2_from_h1_read_response(env->output->from_h1, f, bb); + return h2_from_h1_read_response(task->output->from_h1, f, bb); } +static apr_status_t h2_task_process_request(h2_task *task); + /******************************************************************************* * Register various hooks */ @@ -119,15 +121,15 @@ static int h2_task_pre_conn(conn_rec* c, void *arg) (void)arg; if (h2_ctx_is_task(ctx)) { - h2_task_env *env = h2_ctx_get_task(ctx); + h2_task *task = h2_ctx_get_task(ctx); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, "h2_h2, pre_connection, found stream task"); /* Add our own, network level in- and output filters. */ - ap_add_input_filter("H2_TO_H1", env, NULL, c); - ap_add_output_filter("H1_TO_H2", env, NULL, c); + ap_add_input_filter("H2_TO_H1", task, NULL, c); + ap_add_output_filter("H1_TO_H2", task, NULL, c); } return OK; } @@ -137,14 +139,14 @@ static int h2_task_process_conn(conn_rec* c) h2_ctx *ctx = h2_ctx_get(c); if (h2_ctx_is_task(ctx)) { - if (!ctx->task_env->serialize_headers) { + if (!ctx->task->serialize_headers) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, "h2_h2, processing request directly"); - h2_task_process_request(ctx->task_env); + h2_task_process_request(ctx->task); return DONE; } ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, - "h2_task(%s), serialized handling", ctx->task_env->id); + "h2_task(%s), serialized handling", ctx->task->id); } return DECLINED; } @@ -200,121 +202,73 @@ apr_status_t h2_task_do(h2_task *task, h2_worker *worker) { apr_status_t status = APR_SUCCESS; h2_config *cfg = h2_config_get(task->mplx->c); - h2_task_env env; AP_DEBUG_ASSERT(task); - memset(&env, 0, sizeof(env)); - - env.id = task->id; - env.stream_id = task->stream_id; - env.mplx = task->mplx; - task->mplx = NULL; - - env.input_eos = task->input_eos; - env.serialize_headers = h2_config_geti(cfg, H2_CONF_SER_HEADERS); + task->serialize_headers = h2_config_geti(cfg, H2_CONF_SER_HEADERS); /* Create a subpool from the worker one to be used for all things - * with life-time of this task_env execution. + * with life-time of this task execution. */ - apr_pool_create(&env.pool, h2_worker_get_pool(worker)); + apr_pool_create(&task->pool, h2_worker_get_pool(worker)); - /* Link the env to the worker which provides useful things such + /* Link the task to the worker which provides useful things such * as mutex, a socket etc. */ - env.io = h2_worker_get_cond(worker); + task->io = h2_worker_get_cond(worker); - /* Clone fields, so that lifetimes become (more) independent. */ - env.method = apr_pstrdup(env.pool, task->method); - env.scheme = apr_pstrdup(env.pool, task->scheme); - env.authority = apr_pstrdup(env.pool, task->authority); - env.path = apr_pstrdup(env.pool, task->path); - env.headers = apr_table_clone(env.pool, task->headers); - - /* Setup the pseudo connection to use our own pool and bucket_alloc */ - env.c = *task->c; - task->c = NULL; - status = h2_conn_setup(&env, worker); + status = h2_conn_setup(task, worker); /* save in connection that this one is a pseudo connection, prevents * other hooks from messing with it. */ - h2_ctx_create_for(&env.c, &env); + h2_ctx_create_for(task->c, task); if (status == APR_SUCCESS) { - env.input = h2_task_input_create(&env, env.pool, - env.c.bucket_alloc); - env.output = h2_task_output_create(&env, env.pool, - env.c.bucket_alloc); - status = h2_conn_process(&env.c, h2_worker_get_socket(worker)); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, &env.c, - "h2_task(%s): processing done", env.id); + task->input = h2_task_input_create(task, task->pool, + task->c->bucket_alloc); + task->output = h2_task_output_create(task, task->pool, + task->c->bucket_alloc); + status = h2_conn_process(task->c, h2_worker_get_socket(worker)); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, task->c, + "h2_task(%s): processing done", task->id); } else { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, &env.c, - APLOGNO(02957) "h2_task(%s): error setting up h2_task_env", - env.id); + ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, task->c, + APLOGNO(02957) "h2_task(%s): error setting up h2_task", + task->id); } - if (env.input) { - h2_task_input_destroy(env.input); - env.input = NULL; + if (task->input) { + h2_task_input_destroy(task->input); + task->input = NULL; } - if (env.output) { - h2_task_output_close(env.output); - h2_task_output_destroy(env.output); - env.output = NULL; + if (task->output) { + h2_task_output_close(task->output); + h2_task_output_destroy(task->output); + task->output = NULL; } - h2_task_set_finished(task); - if (env.io) { - apr_thread_cond_signal(env.io); + if (task->io) { + apr_thread_cond_signal(task->io); } - if (env.pool) { - apr_pool_destroy(env.pool); - env.pool = NULL; + if (task->pool) { + apr_pool_destroy(task->pool); + task->pool = NULL; } - if (env.c.id) { - h2_conn_post(&env.c, worker); + if (task->c->id) { + h2_conn_post(task->c, worker); } - h2_mplx_task_done(env.mplx, env.stream_id); + h2_mplx_task_done(task->mplx, task->stream_id); return status; } -int h2_task_has_started(h2_task *task) -{ - AP_DEBUG_ASSERT(task); - return apr_atomic_read32(&task->has_started); -} - -void h2_task_set_started(h2_task *task) -{ - AP_DEBUG_ASSERT(task); - apr_atomic_set32(&task->has_started, 1); -} - -int h2_task_has_finished(h2_task *task) -{ - return apr_atomic_read32(&task->has_finished); -} - -void h2_task_set_finished(h2_task *task) -{ - apr_atomic_set32(&task->has_finished, 1); -} - -void h2_task_die(h2_task_env *env, int status, request_rec *r) -{ - (void)env; - ap_die(status, r); -} - -static request_rec *h2_task_create_request(h2_task_env *env) +static request_rec *h2_task_create_request(h2_task *task) { - conn_rec *conn = &env->c; + conn_rec *conn = task->c; request_rec *r; apr_pool_t *p; int access_status = HTTP_OK; @@ -332,7 +286,7 @@ static request_rec *h2_task_create_request(h2_task_env *env) r->allowed_methods = ap_make_method_list(p, 2); - r->headers_in = apr_table_copy(r->pool, env->headers); + r->headers_in = apr_table_copy(r->pool, task->headers); r->trailers_in = apr_table_make(r->pool, 5); r->subprocess_env = apr_table_make(r->pool, 25); r->headers_out = apr_table_make(r->pool, 12); @@ -371,19 +325,19 @@ static request_rec *h2_task_create_request(h2_task_env *env) /* Time to populate r with the data we have. */ r->request_time = apr_time_now(); - r->method = env->method; + r->method = task->method; /* Provide quick information about the request method as soon as known */ r->method_number = ap_method_number_of(r->method); if (r->method_number == M_GET && r->method[0] == 'H') { r->header_only = 1; } - ap_parse_uri(r, env->path); + ap_parse_uri(r, task->path); r->protocol = (char*)"HTTP/2"; r->proto_num = HTTP_VERSION(2, 0); r->the_request = apr_psprintf(r->pool, "%s %s %s", - r->method, env->path, r->protocol); + r->method, task->path, r->protocol); /* update what we think the virtual host is based on the headers we've * now read. may update status. @@ -410,7 +364,7 @@ static request_rec *h2_task_create_request(h2_task_env *env) /* Request check post hooks failed. An example of this would be a * request for a vhost where h2 is disabled --> 421. */ - h2_task_die(env, access_status, r); + ap_die(access_status, r); ap_update_child_status(conn->sbh, SERVER_BUSY_LOG, r); ap_run_log_transaction(r); r = NULL; @@ -427,13 +381,13 @@ traceout: } -apr_status_t h2_task_process_request(h2_task_env *env) +static apr_status_t h2_task_process_request(h2_task *task) { - conn_rec *c = &env->c; + conn_rec *c = task->c; request_rec *r; conn_state_t *cs = c->cs; - r = h2_task_create_request(env); + r = h2_task_create_request(task); if (r && (r->status == HTTP_OK)) { ap_update_child_status(c->sbh, SERVER_BUSY_READ, r); diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index 4f914c4543..1877a3920b 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -49,9 +49,6 @@ struct h2_task { int stream_id; struct h2_mplx *mplx; - volatile apr_uint32_t has_started; - volatile apr_uint32_t has_finished; - const char *method; const char *scheme; const char *authority; @@ -59,29 +56,12 @@ struct h2_task { apr_table_t *headers; int input_eos; - struct conn_rec *c; -}; + int serialize_headers; -typedef struct h2_task_env h2_task_env; + struct conn_rec *c; -struct h2_task_env { - const char *id; - int stream_id; - struct h2_mplx *mplx; - apr_pool_t *pool; /* pool for task lifetime things */ apr_bucket_alloc_t *bucket_alloc; - - const char *method; - const char *scheme; - const char *authority; - const char *path; - apr_table_t *headers; - int input_eos; - - int serialize_headers; - - struct conn_rec c; struct h2_task_input *input; struct h2_task_output *output; @@ -103,14 +83,7 @@ void h2_task_set_request(h2_task *task, apr_status_t h2_task_do(h2_task *task, struct h2_worker *worker); -apr_status_t h2_task_process_request(h2_task_env *env); - -int h2_task_has_started(h2_task *task); -void h2_task_set_started(h2_task *task); -int h2_task_has_finished(h2_task *task); -void h2_task_set_finished(h2_task *task); void h2_task_register_hooks(void); -void h2_task_die(h2_task_env *env, int status, request_rec *r); #endif /* defined(__mod_h2__h2_task__) */ diff --git a/modules/http2/h2_task_input.c b/modules/http2/h2_task_input.c index cd6392e785..1eac749f42 100644 --- a/modules/http2/h2_task_input.c +++ b/modules/http2/h2_task_input.c @@ -42,28 +42,28 @@ static int ser_header(void *ctx, const char *name, const char *value) return 1; } -h2_task_input *h2_task_input_create(h2_task_env *env, apr_pool_t *pool, +h2_task_input *h2_task_input_create(h2_task *task, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc) { h2_task_input *input = apr_pcalloc(pool, sizeof(h2_task_input)); if (input) { - input->env = env; + input->task = task; input->bb = NULL; - if (env->serialize_headers) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, &env->c, + if (task->serialize_headers) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, "h2_task_input(%s): serialize request %s %s", - env->id, env->method, env->path); + task->id, task->method, task->path); input->bb = apr_brigade_create(pool, bucket_alloc); apr_brigade_printf(input->bb, NULL, NULL, "%s %s HTTP/1.1\r\n", - env->method, env->path); - apr_table_do(ser_header, input, env->headers, NULL); + task->method, task->path); + apr_table_do(ser_header, input, task->headers, NULL); apr_brigade_puts(input->bb, NULL, NULL, "\r\n"); - if (input->env->input_eos) { + if (input->task->input_eos) { APR_BRIGADE_INSERT_TAIL(input->bb, apr_bucket_eos_create(bucket_alloc)); } } - else if (!input->env->input_eos) { + else if (!input->task->input_eos) { input->bb = apr_brigade_create(pool, bucket_alloc); } else { @@ -71,7 +71,7 @@ h2_task_input *h2_task_input_create(h2_task_env *env, apr_pool_t *pool, * create a bucket brigade. */ } - if (APLOGcdebug(&env->c)) { + if (APLOGcdebug(task->c)) { char buffer[1024]; apr_size_t len = sizeof(buffer)-1; if (input->bb) { @@ -81,9 +81,9 @@ h2_task_input *h2_task_input_create(h2_task_env *env, apr_pool_t *pool, len = 0; } buffer[len] = 0; - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, &env->c, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, "h2_task_input(%s): request is: %s", - env->id, buffer); + task->id, buffer); } } return input; @@ -106,12 +106,12 @@ apr_status_t h2_task_input_read(h2_task_input *input, ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, "h2_task_input(%s): read, block=%d, mode=%d, readbytes=%ld", - input->env->id, block, mode, (long)readbytes); + input->task->id, block, mode, (long)readbytes); if (is_aborted(f)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, "h2_task_input(%s): is aborted", - input->env->id); + input->task->id); return APR_ECONNABORTED; } @@ -124,12 +124,12 @@ apr_status_t h2_task_input_read(h2_task_input *input, if (status != APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, f->c, APLOGNO(02958) "h2_task_input(%s): brigade length fail", - input->env->id); + input->task->id); return status; } } - if ((bblen == 0) && input->env->input_eos) { + if ((bblen == 0) && input->task->input_eos) { return APR_EOF; } @@ -139,19 +139,19 @@ apr_status_t h2_task_input_read(h2_task_input *input, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_task_input(%s): get more data from mplx, block=%d, " "readbytes=%ld, queued=%ld", - input->env->id, block, + input->task->id, block, (long)readbytes, (long)bblen); /* Although we sometimes get called with APR_NONBLOCK_READs, we seem to fill our buffer blocking. Otherwise we get EAGAIN, return that to our caller and everyone throws up their hands, never calling us again. */ - status = h2_mplx_in_read(input->env->mplx, APR_BLOCK_READ, - input->env->stream_id, input->bb, - input->env->io); + status = h2_mplx_in_read(input->task->mplx, APR_BLOCK_READ, + input->task->stream_id, input->bb, + input->task->io); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_task_input(%s): mplx in read returned", - input->env->id); + input->task->id); if (status != APR_SUCCESS) { return status; } @@ -164,13 +164,13 @@ apr_status_t h2_task_input_read(h2_task_input *input, } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_task_input(%s): mplx in read, %ld bytes in brigade", - input->env->id, (long)bblen); + input->task->id, (long)bblen); } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_task_input(%s): read, mode=%d, block=%d, " "readbytes=%ld, queued=%ld", - input->env->id, mode, block, + input->task->id, mode, block, (long)readbytes, (long)bblen); if (!APR_BRIGADE_EMPTY(input->bb)) { @@ -199,7 +199,7 @@ apr_status_t h2_task_input_read(h2_task_input *input, buffer[len] = 0; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_task_input(%s): getline: %s", - input->env->id, buffer); + input->task->id, buffer); } return status; } diff --git a/modules/http2/h2_task_input.h b/modules/http2/h2_task_input.h index 32adc1770d..ed0a99faba 100644 --- a/modules/http2/h2_task_input.h +++ b/modules/http2/h2_task_input.h @@ -22,16 +22,16 @@ */ struct apr_thread_cond_t; struct h2_mplx; -struct h2_task_env; +struct h2_task; typedef struct h2_task_input h2_task_input; struct h2_task_input { - struct h2_task_env *env; + struct h2_task *task; apr_bucket_brigade *bb; }; -h2_task_input *h2_task_input_create(struct h2_task_env *env, apr_pool_t *pool, +h2_task_input *h2_task_input_create(struct h2_task *task, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc); void h2_task_input_destroy(h2_task_input *input); diff --git a/modules/http2/h2_task_output.c b/modules/http2/h2_task_output.c index 879cb5fa21..053e2d69e4 100644 --- a/modules/http2/h2_task_output.c +++ b/modules/http2/h2_task_output.c @@ -33,16 +33,16 @@ #include "h2_util.h" -h2_task_output *h2_task_output_create(h2_task_env *env, apr_pool_t *pool, +h2_task_output *h2_task_output_create(h2_task *task, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc) { h2_task_output *output = apr_pcalloc(pool, sizeof(h2_task_output)); (void)bucket_alloc; if (output) { - output->env = env; + output->task = task; output->state = H2_TASK_OUT_INIT; - output->from_h1 = h2_from_h1_create(env->stream_id, pool); + output->from_h1 = h2_from_h1_create(task->stream_id, pool); if (!output->from_h1) { return NULL; } @@ -73,18 +73,18 @@ static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f, ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, "h2_task_output(%s): write without response " "for %s %s %s", - output->env->id, output->env->method, - output->env->authority, output->env->path); + output->task->id, output->task->method, + output->task->authority, output->task->path); f->c->aborted = 1; } - if (output->env->io) { - apr_thread_cond_broadcast(output->env->io); + if (output->task->io) { + apr_thread_cond_broadcast(output->task->io); } return APR_ECONNABORTED; } - return h2_mplx_out_open(output->env->mplx, output->env->stream_id, - response, f, bb, output->env->io); + return h2_mplx_out_open(output->task->mplx, output->task->stream_id, + response, f, bb, output->task->io); } return APR_EOF; } @@ -93,7 +93,7 @@ void h2_task_output_close(h2_task_output *output) { open_if_needed(output, NULL, NULL); if (output->state != H2_TASK_OUT_DONE) { - h2_mplx_out_close(output->env->mplx, output->env->stream_id); + h2_mplx_out_close(output->task->mplx, output->task->stream_id); output->state = H2_TASK_OUT_DONE; } } @@ -113,7 +113,7 @@ apr_status_t h2_task_output_write(h2_task_output *output, apr_status_t status; if (APR_BRIGADE_EMPTY(bb)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_task_output(%s): empty write", output->env->id); + "h2_task_output(%s): empty write", output->task->id); return APR_SUCCESS; } @@ -121,12 +121,12 @@ apr_status_t h2_task_output_write(h2_task_output *output, if (status != APR_EOF) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_task_output(%s): opened and passed brigade", - output->env->id); + output->task->id); return status; } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_task_output(%s): write brigade", output->env->id); - return h2_mplx_out_write(output->env->mplx, output->env->stream_id, - f, bb, output->env->io); + "h2_task_output(%s): write brigade", output->task->id); + return h2_mplx_out_write(output->task->mplx, output->task->stream_id, + f, bb, output->task->io); } diff --git a/modules/http2/h2_task_output.h b/modules/http2/h2_task_output.h index 86571a1e10..79cb6816c7 100644 --- a/modules/http2/h2_task_output.h +++ b/modules/http2/h2_task_output.h @@ -23,7 +23,7 @@ */ struct apr_thread_cond_t; struct h2_mplx; -struct h2_task_env; +struct h2_task; struct h2_from_h1; typedef enum { @@ -35,12 +35,12 @@ typedef enum { typedef struct h2_task_output h2_task_output; struct h2_task_output { - struct h2_task_env *env; + struct h2_task *task; h2_task_output_state_t state; struct h2_from_h1 *from_h1; }; -h2_task_output *h2_task_output_create(struct h2_task_env *env, apr_pool_t *pool, +h2_task_output *h2_task_output_create(struct h2_task *task, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc); void h2_task_output_destroy(h2_task_output *output); diff --git a/modules/http2/h2_to_h1.c b/modules/http2/h2_to_h1.c index 8dacfe801f..159fde31dd 100644 --- a/modules/http2/h2_to_h1.c +++ b/modules/http2/h2_to_h1.c @@ -156,7 +156,7 @@ apr_status_t h2_to_h1_add_headers(h2_to_h1 *to_h1, apr_table_t *headers) return APR_SUCCESS; } -apr_status_t h2_to_h1_end_headers(h2_to_h1 *to_h1, h2_task *task, int eos) +apr_status_t h2_to_h1_end_headers(h2_to_h1 *to_h1, int eos) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, to_h1->m->c, "h2_to_h1(%ld-%d): end headers", @@ -189,23 +189,8 @@ apr_status_t h2_to_h1_end_headers(h2_to_h1 *to_h1, h2_task *task, int eos) apr_table_mergen(to_h1->headers, "Transfer-Encoding", "chunked"); } - h2_task_set_request(task, to_h1->method, - to_h1->scheme, - to_h1->authority, - to_h1->path, - to_h1->headers, eos); to_h1->eoh = 1; - if (eos) { - apr_status_t status = h2_to_h1_close(to_h1); - if (status != APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, to_h1->m->c, - APLOGNO(02960) - "h2_to_h1(%ld-%d): end headers, eos=%d", - to_h1->m->id, to_h1->stream_id, eos); - } - return status; - } return APR_SUCCESS; } diff --git a/modules/http2/h2_to_h1.h b/modules/http2/h2_to_h1.h index 74586e2b7e..6fc06fbf87 100644 --- a/modules/http2/h2_to_h1.h +++ b/modules/http2/h2_to_h1.h @@ -68,8 +68,7 @@ apr_status_t h2_to_h1_add_headers(h2_to_h1 *to_h1, apr_table_t *headers); /** End the request headers. */ -apr_status_t h2_to_h1_end_headers(h2_to_h1 *to_h1, - struct h2_task *task, int eos); +apr_status_t h2_to_h1_end_headers(h2_to_h1 *to_h1, int eos); /* Add request body data. */ diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 0b4a6f446e..2713aeb2da 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -758,20 +758,20 @@ AP_DECLARE(apr_status_t) h2_transfer_brigade(apr_bucket_brigade *to, } rv = apr_bucket_setaside(e, p); - + /* If the bucket type does not implement setaside, then * (hopefully) morph it into a bucket type which does, and set * *that* aside... */ if (rv == APR_ENOTIMPL) { const char *s; apr_size_t n; - + rv = apr_bucket_read(e, &s, &n, APR_BLOCK_READ); if (rv == APR_SUCCESS) { rv = apr_bucket_setaside(e, p); } } - + if (rv != APR_SUCCESS) { /* Return an error but still save the brigade if * ->setaside() is really not implemented. */ -- 2.50.0