-*- coding: utf-8 -*-
Changes with Apache 2.5.0
+ *) mod_http2: separate mutex instances for each bucket beam, resulting in
+ less lock contention. input beams only created when necessary.
+ [Stefan Eissing]
+
*) mod_syslog: Support use of optional "tag" in syslog entries.
PR 60525. [Ben Rubson <ben.rubson gmail.com>, Jim Jagielski]
* bucket beam that can transport buckets across threads
******************************************************************************/
+static void mutex_leave(void *ctx, apr_thread_mutex_t *lock)
+{
+ apr_thread_mutex_unlock(lock);
+}
+
+static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl)
+{
+ h2_bucket_beam *beam = ctx;
+ pbl->mutex = beam->lock;
+ pbl->leave = mutex_leave;
+ return apr_thread_mutex_lock(pbl->mutex);
+}
+
static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
h2_beam_mutex_enter *enter = beam->m_enter;
}
}
-static int report_consumption(h2_bucket_beam *beam)
+static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
int rv = 0;
- if (beam->cons_io_cb) {
- beam->cons_io_cb(beam->cons_ctx, beam, beam->received_bytes
- - beam->cons_bytes_reported);
+ apr_off_t len = beam->received_bytes - beam->cons_bytes_reported;
+ h2_beam_io_callback *cb = beam->cons_io_cb;
+
+ if (cb) {
+ void *ctx = beam->cons_ctx;
+
+ if (pbl) leave_yellow(beam, pbl);
+ cb(ctx, beam, len);
+ if (pbl) enter_yellow(beam, pbl);
rv = 1;
}
- beam->cons_bytes_reported = beam->received_bytes;
+ beam->cons_bytes_reported += len;
return rv;
}
-static void report_prod_io(h2_bucket_beam *beam, int force)
+static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl)
{
- if (force || beam->prod_bytes_reported != beam->sent_bytes) {
- if (beam->prod_io_cb) {
- beam->prod_io_cb(beam->prod_ctx, beam, beam->sent_bytes
- - beam->prod_bytes_reported);
+ apr_off_t len = beam->sent_bytes - beam->prod_bytes_reported;
+ if (force || len > 0) {
+ h2_beam_io_callback *cb = beam->prod_io_cb;
+ if (cb) {
+ void *ctx = beam->prod_ctx;
+
+ leave_yellow(beam, pbl);
+ cb(ctx, beam, len);
+ enter_yellow(beam, pbl);
}
- beam->prod_bytes_reported = beam->sent_bytes;
+ beam->prod_bytes_reported += len;
}
}
static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock)
{
if (beam->timeout > 0) {
- return apr_thread_cond_timedwait(beam->m_cond, lock, beam->timeout);
+ return apr_thread_cond_timedwait(beam->cond, lock, beam->timeout);
}
else {
- return apr_thread_cond_wait(beam->m_cond, lock);
+ return apr_thread_cond_wait(beam->cond, lock);
}
}
while (!beam->aborted && *premain <= 0
&& (block == APR_BLOCK_READ) && pbl->mutex) {
apr_status_t status;
- report_prod_io(beam, 1);
+ report_prod_io(beam, 1, pbl);
status = wait_cond(beam, pbl->mutex);
if (APR_STATUS_IS_TIMEUP(status)) {
return status;
if (!bl.mutex) {
r_purge_sent(beam);
}
- else if (beam->m_cond) {
- apr_thread_cond_broadcast(beam->m_cond);
+ else if (beam->cond) {
+ apr_thread_cond_broadcast(beam->cond);
}
leave_yellow(beam, &bl);
}
{
if (!beam->closed) {
beam->closed = 1;
- if (beam->m_cond) {
- apr_thread_cond_broadcast(beam->m_cond);
+ if (beam->cond) {
+ apr_thread_cond_broadcast(beam->cond);
}
}
return APR_SUCCESS;
/* sender is going away, clear up all references to its memory */
r_purge_sent(beam);
h2_blist_cleanup(&beam->send_list);
- report_consumption(beam);
+ report_consumption(beam, NULL);
while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
H2_BPROXY_REMOVE(proxy);
H2_BPROXY_LIST_INIT(&beam->proxies);
beam->tx_mem_limits = 1;
beam->max_buf_size = max_buf_size;
- apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);
- *pbeam = beam;
-
+ status = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT,
+ pool);
+ if (status == APR_SUCCESS) {
+ status = apr_thread_cond_create(&beam->cond, pool);
+ if (status == APR_SUCCESS) {
+ apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);
+ *pbeam = beam;
+ }
+ }
return status;
}
void h2_beam_mutex_set(h2_bucket_beam *beam,
h2_beam_mutex_enter m_enter,
- apr_thread_cond_t *cond,
void *m_ctx)
{
h2_beam_lock bl;
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
beam->m_enter = m_enter;
beam->m_ctx = m_ctx;
- beam->m_cond = cond;
leave_yellow(beam, &bl);
}
}
+void h2_beam_mutex_enable(h2_bucket_beam *beam)
+{
+ h2_beam_mutex_set(beam, mutex_enter, beam);
+}
+
+void h2_beam_mutex_disable(h2_bucket_beam *beam)
+{
+ h2_beam_mutex_set(beam, NULL, NULL);
+}
+
void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
{
h2_beam_lock bl;
beam->aborted = 1;
r_purge_sent(beam);
h2_blist_cleanup(&beam->send_list);
- report_consumption(beam);
+ report_consumption(beam, &bl);
}
- if (beam->m_cond) {
- apr_thread_cond_broadcast(beam->m_cond);
+ if (beam->cond) {
+ apr_thread_cond_broadcast(beam->cond);
}
leave_yellow(beam, &bl);
}
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
r_purge_sent(beam);
beam_close(beam);
- report_consumption(beam);
+ report_consumption(beam, &bl);
leave_yellow(beam, &bl);
}
return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
status = APR_EAGAIN;
break;
}
- if (beam->m_cond) {
- apr_thread_cond_broadcast(beam->m_cond);
+ if (beam->cond) {
+ apr_thread_cond_broadcast(beam->cond);
}
status = wait_cond(beam, bl.mutex);
}
b = APR_BRIGADE_FIRST(sender_bb);
status = append_bucket(beam, b, block, &bl);
}
- report_prod_io(beam, force_report);
- if (beam->m_cond) {
- apr_thread_cond_broadcast(beam->m_cond);
+ report_prod_io(beam, force_report, &bl);
+ if (beam->cond) {
+ apr_thread_cond_broadcast(beam->cond);
}
}
- report_consumption(beam);
+ report_consumption(beam, &bl);
leave_yellow(beam, &bl);
}
return status;
}
if (transferred) {
- if (beam->m_cond) {
- apr_thread_cond_broadcast(beam->m_cond);
+ if (beam->cond) {
+ apr_thread_cond_broadcast(beam->cond);
}
status = APR_SUCCESS;
}
else if (beam->closed) {
status = APR_EOF;
}
- else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
+ else if (block == APR_BLOCK_READ && bl.mutex && beam->cond) {
status = wait_cond(beam, bl.mutex);
if (status != APR_SUCCESS) {
goto leave;
goto transfer;
}
else {
- if (beam->m_cond) {
- apr_thread_cond_broadcast(beam->m_cond);
+ if (beam->cond) {
+ apr_thread_cond_broadcast(beam->cond);
}
status = APR_EAGAIN;
}
h2_beam_lock bl;
int rv = 0;
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- rv = report_consumption(beam);
+ rv = report_consumption(beam, &bl);
leave_yellow(beam, &bl);
}
return rv;
unsigned int close_sent : 1;
unsigned int tx_mem_limits : 1; /* only memory size counts on transfers */
+ struct apr_thread_mutex_t *lock;
+ struct apr_thread_cond_t *cond;
void *m_ctx;
h2_beam_mutex_enter *m_enter;
- struct apr_thread_cond_t *m_cond;
apr_off_t cons_bytes_reported; /* amount of bytes reported as consumed */
h2_beam_ev_callback *cons_ev_cb;
void h2_beam_mutex_set(h2_bucket_beam *beam,
h2_beam_mutex_enter m_enter,
- struct apr_thread_cond_t *cond,
void *m_ctx);
+void h2_beam_mutex_enable(h2_bucket_beam *beam);
+void h2_beam_mutex_disable(h2_bucket_beam *beam);
+
/**
* Set/get the timeout for blocking read/write operations. Only works
* if a mutex has been set for the beam.
}
}
-static void beam_leave(void *ctx, apr_thread_mutex_t *lock)
-{
- leave_mutex(ctx, 1);
-}
-
-static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl)
-{
- h2_mplx *m = ctx;
- int acquired;
- apr_status_t status;
-
- status = enter_mutex(m, &acquired);
- if (status == APR_SUCCESS) {
- pbl->mutex = m->lock;
- pbl->leave = acquired? beam_leave : NULL;
- pbl->leave_ctx = m;
- }
- return status;
-}
-
static void stream_output_consumed(void *ctx,
h2_bucket_beam *beam, apr_off_t length)
{
h2_stream *stream = ctx;
+ h2_mplx *m = stream->session->mplx;
h2_task *task = stream->task;
+ int acquired;
+
if (length > 0 && task && task->assigned) {
- h2_req_engine_out_consumed(task->assigned, task->c, length);
+ if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ h2_req_engine_out_consumed(task->assigned, task->c, length);
+ leave_mutex(m, acquired);
+ }
}
}
static void stream_input_consumed(void *ctx,
h2_bucket_beam *beam, apr_off_t length)
{
- h2_mplx *m = ctx;
- if (m->input_consumed && length) {
- m->input_consumed(m->input_consumed_ctx, beam->id, length);
+ if (length > 0) {
+ h2_mplx *m = ctx;
+ int acquired;
+
+ if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ if (m->input_consumed) {
+ m->input_consumed(m->input_consumed_ctx, beam->id, length);
+ }
+ leave_mutex(m, acquired);
+ }
}
}
h2_ihash_remove(m->shold, stream->id);
h2_ihash_add(m->spurge, stream);
- m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
+ if (stream->input) {
+ m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
+ }
m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
}
static void stream_cleanup(h2_mplx *m, h2_stream *stream)
{
ap_assert(stream->state == H2_SS_CLEANUP);
-
+
+ if (stream->input) {
+ h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
+ h2_beam_abort(stream->input);
+ }
h2_beam_on_produced(stream->output, NULL, NULL);
- h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
- h2_beam_abort(stream->input);
h2_beam_leave(stream->output);
h2_stream_cleanup(stream);
h2_ihash_remove(m->spurge, stream->id);
ap_assert(stream->state == H2_SS_CLEANUP);
- if (stream->input == NULL || stream->output == NULL) {
+ if (stream->output == NULL) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, m->c,
H2_STRM_MSG(stream, "already with beams==NULL"));
return 0;
}
- /* Process outstanding events before destruction */
- input_consumed_signal(m, stream);
- h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy");
+ if (stream->input) {
+ /* Process outstanding events before destruction */
+ input_consumed_signal(m, stream);
+ h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy");
+ h2_beam_destroy(stream->input);
+ stream->input = NULL;
+ }
+
h2_beam_log(stream->output, m->c, APLOG_TRACE2, "stream_destroy");
- h2_beam_destroy(stream->input);
- stream->input = NULL;
h2_beam_destroy(stream->output);
stream->output = NULL;
if (stream->task) {
}
/* time to protect the beam against multi-threaded use */
- h2_beam_mutex_set(stream->output, beam_enter, stream->task->cond, m);
+ h2_beam_mutex_enable(stream->output);
/* we might see some file buckets in the output, see
* if we have enough handles reserved. */
m->max_stream_started = sid;
}
- h2_beam_timeout_set(stream->input, m->stream_timeout);
- h2_beam_on_consumed(stream->input, stream_input_ev,
- stream_input_consumed, m);
- h2_beam_on_file_beam(stream->input, can_beam_file, m);
- h2_beam_mutex_set(stream->input, beam_enter, stream->task->cond, m);
+ if (stream->input) {
+ h2_beam_timeout_set(stream->input, m->stream_timeout);
+ h2_beam_on_consumed(stream->input, stream_input_ev,
+ stream_input_consumed, m);
+ h2_beam_on_file_beam(stream->input, can_beam_file, m);
+ h2_beam_mutex_enable(stream->input);
+ }
h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
h2_beam_timeout_set(stream->output, m->stream_timeout);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
H2_STRM_MSG(stream, "task_done, stream open"));
/* more data will not arrive, resume the stream */
- h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
- h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
- h2_beam_leave(stream->input);
+ if (stream->input) {
+ h2_beam_mutex_disable(stream->input);
+ h2_beam_leave(stream->input);
+ }
+ h2_beam_mutex_disable(stream->output);
have_out_data_for(m, stream);
}
else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
H2_STRM_MSG(stream, "task_done, in hold"));
/* stream was just waiting for us. */
- h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
- h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
- h2_beam_leave(stream->input);
+ if (stream->input) {
+ h2_beam_mutex_disable(stream->input);
+ h2_beam_leave(stream->input);
+ }
+ h2_beam_mutex_disable(stream->output);
stream_joined(m, stream);
}
else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) {
}
ap_assert(!stream->scheduled);
- if (stream->request) {
- const h2_request *r = stream->request;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
- r->method, r->scheme, r->authority, r->path, r->chunked);
- stream->scheduled = 1;
+ if (h2_stream_prep_processing(stream) == APR_SUCCESS) {
h2_mplx_process(session->mplx, stream, stream_pri_cmp, session);
}
else {
}
}
+static apr_status_t setup_input(h2_stream *stream) {
+ if (stream->input == NULL && !stream->input_eof) {
+ h2_beam_create(&stream->input, stream->pool, stream->id,
+ "input", H2_BEAM_OWNER_SEND, 0);
+ h2_beam_send_from(stream->input, stream->pool);
+ }
+ return APR_SUCCESS;
+}
+
static apr_status_t close_input(h2_stream *stream)
{
conn_rec *c = stream->session->c;
- apr_status_t status;
- apr_bucket_brigade *tmp;
- apr_bucket *b;
+ apr_status_t status = APR_SUCCESS;
- if (h2_beam_is_closed(stream->input)) {
+ stream->input_eof = 1;
+ if (stream->input && h2_beam_is_closed(stream->input)) {
return APR_SUCCESS;
}
return APR_ECONNRESET;
}
- tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
- h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers,
- NULL, stream->pool);
+ apr_bucket_brigade *tmp;
+ apr_bucket *b;
+ h2_headers *r;
+
+ tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
+
+ r = h2_headers_create(HTTP_OK, stream->trailers, NULL, stream->pool);
+ stream->trailers = NULL;
b = h2_bucket_headers_create(c->bucket_alloc, r);
APR_BRIGADE_INSERT_TAIL(tmp, b);
- stream->trailers = NULL;
+
+ b = apr_bucket_eos_create(c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(tmp, b);
+
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
H2_STRM_MSG(stream, "added trailers"));
+ setup_input(stream);
+ status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
+ apr_brigade_destroy(tmp);
+ }
+ if (stream->input) {
+ return h2_beam_close(stream->input);
}
-
- b = apr_bucket_eos_create(c->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(tmp, b);
- status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
- apr_brigade_destroy(tmp);
- h2_beam_close(stream->input);
return status;
}
apr_bucket_brigade *tmp;
ap_assert(stream);
- if (!stream->input) {
- return APR_EOF;
+ if (len > 0) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
+ H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
+
+ tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+ apr_brigade_write(tmp, NULL, NULL, (const char *)data, len);
+ setup_input(stream);
+ status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
+ apr_brigade_destroy(tmp);
}
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
- H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
-
- tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc);
- apr_brigade_write(tmp, NULL, NULL, (const char *)data, len);
- status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
- apr_brigade_destroy(tmp);
-
stream->in_data_frames++;
stream->in_data_octets += len;
return status;
stream->monitor = monitor;
stream->max_mem = session->max_stream_mem;
- h2_beam_create(&stream->input, pool, id, "input", H2_BEAM_OWNER_SEND, 0);
- h2_beam_send_from(stream->input, stream->pool);
h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
* references into request pools */
apr_brigade_cleanup(stream->out_buffer);
}
- h2_beam_abort(stream->input);
- status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
- if (status == APR_EAGAIN) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
- H2_STRM_MSG(stream, "wait on input drain"));
- status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
- H2_STRM_MSG(stream, "input drain returned"));
+ if (stream->input) {
+ h2_beam_abort(stream->input);
+ status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
+ if (status == APR_EAGAIN) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ H2_STRM_MSG(stream, "wait on input drain"));
+ status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
+ H2_STRM_MSG(stream, "input drain returned"));
+ }
}
}
return pool;
}
+apr_status_t h2_stream_prep_processing(h2_stream *stream)
+{
+ if (stream->request) {
+ const h2_request *r = stream->request;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
+ r->method, r->scheme, r->authority, r->path, r->chunked);
+ setup_input(stream);
+ stream->scheduled = 1;
+ return APR_SUCCESS;
+ }
+ return APR_EINVAL;
+}
+
void h2_stream_rst(h2_stream *stream, int error_code)
{
stream->rst_error = error_code;
- h2_beam_abort(stream->input);
+ if (stream->input) {
+ h2_beam_abort(stream->input);
+ }
h2_beam_leave(stream->output);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
H2_STRM_MSG(stream, "reset, error=%d"), error_code);
unsigned int aborted : 1; /* was aborted */
unsigned int scheduled : 1; /* stream has been scheduled */
unsigned int has_response : 1; /* response headers are known */
+ unsigned int input_eof : 1; /* no more request data coming */
unsigned int push_policy; /* which push policy to use for this request */
struct h2_task *task; /* assigned task to fullfill request */
*/
void h2_stream_destroy(h2_stream *stream);
+apr_status_t h2_stream_prep_processing(h2_stream *stream);
+
/*
* Set a new monitor for this stream, replacing any existing one. Can
* be called with NULL to have no monitor installed.
#include <stddef.h>
#include <apr_atomic.h>
-#include <apr_thread_cond.h>
#include <apr_strings.h>
#include <httpd.h>
******************************************************************************/
int h2_task_can_redo(h2_task *task) {
- if (h2_beam_was_received(task->input.beam)) {
+ if (task->input.beam && h2_beam_was_received(task->input.beam)) {
/* cannot repeat that. */
return 0;
}
void h2_task_rst(h2_task *task, int error)
{
task->rst_error = error;
- h2_beam_leave(task->input.beam);
+ if (task->input.beam) {
+ h2_beam_leave(task->input.beam);
+ }
if (!task->worker_done) {
h2_beam_abort(task->output.beam);
}
task->output.beam = stream->output;
h2_beam_send_from(stream->output, task->pool);
- apr_thread_cond_create(&task->cond, pool);
h2_ctx_create_for(slave, task);
return task;
* of our own to disble those.
*/
-struct apr_thread_cond_t;
struct h2_bucket_beam;
struct h2_conn;
struct h2_mplx;
} output;
struct h2_mplx *mplx;
- struct apr_thread_cond_t *cond;
unsigned int filters_set : 1;
unsigned int frozen : 1;
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.9.2-DEV"
+#define MOD_HTTP2_VERSION "1.9.3-DEV"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010901
+#define MOD_HTTP2_VERSION_NUM 0x010903
#endif /* mod_h2_h2_version_h */