-*- coding: utf-8 -*-
Changes with Apache 2.5.0
+ *) mod_http2: stream timeouts now change to vhost values once the request
+ is parsed and processing starts. Initial values are taken from base
+ server or SNI host as before. [Stefan Eissing]
+
*) mod_proxy_http2: fixed retry behaviour when frontend connection uses
http/1.1. [Stefan Eissing]
apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool,
int id, const char *tag,
h2_beam_owner_t owner,
- apr_size_t max_buf_size)
+ apr_size_t max_buf_size,
+ apr_interval_time_t timeout)
{
h2_bucket_beam *beam;
apr_status_t status = APR_SUCCESS;
H2_BPROXY_LIST_INIT(&beam->proxies);
beam->tx_mem_limits = 1;
beam->max_buf_size = max_buf_size;
+ beam->timeout = timeout;
status = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT,
pool);
bb->p, bb->bucket_alloc);
}
}
+ else if (bsender->length == 0) {
+ APR_BUCKET_REMOVE(bsender);
+ H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
+ continue;
+ }
else if (APR_BUCKET_IS_FILE(bsender)) {
/* This is set aside into the target brigade pool so that
* any read operation messes with that pool and not
* the pool owner is using this beam for sending or receiving
* @param buffer_size maximum memory footprint of buckets buffered in beam, or
* 0 for no limitation
+ * @param timeout timeout for blocking operations
*/
apr_status_t h2_beam_create(h2_bucket_beam **pbeam,
apr_pool_t *pool,
int id, const char *tag,
h2_beam_owner_t owner,
- apr_size_t buffer_size);
+ apr_size_t buffer_size,
+ apr_interval_time_t timeout);
/**
* Destroys the beam immediately without cleanup.
}
}
+static void check_data_for(h2_mplx *m, int stream_id);
+
static void stream_output_consumed(void *ctx,
h2_bucket_beam *beam, apr_off_t length)
{
static void stream_input_ev(void *ctx, h2_bucket_beam *beam)
{
h2_mplx *m = ctx;
- apr_atomic_set32(&m->event_pending, 1);
+ apr_atomic_set32(&m->event_pending, 1);
}
static void stream_input_consumed(void *ctx,
return 0;
}
-static void have_out_data_for(h2_mplx *m, h2_stream *stream);
-
static void check_tx_reservation(h2_mplx *m)
{
if (m->tx_handles_reserved <= 0) {
*/
h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
const h2_config *conf,
- apr_interval_time_t stream_timeout,
h2_workers *workers)
{
apr_status_t status = APR_SUCCESS;
m->q = h2_iq_create(m->pool, m->max_streams);
m->readyq = h2_iq_create(m->pool, m->max_streams);
- m->stream_timeout = stream_timeout;
m->workers = workers;
m->workers_max = workers->max_workers;
m->workers_limit = 6; /* the original h1 max parallel connections */
static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
h2_mplx *m = ctx;
- apr_status_t status;
- h2_stream *stream;
int acquired;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- stream = h2_ihash_get(m->streams, beam->id);
- if (stream) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_mplx(%s): output_produced", stream->task->id);
- have_out_data_for(m, stream);
- }
+ if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id);
+ check_data_for(m, beam->id);
leave_mutex(m, acquired);
}
}
/* we might see some file buckets in the output, see
* if we have enough handles reserved. */
check_tx_reservation(m);
- have_out_data_for(m, stream);
+ check_data_for(m, stream->id);
return status;
}
status = h2_beam_close(task->output.beam);
h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close");
output_consumed_signal(m, task);
- have_out_data_for(m, stream);
+ check_data_for(m, task->stream_id);
return status;
}
return status;
}
-static void have_out_data_for(h2_mplx *m, h2_stream *stream)
+static void check_data_for(h2_mplx *m, int stream_id)
{
ap_assert(m);
- ap_assert(stream);
- h2_iq_append(m->readyq, stream->id);
+ h2_iq_append(m->readyq, stream_id);
apr_atomic_set32(&m->event_pending, 1);
if (m->added_output) {
apr_thread_cond_signal(m->added_output);
h2_ihash_add(m->streams, stream);
if (h2_stream_is_ready(stream)) {
/* already have a response */
- apr_atomic_set32(&m->event_pending, 1);
- h2_iq_append(m->readyq, stream->id);
+ check_data_for(m, stream->id);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
H2_STRM_MSG(stream, "process, add to readyq"));
}
}
if (stream->input) {
- h2_beam_timeout_set(stream->input, m->stream_timeout);
h2_beam_on_consumed(stream->input, stream_input_ev,
stream_input_consumed, m);
h2_beam_on_file_beam(stream->input, can_beam_file, m);
}
h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
- h2_beam_timeout_set(stream->output, m->stream_timeout);
}
stream->task->worker_started = 1;
stream->task->started_at = apr_time_now();
h2_beam_leave(stream->input);
}
h2_beam_mutex_disable(stream->output);
- have_out_data_for(m, stream);
+ check_data_for(m, stream->id);
}
else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
stream_iter_ctx *ctx = data;
h2_stream *stream = val;
if (stream->task && !stream->task->worker_done
- && (ctx->now - stream->task->started_at) > ctx->m->stream_timeout) {
+ && (ctx->now - stream->task->started_at) > stream->task->timeout) {
/* timed out stream occupying a worker, found */
ctx->stream = stream;
return 0;
h2_stream *stream;
size_t i, n;
- if (!h2_mplx_has_master_events(m)) {
- return APR_EAGAIN;
- }
-
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): dispatch events", m->id);
int acquired;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_stream *s = h2_ihash_get(m->streams, stream_id);
- if (s) {
- h2_iq_append(m->readyq, stream_id);
- apr_atomic_set32(&m->event_pending, 1);
- }
+ check_data_for(m, stream_id);
leave_mutex(m, acquired);
}
return status;
struct apr_thread_cond_t *join_wait;
apr_size_t stream_max_mem;
- apr_interval_time_t stream_timeout;
apr_pool_t *spare_io_pool;
apr_array_header_t *spare_slaves; /* spare slave connections */
*/
h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *master,
const struct h2_config *conf,
- apr_interval_time_t stream_timeout,
struct h2_workers *workers);
/**
#include "h2_private.h"
#include "h2.h"
+#include "h2_bucket_beam.h"
#include "h2_bucket_eos.h"
#include "h2_config.h"
#include "h2_ctx.h"
static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
{
h2_session *session = (h2_session*)ctx;
- nghttp2_session_consume(session->ngh2, stream_id, bytes_read);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): consumed %ld bytes",
- session->id, stream_id, (long)bytes_read);
+ while (bytes_read > 0) {
+ int len = (bytes_read > INT_MAX)? INT_MAX : bytes_read;
+ nghttp2_session_consume(session->ngh2, stream_id, (int)bytes_read);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): consumed %d bytes",
+ session->id, stream_id, len);
+ bytes_read -= len;
+ }
}
static apr_status_t h2_session_receive(void *ctx,
session->monitor->on_state_event = on_stream_state_event;
session->mplx = h2_mplx_create(c, session->pool, session->config,
- session->s->timeout, workers);
+ workers);
h2_mplx_set_consumed_cb(session->mplx, update_window, session);
ap_assert(stream);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
H2_STRM_MSG(stream, "on_resume"));
-
+
send_headers:
headers = NULL;
status = h2_stream_out_prepare(stream, &len, &eos, &headers);
static apr_status_t setup_input(h2_stream *stream) {
if (stream->input == NULL && !stream->input_eof) {
h2_beam_create(&stream->input, stream->pool, stream->id,
- "input", H2_BEAM_OWNER_SEND, 0);
+ "input", H2_BEAM_OWNER_SEND, 0,
+ stream->session->s->timeout);
h2_beam_send_from(stream->input, stream->pool);
}
return APR_SUCCESS;
stream->monitor = monitor;
stream->max_mem = session->max_stream_mem;
- h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0);
+ h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0,
+ session->s->timeout);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
H2_STRM_LOG(APLOGNO(03082), stream, "created"));
task->request = stream->request;
task->input.beam = stream->input;
task->output.beam = stream->output;
+ task->timeout = stream->session->s->timeout;
h2_beam_send_from(stream->output, task->pool);
h2_ctx_create_for(slave, task);
"h2_task(%s): create request_rec", task->id);
r = h2_request_create_rec(req, c);
if (r && (r->status == HTTP_OK)) {
+ /* set timeouts for virtual host of request */
+ if (task->timeout != r->server->timeout) {
+ task->timeout = r->server->timeout;
+ h2_beam_timeout_set(task->output.beam, task->timeout);
+ if (task->input.beam) {
+ h2_beam_timeout_set(task->input.beam, task->timeout);
+ }
+ }
+
ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, r);
if (cs) {
apr_pool_t *pool;
const struct h2_request *request;
+ apr_interval_time_t timeout;
int rst_error; /* h2 related stream abort error */
struct {