-*- coding: utf-8 -*-
Changes with Apache 2.5.0
+ *) mod_http2: merge of some 2.4.x adaptions re filters on slave connections.
+ Small fixes in bucket beams when forwarding file buckets. Output handling
+ on master connection uses less FLUSH and passes automatically when more
+ than half of H2StreamMaxMemSize bytes have accumulated.
+ Workaround for http: when forwarding partial file buckets to keep the
+ output filter from closing these too early.
+
*) mod_http2: elimination of fixed master connectin buffer for TLS
connections. New scratch bucket handling optimized for TLS write sizes.
File bucket data read directly into scratch buffers, avoiding one
static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
- if (beam->m_enter) {
- return beam->m_enter(beam->m_ctx, pbl);
+ h2_beam_mutex_enter *enter = beam->m_enter;
+ if (enter) {
+ void *ctx = beam->m_ctx;
+ if (ctx) {
+ return enter(ctx, pbl);
+ }
}
pbl->mutex = NULL;
pbl->leave = NULL;
#endif
remain -= bred->length;
++transferred;
+ APR_BUCKET_REMOVE(bred);
+ H2_BLIST_INSERT_TAIL(&beam->hold, bred);
+ ++transferred;
+ continue;
}
else {
/* create a "green" standin bucket. we took care about the
* which seems to create less TCP packets overall
*/
#define WRITE_SIZE_MAX (TLS_DATA_MAX - 100)
-#define WRITE_BUFFER_SIZE (5*WRITE_SIZE_MAX)
static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level,
io->output = apr_brigade_create(c->pool, c->bucket_alloc);
io->is_tls = h2_h2_is_tls(c);
io->buffer_output = io->is_tls;
+ io->pass_threshold = h2_config_geti64(cfg, H2_CONF_STREAM_MAX_MEM) / 2;
if (io->is_tls) {
/* This is what we start with,
return status;
}
-int h2_conn_io_is_buffered(h2_conn_io *io)
-{
- return io->buffer_output;
-}
-
-typedef struct {
- conn_rec *c;
- h2_conn_io *io;
-} pass_out_ctx;
-
-static apr_status_t pass_out(apr_bucket_brigade *bb, void *ctx)
-{
- pass_out_ctx *pctx = ctx;
- conn_rec *c = pctx->c;
- apr_status_t status;
- apr_off_t bblen;
-
- if (APR_BRIGADE_EMPTY(bb)) {
- return APR_SUCCESS;
- }
-
- ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, NULL);
- apr_brigade_length(bb, 0, &bblen);
- h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", bb);
- status = ap_pass_brigade(c->output_filters, bb);
- if (status == APR_SUCCESS && pctx->io) {
- pctx->io->bytes_written += (apr_size_t)bblen;
- pctx->io->last_write = apr_time_now();
- }
- if (status != APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044)
- "h2_conn_io(%ld): pass_out brigade %ld bytes",
- c->id, (long)bblen);
- }
- apr_brigade_cleanup(bb);
- return status;
-}
-
static void check_write_size(h2_conn_io *io)
{
if (io->write_size > WRITE_SIZE_INITIAL
}
}
-static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
+static apr_status_t pass_output(h2_conn_io *io, int flush, int eoc)
{
- pass_out_ctx ctx;
+ conn_rec *c = io->c;
apr_bucket *b;
+ apr_off_t bblen;
+ apr_status_t status;
append_scratch(io);
- if (APR_BRIGADE_EMPTY(io->output)) {
- return APR_SUCCESS;
- }
-
if (flush) {
- b = apr_bucket_flush_create(io->c->bucket_alloc);
+ b = apr_bucket_flush_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(io->output, b);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush");
- ctx.c = io->c;
- ctx.io = eoc? NULL : io;
+ if (APR_BRIGADE_EMPTY(io->output)) {
+ return APR_SUCCESS;
+ }
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, c, "h2_conn_io: pass_output");
+ ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, NULL);
+ apr_brigade_length(io->output, 0, &bblen);
- return pass_out(io->output, &ctx);
- /* no more access after this, as we might have flushed an EOC bucket
+ h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", io->output);
+ status = ap_pass_brigade(c->output_filters, io->output);
+
+ /* careful with access after this, as we might have flushed an EOC bucket
* that de-allocated us all. */
+ if (!eoc) {
+ apr_brigade_cleanup(io->output);
+ if (status == APR_SUCCESS) {
+ io->bytes_written += (apr_size_t)bblen;
+ io->last_write = apr_time_now();
+ }
+ }
+
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044)
+ "h2_conn_io(%ld): pass_out brigade %ld bytes",
+ c->id, (long)bblen);
+ }
+ return status;
}
apr_status_t h2_conn_io_flush(h2_conn_io *io)
{
- return h2_conn_io_flush_int(io, 1, 0);
-}
-
-apr_status_t h2_conn_io_consider_pass(h2_conn_io *io)
-{
- apr_off_t len = 0;
-
- if (!APR_BRIGADE_EMPTY(io->output)) {
- len = h2_brigade_mem_size(io->output);
- if (len >= WRITE_BUFFER_SIZE) {
- return h2_conn_io_flush_int(io, 1, 0);
- }
- }
- return APR_SUCCESS;
+ return pass_output(io, 1, 0);
}
apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, h2_session *session)
{
apr_bucket *b = h2_bucket_eoc_create(io->c->bucket_alloc, session);
APR_BRIGADE_INSERT_TAIL(io->output, b);
- return h2_conn_io_flush_int(io, 1, 1);
+ return pass_output(io, 1, 1);
}
apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length)
append_scratch(io);
APR_BUCKET_REMOVE(b);
APR_BRIGADE_INSERT_TAIL(io->output, b);
-
- if (APR_BUCKET_IS_FLUSH(b)) {
- status = h2_conn_io_flush_int(io, 0, 0);
- }
}
else if (io->buffer_output) {
apr_size_t remain = assure_scratch_space(io);
APR_BRIGADE_INSERT_TAIL(io->output, b);
}
}
+
if (status == APR_SUCCESS) {
- return h2_conn_io_consider_pass(io);
+ if (!APR_BRIGADE_EMPTY(io->output)) {
+ apr_off_t len = h2_brigade_mem_size(io->output);
+ if (len >= io->pass_threshold) {
+ return pass_output(io, 0, 0);
+ }
+ }
}
return status;
}
apr_int64_t bytes_written;
int buffer_output;
+ apr_size_t pass_threshold;
+
char *scratch;
apr_size_t ssize;
apr_size_t slen;
apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
const struct h2_config *cfg);
-int h2_conn_io_is_buffered(h2_conn_io *io);
-
/**
* Append data to the buffered output.
* @param buf the data to append
*/
apr_status_t h2_conn_io_flush(h2_conn_io *io);
-/**
- * Check the amount of buffered output and pass it on if enough has accumulated.
- * @param io the connection io
- * @param flush if a flush bucket should be appended to any output
- */
-apr_status_t h2_conn_io_consider_pass(h2_conn_io *io);
-
#endif /* defined(__mod_h2__h2_conn_io__) */
/*******************************************************************************
* The optional mod_ssl functions we need.
*/
+static APR_OPTIONAL_FN_TYPE(ssl_engine_disable) *opt_ssl_engine_disable;
static APR_OPTIONAL_FN_TYPE(ssl_is_https) *opt_ssl_is_https;
static APR_OPTIONAL_FN_TYPE(ssl_var_lookup) *opt_ssl_var_lookup;
{
(void)pool;
ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, s, "h2_h2, child_init");
+ opt_ssl_engine_disable = APR_RETRIEVE_OPTIONAL_FN(ssl_engine_disable);
opt_ssl_is_https = APR_RETRIEVE_OPTIONAL_FN(ssl_is_https);
opt_ssl_var_lookup = APR_RETRIEVE_OPTIONAL_FN(ssl_var_lookup);
* This allow recursive entering of the mutex from the saem thread,
* which is what we need in certain situations involving callbacks
*/
+ AP_DEBUG_ASSERT(m);
apr_threadkey_private_get(&mutex, thread_lock);
if (mutex == m->lock) {
*pacquired = 0;
int reuse_slave = 0;
apr_status_t status;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_task(%s): destroy", task->id);
/* cleanup any buffered input */
status = h2_task_shutdown(task, 0);
if (status != APR_SUCCESS){
{
h2_task *task;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_stream(%ld-%d): done", m->c->id, stream->id);
/* Situation: we are, on the master connection, done with processing
* the stream. Either we have handled it successfully, or the stream
* was reset by the client or the connection is gone and we are
stream = h2_ihash_get(m->streams, sid);
if (stream) {
conn_rec *slave, **pslave;
+ int new_conn = 0;
pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
if (pslave) {
}
else {
slave = h2_slave_create(m->c, m->pool, NULL);
- h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
+ new_conn = 1;
}
slave->sbh = m->c->sbh;
+ slave->aborted = 0;
task = h2_task_create(slave, stream->request, stream->input, m);
h2_ihash_add(m->tasks, task);
m->c->keepalives++;
apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
-
+ if (new_conn) {
+ h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
+ }
task->worker_started = 1;
task->started_at = apr_time_now();
if (sid > m->max_stream_started) {
#include <nghttp2/nghttp2.h>
-#ifdef IS_MOD_PROXY_HTTP2
-extern module AP_MODULE_DECLARE_DATA proxy_http2_module;
-APLOG_USE_MODULE(proxy_http2);
-#else
extern module AP_MODULE_DECLARE_DATA http2_module;
+
APLOG_USE_MODULE(http2);
-#endif
#endif
apr_brigade_cleanup(session->bbtmp);
if (status == APR_SUCCESS) {
stream->data_frames_sent++;
- h2_conn_io_consider_pass(&session->io);
return 0;
}
else {
* CPU cycles. Ideally, we'd like to do a blocking read, but that
* is not possible if we have scheduled tasks and wait
* for them to produce something. */
- if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
- dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
- }
if (!session->open_streams) {
if (!is_accepting_streams(session)) {
/* We are no longer accepting new streams and have
* new output data from task processing,
* switch to blocking reads. We are probably waiting on
* window updates. */
+ if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ return;
+ }
transit(session, "no io", H2_SESSION_ST_IDLE);
session->idle_until = apr_time_now() + session->s->timeout;
session->keep_sync_until = session->idle_until;
session->start_wait = apr_time_now();
if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ break;
}
}
else if ((apr_time_now() - session->start_wait) >= session->s->timeout) {
session->wait_us = 0;
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
- else if (status == APR_TIMEUP) {
+ else if (APR_STATUS_IS_TIMEUP(status)) {
/* go back to checking all inputs again */
transit(session, "wait cycle", session->local.accepting?
H2_SESSION_ST_BUSY : H2_SESSION_ST_LOCAL_SHUTDOWN);
}
+ else if (APR_STATUS_IS_ECONNRESET(status)
+ || APR_STATUS_IS_ECONNABORTED(status)) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ }
else {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
"h2_session(%ld): waiting on conditional",
ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
"h2_session(%ld): [%s] process returns",
session->id, state_name(session->state));
-
+
if ((session->state != H2_SESSION_ST_DONE)
&& (APR_STATUS_IS_EOF(status)
|| APR_STATUS_IS_ECONNRESET(status)
}
}
+static apr_status_t stream_pool_cleanup(void *ctx)
+{
+ h2_stream *stream = ctx;
+ apr_status_t status;
+
+ if (stream->input) {
+ h2_beam_destroy(stream->input);
+ stream->input = NULL;
+ }
+ if (stream->files) {
+ apr_file_t *file;
+ int i;
+ for (i = 0; i < stream->files->nelts; ++i) {
+ file = APR_ARRAY_IDX(stream->files, i, apr_file_t*);
+ status = apr_file_close(file);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, stream->session->c,
+ "h2_stream(%ld-%d): destroy, closed file %d",
+ stream->session->id, stream->id, i);
+ }
+ stream->files = NULL;
+ }
+ return APR_SUCCESS;
+}
+
h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
int initiated_on, const h2_request *creq)
{
}
stream->request = req;
+ apr_pool_cleanup_register(pool, stream, stream_pool_cleanup,
+ apr_pool_cleanup_null);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03082)
"h2_stream(%ld-%d): opened", session->id, stream->id);
return stream;
void h2_stream_destroy(h2_stream *stream)
{
AP_DEBUG_ASSERT(stream);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c,
"h2_stream(%ld-%d): destroy",
stream->session->id, stream->id);
- if (stream->input) {
- h2_beam_destroy(stream->input);
- stream->input = NULL;
- }
if (stream->pool) {
apr_pool_destroy(stream->pool);
}
static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount)
{
+ conn_rec *c = stream->session->c;
+ apr_bucket *b;
+ apr_status_t status;
+
if (!stream->output) {
return APR_EOF;
}
- return h2_beam_receive(stream->output, stream->buffer,
- APR_NONBLOCK_READ, amount);
+ status = h2_beam_receive(stream->output, stream->buffer,
+ APR_NONBLOCK_READ, amount);
+ /* The buckets we reveive are using the stream->buffer pool as
+ * lifetime which is exactly what we want since this is stream->pool.
+ *
+ * However: when we send these buckets down the core output filters, the
+ * filter might decide to setaside them into a pool of its own. And it
+ * might decide, after having sent the buckets, to clear its pool.
+ *
+ * This is problematic for file buckets because it then closed the contained
+ * file. Any split off buckets we sent afterwards will result in a
+ * APR_EBADF.
+ */
+ for (b = APR_BRIGADE_FIRST(stream->buffer);
+ b != APR_BRIGADE_SENTINEL(stream->buffer);
+ b = APR_BUCKET_NEXT(b)) {
+ if (APR_BUCKET_IS_FILE(b)) {
+ apr_bucket_file *f = (apr_bucket_file *)b->data;
+ apr_pool_t *fpool = apr_file_pool_get(f->fd);
+ if (fpool != c->pool) {
+ apr_bucket_setaside(b, c->pool);
+ if (!stream->files) {
+ stream->files = apr_array_make(stream->pool,
+ 5, sizeof(apr_file_t*));
+ }
+ APR_ARRAY_PUSH(stream->files, apr_file_t*) = f->fd;
+ }
+ }
+ }
+ return status;
}
apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
struct h2_bucket_beam *output;
apr_bucket_brigade *buffer;
apr_bucket_brigade *tmp;
+ apr_array_header_t *files; /* apr_file_t* we collected during I/O */
int rst_error; /* stream error for RST_STREAM */
unsigned int aborted : 1; /* was aborted */
unsigned int submitted : 1; /* response HEADER has been sent */
apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */
-
apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */
};
apr_read_type_e block,
apr_off_t readbytes)
{
- h2_task *task = filter->ctx;
+ h2_task *task = h2_ctx_cget_task(filter->c);
AP_DEBUG_ASSERT(task);
return input_read(task, filter, brigade, mode, block, readbytes);
}
static apr_status_t h2_filter_stream_output(ap_filter_t* filter,
apr_bucket_brigade* brigade)
{
- h2_task *task = filter->ctx;
+ h2_task *task = h2_ctx_cget_task(filter->c);
AP_DEBUG_ASSERT(task);
return output_write(task, filter, brigade);
}
-static apr_status_t h2_filter_read_response(ap_filter_t* f,
+static apr_status_t h2_filter_read_response(ap_filter_t* filter,
apr_bucket_brigade* bb)
{
- h2_task *task = f->ctx;
+ h2_task *task = h2_ctx_cget_task(filter->c);
AP_DEBUG_ASSERT(task);
if (!task->output.from_h1) {
return APR_ECONNABORTED;
}
- return h2_from_h1_read_response(task->output.from_h1, f, bb);
+ return h2_from_h1_read_response(task->output.from_h1, filter, bb);
}
/*******************************************************************************
if (task->output.beam) {
h2_beam_abort(task->output.beam);
}
+ if (task->c) {
+ task->c->aborted = 1;
+ }
}
apr_status_t h2_task_shutdown(h2_task *task, int block)
/*******************************************************************************
* Register various hooks
*/
+static const char *const mod_ssl[] = { "mod_ssl.c", NULL};
+static int h2_task_pre_conn(conn_rec* c, void *arg);
static int h2_task_process_conn(conn_rec* c);
APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_in) *h2_task_logio_add_bytes_in;
void h2_task_register_hooks(void)
{
+ /* This hook runs on new connections before mod_ssl has a say.
+ * Its purpose is to prevent mod_ssl from touching our pseudo-connections
+ * for streams.
+ */
+ ap_hook_pre_connection(h2_task_pre_conn,
+ NULL, mod_ssl, APR_HOOK_FIRST);
/* When the connection processing actually starts, we might
* take over, if the connection is for a task.
*/
return APR_SUCCESS;
}
+static int h2_task_pre_conn(conn_rec* c, void *arg)
+{
+ h2_ctx *ctx;
+
+ if (!c->master) {
+ return OK;
+ }
+
+ ctx = h2_ctx_get(c, 0);
+ (void)arg;
+ if (h2_ctx_is_task(ctx)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
+ "h2_h2, pre_connection, found stream task");
+
+ /* Add our own, network level in- and output filters.
+ */
+ ap_add_input_filter("H2_TO_H1", NULL, NULL, c);
+ ap_add_output_filter("H1_TO_H2", NULL, NULL, c);
+ }
+ return OK;
+}
+
h2_task *h2_task_create(conn_rec *c, const h2_request *req,
h2_bucket_beam *input, h2_mplx *mplx)
{
apr_thread_cond_create(&task->cond, pool);
h2_ctx_create_for(c, task);
- /* Add our own, network level in- and output filters. */
- ap_add_input_filter("H2_TO_H1", task, NULL, c);
- ap_add_output_filter("H1_TO_H2", task, NULL, c);
-
return task;
}
void h2_task_destroy(h2_task *task)
{
- ap_remove_input_filter_byhandle(task->c->input_filters, "H2_TO_H1");
- ap_remove_output_filter_byhandle(task->c->output_filters, "H1_TO_H2");
if (task->output.beam) {
h2_beam_destroy(task->output.beam);
task->output.beam = NULL;
if (APR_BUCKET_IS_METADATA(e)) {
if (APR_BUCKET_IS_EOS(e)) {
*peos = 1;
+ apr_bucket_delete(e);
+ continue;
}
}
else {
apr_port_t port, def_port;
/* ap_port_of_scheme() */
- if (ap_casecmpstrn(url, "h2c:", 4) == 0) {
+ if (h2_casecmpstrn(url, "h2c:", 4) == 0) {
url += 4;
scheme = "h2c";
http_scheme = "http";
}
- else if (ap_casecmpstrn(url, "h2:", 3) == 0) {
+ else if (h2_casecmpstrn(url, "h2:", 3) == 0) {
url += 3;
scheme = "h2";
http_scheme = "https";