Changes with Apache 2.4.26
+ *) mod_proxy_http2: Fixed bug in re-attempting proxy requests after
+ connection error. Reliability of reconnect handling improved.
+ [Stefan Eissing]
+
*) mod_http2: better performance, eliminated need for nested locks and
thread privates. Moving request setups from the main connection to the
worker threads. Increase number of spare connections kept.
format from 2.2 in the Last Modified column. PR60846.
[Hank Ibell <hwibell gmail.com>]
- *) mod_http2: fixed PR60869 by making h2 workers exit explicitly waking up
- all threads to exit in a defined way. [Stefan Eissing]
-
*) core: Add %{REMOTE_PORT} to the expression parser. PR59938
[Hank Ibell <hwibell gmail.com>]
++transferred;
}
else {
+ /* let outside hook determine how bucket is beamed */
+ leave_yellow(beam, &bl);
brecv = h2_beam_bucket(beam, bb, bsender);
+ enter_yellow(beam, &bl);
+
while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
++transferred;
remain -= brecv->length;
static apr_status_t h2_status_insert(h2_task *task, apr_bucket *b)
{
- h2_mplx *m = task->mplx;
- h2_stream *stream = h2_mplx_stream_get(m, task->stream_id);
- h2_session *s;
- conn_rec *c;
-
+ conn_rec *c = task->c->master;
+ h2_ctx *h2ctx = h2_ctx_get(c, 0);
+ h2_session *session;
+ h2_stream *stream;
apr_bucket_brigade *bb;
apr_bucket *e;
int32_t connFlowIn, connFlowOut;
+
+ if (!h2ctx || (session = h2_ctx_session_get(h2ctx)) == NULL) {
+ return APR_SUCCESS;
+ }
+
+ stream = h2_session_stream_get(session, task->stream_id);
if (!stream) {
/* stream already done */
return APR_SUCCESS;
}
- s = stream->session;
- c = s->c;
bb = apr_brigade_create(stream->pool, c->bucket_alloc);
- connFlowIn = nghttp2_session_get_effective_local_window_size(s->ngh2);
- connFlowOut = nghttp2_session_get_remote_window_size(s->ngh2);
+ connFlowIn = nghttp2_session_get_effective_local_window_size(session->ngh2);
+ connFlowOut = nghttp2_session_get_remote_window_size(session->ngh2);
bbout(bb, "{\n");
bbout(bb, " \"version\": \"draft-01\",\n");
- add_settings(bb, s, 0);
- add_peer_settings(bb, s, 0);
+ add_settings(bb, session, 0);
+ add_peer_settings(bb, session, 0);
bbout(bb, " \"connFlowIn\": %d,\n", connFlowIn);
bbout(bb, " \"connFlowOut\": %d,\n", connFlowOut);
- bbout(bb, " \"sentGoAway\": %d,\n", s->local.shutdown);
+ bbout(bb, " \"sentGoAway\": %d,\n", session->local.shutdown);
- add_streams(bb, s, 0);
+ add_streams(bb, session, 0);
- add_stats(bb, s, stream, 1);
+ add_stats(bb, session, stream, 1);
bbout(bb, "}\n");
while ((e = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) {
apr_time_t now;
} stream_iter_ctx;
-/* NULL or the mutex hold by this thread, used for recursive calls
- */
-static const int nested_lock = 0;
-
-static apr_threadkey_t *thread_lock;
-
apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
{
- if (nested_lock) {
- return apr_threadkey_private_create(&thread_lock, NULL, pool);
- }
return APR_SUCCESS;
}
-static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
-{
- apr_status_t status;
-
- if (nested_lock) {
- void *mutex = NULL;
- /* Enter the mutex if this thread already holds the lock or
- * if we can acquire it. Only on the later case do we unlock
- * onleaving the mutex.
- * This allow recursive entering of the mutex from the saem thread,
- * which is what we need in certain situations involving callbacks
- */
- apr_threadkey_private_get(&mutex, thread_lock);
- if (mutex == m->lock) {
- *pacquired = 0;
- ap_assert(NULL); /* nested, why? */
- return APR_SUCCESS;
- }
- }
- status = apr_thread_mutex_lock(m->lock);
- *pacquired = (status == APR_SUCCESS);
- if (nested_lock && *pacquired) {
- apr_threadkey_private_set(m->lock, thread_lock);
- }
- return status;
-}
+#define H2_MPLX_ENTER(m) \
+ do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
+ return rv;\
+ } } while(0)
-static void leave_mutex(h2_mplx *m, int acquired)
-{
- if (acquired) {
- if (nested_lock) {
- apr_threadkey_private_set(NULL, thread_lock);
- }
- apr_thread_mutex_unlock(m->lock);
- }
-}
+#define H2_MPLX_LEAVE(m) \
+ apr_thread_mutex_unlock(m->lock)
+
+#define H2_MPLX_ENTER_ALWAYS(m) \
+ apr_thread_mutex_lock(m->lock)
-static void check_data_for(h2_mplx *m, int stream_id);
+#define H2_MPLX_ENTER_MAYBE(m, lock) \
+ if (lock) apr_thread_mutex_lock(m->lock)
+
+#define H2_MPLX_LEAVE_MAYBE(m, lock) \
+ if (lock) apr_thread_mutex_unlock(m->lock)
+
+static void check_data_for(h2_mplx *m, h2_stream *stream, int lock);
static void stream_output_consumed(void *ctx,
h2_bucket_beam *beam, apr_off_t length)
h2_stream_cleanup(stream);
h2_iq_remove(m->q, stream->id);
+ h2_fifo_remove(m->readyq, stream);
h2_ihash_remove(m->streams, stream->id);
h2_ihash_add(m->shold, stream);
m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
- m->readyq = h2_iq_create(m->pool, m->max_streams);
+
+ status = h2_fifo_set_create(&m->readyq, m->pool, m->max_streams);
+ if (status != APR_SUCCESS) {
+ apr_pool_destroy(m->pool);
+ return NULL;
+ }
m->workers = workers;
m->max_active = workers->max_workers;
int h2_mplx_shutdown(h2_mplx *m)
{
- int acquired, max_stream_started = 0;
+ int max_stream_started = 0;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- max_stream_started = m->max_stream_started;
- /* Clear schedule queue, disabling existing streams from starting */
- h2_iq_clear(m->q);
- leave_mutex(m, acquired);
- }
+ H2_MPLX_ENTER(m);
+
+ max_stream_started = m->max_stream_started;
+ /* Clear schedule queue, disabling existing streams from starting */
+ h2_iq_clear(m->q);
+
+ H2_MPLX_LEAVE(m);
return max_stream_started;
}
return 0;
}
-static void purge_streams(h2_mplx *m)
+static void purge_streams(h2_mplx *m, int lock)
{
if (!h2_ihash_empty(m->spurge)) {
+ H2_MPLX_ENTER_MAYBE(m, lock);
while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) {
/* repeat until empty */
}
+ H2_MPLX_LEAVE_MAYBE(m, lock);
}
}
apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
{
- apr_status_t status;
- int acquired;
+ stream_iter_ctx_t x;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- stream_iter_ctx_t x;
- x.cb = cb;
- x.ctx = ctx;
- h2_ihash_iter(m->streams, stream_iter_wrap, &x);
+ H2_MPLX_ENTER(m);
+
+ x.cb = cb;
+ x.ctx = ctx;
+ h2_ihash_iter(m->streams, stream_iter_wrap, &x);
- leave_mutex(m, acquired);
- }
- return status;
+ H2_MPLX_LEAVE(m);
+ return APR_SUCCESS;
}
static int report_stream_iter(void *ctx, void *val) {
{
apr_status_t status;
int i, wait_secs = 60;
- int acquired;
/* How to shut down a h2 connection:
* 0. abort and tell the workers that no more tasks will come from us */
m->aborted = 1;
h2_workers_unregister(m->workers, m);
- enter_mutex(m, &acquired);
+ H2_MPLX_ENTER_ALWAYS(m);
/* How to shut down a h2 connection:
* 1. cancel all streams still active */
h2_ihash_iter(m->shold, unexpected_stream_iter, m);
}
- leave_mutex(m, acquired);
+ H2_MPLX_LEAVE(m);
/* 5. unregister again, now that our workers are done */
h2_workers_unregister(m->workers, m);
apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
{
- apr_status_t status = APR_SUCCESS;
- int acquired;
+ H2_MPLX_ENTER(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- H2_STRM_MSG(stream, "cleanup"));
- stream_cleanup(m, stream);
- leave_mutex(m, acquired);
- }
- return status;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ H2_STRM_MSG(stream, "cleanup"));
+ stream_cleanup(m, stream);
+
+ H2_MPLX_LEAVE(m);
+ return APR_SUCCESS;
}
h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
{
h2_stream *s = NULL;
- int acquired;
- if ((enter_mutex(m, &acquired)) == APR_SUCCESS) {
- s = h2_ihash_get(m->streams, id);
- leave_mutex(m, acquired);
- }
+ H2_MPLX_ENTER_ALWAYS(m);
+
+ s = h2_ihash_get(m->streams, id);
+
+ H2_MPLX_LEAVE(m);
return s;
}
static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
- h2_mplx *m = ctx;
- int acquired;
+ h2_stream *stream = ctx;
+ h2_mplx *m = stream->session->mplx;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id);
- check_data_for(m, beam->id);
- leave_mutex(m, acquired);
- }
+ check_data_for(m, stream, 1);
}
static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
}
h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream);
- h2_beam_on_produced(stream->output, output_produced, m);
+ h2_beam_on_produced(stream->output, output_produced, stream);
if (stream->task->output.copy_files) {
h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL);
}
/* we might see some file buckets in the output, see
* if we have enough handles reserved. */
- check_data_for(m, stream->id);
+ check_data_for(m, stream, 0);
return status;
}
apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
{
apr_status_t status;
- int acquired;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (m->aborted) {
- status = APR_ECONNABORTED;
- }
- else {
- status = out_open(m, stream_id, beam);
- }
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER(m);
+
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
}
+ else {
+ status = out_open(m, stream_id, beam);
+ }
+
+ H2_MPLX_LEAVE(m);
return status;
}
status = h2_beam_close(task->output.beam);
h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close");
output_consumed_signal(m, task);
- check_data_for(m, task->stream_id);
+ check_data_for(m, stream, 0);
return status;
}
apr_thread_cond_t *iowait)
{
apr_status_t status;
- int acquired;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (m->aborted) {
- status = APR_ECONNABORTED;
- }
- else if (apr_atomic_read32(&m->event_pending) > 0) {
- status = APR_SUCCESS;
- }
- else {
- purge_streams(m);
- h2_ihash_iter(m->streams, report_consumption_iter, m);
- m->added_output = iowait;
- status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
- if (APLOGctrace2(m->c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): trywait on data for %f ms)",
- m->id, timeout/1000.0);
- }
- m->added_output = NULL;
+ H2_MPLX_ENTER(m);
+
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
+ }
+ else if (h2_mplx_has_master_events(m)) {
+ status = APR_SUCCESS;
+ }
+ else {
+ purge_streams(m, 0);
+ h2_ihash_iter(m->streams, report_consumption_iter, m);
+ m->added_output = iowait;
+ status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
+ if (APLOGctrace2(m->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): trywait on data for %f ms)",
+ m->id, timeout/1000.0);
}
- leave_mutex(m, acquired);
+ m->added_output = NULL;
}
+
+ H2_MPLX_LEAVE(m);
return status;
}
-static void check_data_for(h2_mplx *m, int stream_id)
+static void check_data_for(h2_mplx *m, h2_stream *stream, int lock)
{
- ap_assert(m);
- h2_iq_append(m->readyq, stream_id);
- apr_atomic_set32(&m->event_pending, 1);
- if (m->added_output) {
- apr_thread_cond_signal(m->added_output);
+ if (h2_fifo_push(m->readyq, stream) == APR_SUCCESS) {
+ apr_atomic_set32(&m->event_pending, 1);
+ H2_MPLX_ENTER_MAYBE(m, lock);
+ if (m->added_output) {
+ apr_thread_cond_signal(m->added_output);
+ }
+ H2_MPLX_LEAVE_MAYBE(m, lock);
}
}
apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
- int acquired;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (m->aborted) {
- status = APR_ECONNABORTED;
- }
- else {
- h2_iq_sort(m->q, cmp, ctx);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): reprioritize tasks", m->id);
- }
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER(m);
+
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
+ }
+ else {
+ h2_iq_sort(m->q, cmp, ctx);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): reprioritize tasks", m->id);
+ status = APR_SUCCESS;
}
+
+ H2_MPLX_LEAVE(m);
return status;
}
h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
- int acquired;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (m->aborted) {
- status = APR_ECONNABORTED;
+ H2_MPLX_ENTER(m);
+
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
+ }
+ else {
+ status = APR_SUCCESS;
+ h2_ihash_add(m->streams, stream);
+ if (h2_stream_is_ready(stream)) {
+ /* already have a response */
+ check_data_for(m, stream, 0);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ H2_STRM_MSG(stream, "process, add to readyq"));
}
else {
- h2_ihash_add(m->streams, stream);
- if (h2_stream_is_ready(stream)) {
- /* already have a response */
- check_data_for(m, stream->id);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- H2_STRM_MSG(stream, "process, add to readyq"));
- }
- else {
- h2_iq_add(m->q, stream->id, cmp, ctx);
- register_if_needed(m);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- H2_STRM_MSG(stream, "process, added to q"));
- }
+ h2_iq_add(m->q, stream->id, cmp, ctx);
+ register_if_needed(m);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ H2_STRM_MSG(stream, "process, added to q"));
}
- leave_mutex(m, acquired);
}
+
+ H2_MPLX_LEAVE(m);
return status;
}
h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
{
h2_task *task = NULL;
- apr_status_t status;
- int acquired;
+ H2_MPLX_ENTER_ALWAYS(m);
+
*has_more = 0;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (!m->aborted) {
- task = next_stream_task(m);
- if (task != NULL && !h2_iq_empty(m->q)) {
- *has_more = 1;
- }
- else {
- m->is_registered = 0; /* h2_workers will discard this mplx */
- }
+ if (!m->aborted) {
+ task = next_stream_task(m);
+ if (task != NULL && !h2_iq_empty(m->q)) {
+ *has_more = 1;
+ }
+ else {
+ m->is_registered = 0; /* h2_workers will discard this mplx */
}
- leave_mutex(m, acquired);
}
+
+ H2_MPLX_LEAVE(m);
return task;
}
if (task->engine) {
if (!m->aborted && !task->c->aborted
&& !h2_req_engine_is_shutdown(task->engine)) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(10022)
"h2_mplx(%ld): task(%s) has not-shutdown "
"engine(%s)", m->id, task->id,
h2_req_engine_get_id(task->engine));
}
stream = h2_ihash_get(m->streams, task->stream_id);
- if (stream && !m->aborted && h2_ihash_get(m->sredo, stream->id)) {
- /* reset and schedule again */
- h2_task_redo(task);
- h2_ihash_remove(m->sredo, stream->id);
- h2_iq_add(m->q, stream->id, NULL, NULL);
- return;
- }
-
if (stream) {
- /* stream not cleaned up, stay around */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- H2_STRM_MSG(stream, "task_done, stream open"));
- /* more data will not arrive, resume the stream */
- if (stream->input) {
- h2_beam_mutex_disable(stream->input);
- h2_beam_leave(stream->input);
+ /* stream not done yet. */
+ if (!m->aborted && h2_ihash_get(m->sredo, stream->id)) {
+ /* reset and schedule again */
+ h2_task_redo(task);
+ h2_ihash_remove(m->sredo, stream->id);
+ h2_iq_add(m->q, stream->id, NULL, NULL);
}
- if (stream->output) {
- h2_beam_mutex_disable(stream->output);
+ else {
+ /* stream not cleaned up, stay around */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ H2_STRM_MSG(stream, "task_done, stream open"));
+ /* more data will not arrive, resume the stream */
+ check_data_for(m, stream, 0);
+
+ if (stream->input) {
+ h2_beam_leave(stream->input);
+ h2_beam_mutex_disable(stream->input);
+ }
+ if (stream->output) {
+ h2_beam_mutex_disable(stream->output);
+ }
}
- check_data_for(m, stream->id);
}
else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
+ /* stream is done, was just waiting for this. */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
H2_STRM_MSG(stream, "task_done, in hold"));
- /* stream was just waiting for us. */
if (stream->input) {
- h2_beam_mutex_disable(stream->input);
h2_beam_leave(stream->input);
+ h2_beam_mutex_disable(stream->input);
}
if (stream->output) {
h2_beam_mutex_disable(stream->output);
void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
{
- int acquired;
+ H2_MPLX_ENTER_ALWAYS(m);
+
+ task_done(m, task, NULL);
+ --m->tasks_active;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- task_done(m, task, NULL);
- --m->tasks_active;
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
- if (ptask) {
- /* caller wants another task */
- *ptask = next_stream_task(m);
- }
- register_if_needed(m);
- leave_mutex(m, acquired);
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
+ if (ptask) {
+ /* caller wants another task */
+ *ptask = next_stream_task(m);
}
+ register_if_needed(m);
+
+ H2_MPLX_LEAVE(m);
}
/*******************************************************************************
{
apr_status_t status = APR_SUCCESS;
apr_time_t now;
- int acquired;
+ apr_size_t scount;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- apr_size_t scount = h2_ihash_count(m->streams);
- if (scount > 0 && m->tasks_active) {
- /* If we have streams in connection state 'IDLE', meaning
- * all streams are ready to sent data out, but lack
- * WINDOW_UPDATEs.
- *
- * This is ok, unless we have streams that still occupy
- * h2 workers. As worker threads are a scarce resource,
- * we need to take measures that we do not get DoSed.
- *
- * This is what we call an 'idle block'. Limit the amount
- * of busy workers we allow for this connection until it
- * well behaves.
- */
- now = apr_time_now();
- m->last_idle_block = now;
- if (m->limit_active > 2
- && now - m->last_limit_change >= m->limit_change_interval) {
- if (m->limit_active > 16) {
- m->limit_active = 16;
- }
- else if (m->limit_active > 8) {
- m->limit_active = 8;
- }
- else if (m->limit_active > 4) {
- m->limit_active = 4;
- }
- else if (m->limit_active > 2) {
- m->limit_active = 2;
- }
- m->last_limit_change = now;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): decrease worker limit to %d",
- m->id, m->limit_active);
+ H2_MPLX_ENTER(m);
+
+ scount = h2_ihash_count(m->streams);
+ if (scount > 0 && m->tasks_active) {
+ /* If we have streams in connection state 'IDLE', meaning
+ * all streams are ready to sent data out, but lack
+ * WINDOW_UPDATEs.
+ *
+ * This is ok, unless we have streams that still occupy
+ * h2 workers. As worker threads are a scarce resource,
+ * we need to take measures that we do not get DoSed.
+ *
+ * This is what we call an 'idle block'. Limit the amount
+ * of busy workers we allow for this connection until it
+ * well behaves.
+ */
+ now = apr_time_now();
+ m->last_idle_block = now;
+ if (m->limit_active > 2
+ && now - m->last_limit_change >= m->limit_change_interval) {
+ if (m->limit_active > 16) {
+ m->limit_active = 16;
}
-
- if (m->tasks_active > m->limit_active) {
- status = unschedule_slow_tasks(m);
+ else if (m->limit_active > 8) {
+ m->limit_active = 8;
+ }
+ else if (m->limit_active > 4) {
+ m->limit_active = 4;
+ }
+ else if (m->limit_active > 2) {
+ m->limit_active = 2;
}
+ m->last_limit_change = now;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): decrease worker limit to %d",
+ m->id, m->limit_active);
+ }
+
+ if (m->tasks_active > m->limit_active) {
+ status = unschedule_slow_tasks(m);
}
- register_if_needed(m);
- leave_mutex(m, acquired);
}
+ register_if_needed(m);
+
+ H2_MPLX_LEAVE(m);
return status;
}
apr_status_t status;
h2_mplx *m;
h2_task *task;
- int acquired;
+ h2_stream *stream;
task = h2_ctx_rget_task(r);
if (!task) {
}
m = task->mplx;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
-
- if (stream) {
- status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER(m);
+
+ stream = h2_ihash_get(m->streams, task->stream_id);
+ if (stream) {
+ status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
+ }
+ else {
+ status = APR_ECONNABORTED;
}
+
+ H2_MPLX_LEAVE(m);
return status;
}
h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
h2_mplx *m = h2_ngn_shed_get_ctx(shed);
apr_status_t status;
- int acquired;
+ int want_shutdown;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- int want_shutdown = (block == APR_BLOCK_READ);
+ H2_MPLX_ENTER(m);
- /* Take this opportunity to update output consummation
- * for this engine */
- ngn_out_update_windows(m, ngn);
-
- if (want_shutdown && !h2_iq_empty(m->q)) {
- /* For a blocking read, check first if requests are to be
- * had and, if not, wait a short while before doing the
- * blocking, and if unsuccessful, terminating read.
- */
+ want_shutdown = (block == APR_BLOCK_READ);
+
+ /* Take this opportunity to update output consummation
+ * for this engine */
+ ngn_out_update_windows(m, ngn);
+
+ if (want_shutdown && !h2_iq_empty(m->q)) {
+ /* For a blocking read, check first if requests are to be
+ * had and, if not, wait a short while before doing the
+ * blocking, and if unsuccessful, terminating read.
+ */
+ status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
+ if (APR_STATUS_IS_EAGAIN(status)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): start block engine pull", m->id);
+ apr_thread_cond_timedwait(m->task_thawed, m->lock,
+ apr_time_from_msec(20));
status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
- if (APR_STATUS_IS_EAGAIN(status)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): start block engine pull", m->id);
- apr_thread_cond_timedwait(m->task_thawed, m->lock,
- apr_time_from_msec(20));
- status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
- }
- }
- else {
- status = h2_ngn_shed_pull_request(shed, ngn, capacity,
- want_shutdown, pr);
}
- leave_mutex(m, acquired);
}
+ else {
+ status = h2_ngn_shed_pull_request(shed, ngn, capacity,
+ want_shutdown, pr);
+ }
+
+ H2_MPLX_LEAVE(m);
return status;
}
if (task) {
h2_mplx *m = task->mplx;
- int acquired;
+ h2_stream *stream;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
-
- ngn_out_update_windows(m, ngn);
- h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
-
- if (status != APR_SUCCESS && stream
- && h2_task_can_redo(task)
- && !h2_ihash_get(m->sredo, stream->id)) {
- h2_ihash_add(m->sredo, stream);
- }
- if (task->engine) {
- /* cannot report that as done until engine returns */
- }
- else {
- task_done(m, task, ngn);
- }
- /* Take this opportunity to update output consummation
- * for this engine */
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER_ALWAYS(m);
+
+ stream = h2_ihash_get(m->streams, task->stream_id);
+
+ ngn_out_update_windows(m, ngn);
+ h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
+
+ if (status != APR_SUCCESS && stream
+ && h2_task_can_redo(task)
+ && !h2_ihash_get(m->sredo, stream->id)) {
+ h2_ihash_add(m->sredo, stream);
+ }
+
+ if (task->engine) {
+ /* cannot report that as done until engine returns */
}
+ else {
+ task_done(m, task, ngn);
+ }
+
+ H2_MPLX_LEAVE(m);
}
}
stream_ev_callback *on_resume,
void *on_ctx)
{
- apr_status_t status;
- int acquired;
- int ids[100];
h2_stream *stream;
- size_t i, n;
+ int n;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): dispatch events", m->id);
- apr_atomic_set32(&m->event_pending, 0);
- purge_streams(m);
-
- /* update input windows for streams */
- h2_ihash_iter(m->streams, report_consumption_iter, m);
-
- if (!h2_iq_empty(m->readyq)) {
- n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
- for (i = 0; i < n; ++i) {
- stream = h2_ihash_get(m->streams, ids[i]);
- if (stream) {
- leave_mutex(m, acquired);
- on_resume(on_ctx, stream);
- enter_mutex(m, &acquired);
- }
- }
- }
- if (!h2_iq_empty(m->readyq)) {
- apr_atomic_set32(&m->event_pending, 1);
- }
- leave_mutex(m, acquired);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): dispatch events", m->id);
+ apr_atomic_set32(&m->event_pending, 0);
+
+ /* update input windows for streams */
+ h2_ihash_iter(m->streams, report_consumption_iter, m);
+ purge_streams(m, 1);
+
+ n = h2_fifo_count(m->readyq);
+ while (n > 0
+ && (h2_fifo_try_pull(m->readyq, (void**)&stream) == APR_SUCCESS)) {
+ --n;
+ on_resume(on_ctx, stream);
}
- return status;
+
+ return APR_SUCCESS;
}
-apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id)
+apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream)
{
- apr_status_t status;
- int acquired;
-
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- check_data_for(m, stream_id);
- leave_mutex(m, acquired);
- }
- return status;
+ check_data_for(m, stream, 1);
+ return APR_SUCCESS;
}
int h2_mplx_awaits_data(h2_mplx *m)
{
- apr_status_t status;
- int acquired, waiting = 1;
+ int waiting = 1;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (h2_ihash_empty(m->streams)) {
- waiting = 0;
- }
- if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
- waiting = 0;
- }
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER_ALWAYS(m);
+
+ if (h2_ihash_empty(m->streams)) {
+ waiting = 0;
}
+ if ((h2_fifo_count(m->readyq) == 0)
+ && h2_iq_empty(m->q) && !m->tasks_active) {
+ waiting = 0;
+ }
+
+ H2_MPLX_LEAVE(m);
return waiting;
}
struct h2_ihash_t *spurge; /* all streams done, ready for destroy */
struct h2_iqueue *q; /* all stream ids that need to be started */
- struct h2_iqueue *readyq; /* all stream ids ready for output */
+ struct h2_fifo *readyq; /* all streams ready for output */
struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
struct apr_thread_cond_t *iowait);
-apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id);
+apr_status_t h2_mplx_keep_active(h2_mplx *m, struct h2_stream *stream);
/*******************************************************************************
* Stream processing.
entry->task = task;
entry->r = r;
H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
+ ngn->no_assigned++;
}
task->assigned = NULL;
}
+ if (task->engine) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ "h2_ngn_shed(%ld): push task(%s) hosting engine %s "
+ "already with %d tasks",
+ shed->c->id, task->id, task->engine->id,
+ task->engine->no_assigned);
+ task->assigned = task->engine;
+ ngn_add_task(task->engine, task, r);
+ return APR_SUCCESS;
+ }
+
ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING);
if (ngn && !ngn->shutdown) {
/* this task will be processed in another thread,
h2_task_freeze(task);
}
ngn_add_task(ngn, task, r);
- ngn->no_assigned++;
return APR_SUCCESS;
}
status = einit(newngn, newngn->id, newngn->type, newngn->pool,
shed->req_buffer_size, r,
&newngn->out_consumed, &newngn->out_consumed_ctx);
+
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03395)
"h2_ngn_shed(%ld): create engine %s (%s)",
shed->c->id, newngn->id, newngn->type);
if (status == APR_SUCCESS) {
- ap_assert(task->engine == NULL);
newngn->task = task;
task->engine = newngn;
task->assigned = newngn;
static void process_proxy_header(h2_proxy_stream *stream, const char *n, const char *v)
{
- request_rec *r = stream->r;
static const struct {
const char *name;
ap_proxy_header_reverse_map_fn func;
{ "Set-Cookie", ap_proxy_cookie_reverse_map },
{ NULL, NULL }
};
+ request_rec *r = stream->r;
proxy_dir_conf *dconf;
int i;
- for (i = 0; transform_hdrs[i].name; ++i) {
- if (!ap_cstr_casecmp(transform_hdrs[i].name, n)) {
+ dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
+ if (!dconf->preserve_host) {
+ for (i = 0; transform_hdrs[i].name; ++i) {
+ if (!ap_cstr_casecmp(transform_hdrs[i].name, n)) {
+ apr_table_add(r->headers_out, n,
+ (*transform_hdrs[i].func)(r, dconf, v));
+ return;
+ }
+ }
+ if (!ap_cstr_casecmp("Link", n)) {
dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
apr_table_add(r->headers_out, n,
- (*transform_hdrs[i].func)(r, dconf, v));
+ h2_proxy_link_reverse_map(r, dconf,
+ stream->real_server_uri, stream->p_server_uri, v));
return;
- }
- }
- if (!ap_cstr_casecmp("Link", n)) {
- dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
- apr_table_add(r->headers_out, n,
- h2_proxy_link_reverse_map(r, dconf,
- stream->real_server_uri, stream->p_server_uri, v));
- return;
+ }
}
apr_table_add(r->headers_out, n, v);
}
#include <assert.h>
#include <apr_lib.h>
#include <apr_strings.h>
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
#include <httpd.h>
#include <http_core.h>
"link_reverse_map %s --> %s", s, ctx.s);
return ctx.s;
}
+
+/*******************************************************************************
+ * FIFO queue
+ ******************************************************************************/
+
+struct h2_proxy_fifo {
+ void **elems;
+ int nelems;
+ int set;
+ int head;
+ int count;
+ int aborted;
+ apr_thread_mutex_t *lock;
+ apr_thread_cond_t *not_empty;
+ apr_thread_cond_t *not_full;
+};
+
+static int nth_index(h2_proxy_fifo *fifo, int n)
+{
+ return (fifo->head + n) % fifo->nelems;
+}
+
+static apr_status_t fifo_destroy(void *data)
+{
+ h2_proxy_fifo *fifo = data;
+
+ apr_thread_cond_destroy(fifo->not_empty);
+ apr_thread_cond_destroy(fifo->not_full);
+ apr_thread_mutex_destroy(fifo->lock);
+
+ return APR_SUCCESS;
+}
+
+static int index_of(h2_proxy_fifo *fifo, void *elem)
+{
+ int i;
+
+ for (i = 0; i < fifo->count; ++i) {
+ if (elem == fifo->elems[nth_index(fifo, i)]) {
+ return i;
+ }
+ }
+ return -1;
+}
+
+static apr_status_t create_int(h2_proxy_fifo **pfifo, apr_pool_t *pool,
+ int capacity, int as_set)
+{
+ apr_status_t rv;
+ h2_proxy_fifo *fifo;
+
+ fifo = apr_pcalloc(pool, sizeof(*fifo));
+ if (fifo == NULL) {
+ return APR_ENOMEM;
+ }
+
+ rv = apr_thread_mutex_create(&fifo->lock,
+ APR_THREAD_MUTEX_UNNESTED, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ rv = apr_thread_cond_create(&fifo->not_empty, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ rv = apr_thread_cond_create(&fifo->not_full, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ fifo->elems = apr_pcalloc(pool, capacity * sizeof(void*));
+ if (fifo->elems == NULL) {
+ return APR_ENOMEM;
+ }
+ fifo->nelems = capacity;
+ fifo->set = as_set;
+
+ *pfifo = fifo;
+ apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null);
+
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_proxy_fifo_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ return create_int(pfifo, pool, capacity, 0);
+}
+
+apr_status_t h2_proxy_fifo_set_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ return create_int(pfifo, pool, capacity, 1);
+}
+
+apr_status_t h2_proxy_fifo_term(h2_proxy_fifo *fifo)
+{
+ apr_status_t rv;
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ fifo->aborted = 1;
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_proxy_fifo_interrupt(h2_proxy_fifo *fifo)
+{
+ apr_status_t rv;
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ apr_thread_cond_broadcast(fifo->not_empty);
+ apr_thread_cond_broadcast(fifo->not_full);
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+int h2_proxy_fifo_count(h2_proxy_fifo *fifo)
+{
+ return fifo->count;
+}
+
+int h2_proxy_fifo_capacity(h2_proxy_fifo *fifo)
+{
+ return fifo->nelems;
+}
+
+static apr_status_t check_not_empty(h2_proxy_fifo *fifo, int block)
+{
+ if (fifo->count == 0) {
+ if (!block) {
+ return APR_EAGAIN;
+ }
+ while (fifo->count == 0) {
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+ apr_thread_cond_wait(fifo->not_empty, fifo->lock);
+ }
+ }
+ return APR_SUCCESS;
+}
+
+static apr_status_t fifo_push(h2_proxy_fifo *fifo, void *elem, int block)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ if (fifo->set && index_of(fifo, elem) >= 0) {
+ /* set mode, elem already member */
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EEXIST;
+ }
+ else if (fifo->count == fifo->nelems) {
+ if (block) {
+ while (fifo->count == fifo->nelems) {
+ if (fifo->aborted) {
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EOF;
+ }
+ apr_thread_cond_wait(fifo->not_full, fifo->lock);
+ }
+ }
+ else {
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EAGAIN;
+ }
+ }
+
+ ap_assert(fifo->count < fifo->nelems);
+ fifo->elems[nth_index(fifo, fifo->count)] = elem;
+ ++fifo->count;
+ if (fifo->count == 1) {
+ apr_thread_cond_broadcast(fifo->not_empty);
+ }
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_proxy_fifo_push(h2_proxy_fifo *fifo, void *elem)
+{
+ return fifo_push(fifo, elem, 1);
+}
+
+apr_status_t h2_proxy_fifo_try_push(h2_proxy_fifo *fifo, void *elem)
+{
+ return fifo_push(fifo, elem, 0);
+}
+
+static void *pull_head(h2_proxy_fifo *fifo)
+{
+ void *elem;
+
+ ap_assert(fifo->count > 0);
+ elem = fifo->elems[fifo->head];
+ --fifo->count;
+ if (fifo->count > 0) {
+ fifo->head = nth_index(fifo, 1);
+ if (fifo->count+1 == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ }
+ return elem;
+}
+
+static apr_status_t fifo_pull(h2_proxy_fifo *fifo, void **pelem, int block)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) {
+ apr_thread_mutex_unlock(fifo->lock);
+ *pelem = NULL;
+ return rv;
+ }
+
+ ap_assert(fifo->count > 0);
+ *pelem = pull_head(fifo);
+
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_proxy_fifo_pull(h2_proxy_fifo *fifo, void **pelem)
+{
+ return fifo_pull(fifo, pelem, 1);
+}
+
+apr_status_t h2_proxy_fifo_try_pull(h2_proxy_fifo *fifo, void **pelem)
+{
+ return fifo_pull(fifo, pelem, 0);
+}
+
+apr_status_t h2_proxy_fifo_remove(h2_proxy_fifo *fifo, void *elem)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ int i, rc;
+ void *e;
+
+ rc = 0;
+ for (i = 0; i < fifo->count; ++i) {
+ e = fifo->elems[nth_index(fifo, i)];
+ if (e == elem) {
+ ++rc;
+ }
+ else if (rc) {
+ fifo->elems[nth_index(fifo, i-rc)] = e;
+ }
+ }
+ if (rc) {
+ fifo->count -= rc;
+ if (fifo->count + rc == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ rv = APR_SUCCESS;
+ }
+ else {
+ rv = APR_EAGAIN;
+ }
+
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
const char *proxy_server_uri,
const char *s);
+/*******************************************************************************
+ * FIFO queue
+ ******************************************************************************/
+
+/**
+ * A thread-safe FIFO queue with some extra bells and whistles, if you
+ * do not need anything special, better use 'apr_queue'.
+ */
+typedef struct h2_proxy_fifo h2_proxy_fifo;
+
+/**
+ * Create a FIFO queue that can hold up to capacity elements. Elements can
+ * appear several times.
+ */
+apr_status_t h2_proxy_fifo_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity);
+
+/**
+ * Create a FIFO set that can hold up to capacity elements. Elements only
+ * appear once. Pushing an element already present does not change the
+ * queue and is successful.
+ */
+apr_status_t h2_proxy_fifo_set_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity);
+
+apr_status_t h2_proxy_fifo_term(h2_proxy_fifo *fifo);
+apr_status_t h2_proxy_fifo_interrupt(h2_proxy_fifo *fifo);
+
+int h2_proxy_fifo_capacity(h2_proxy_fifo *fifo);
+int h2_proxy_fifo_count(h2_proxy_fifo *fifo);
+
+/**
+ * Push en element into the queue. Blocks if there is no capacity left.
+ *
+ * @param fifo the FIFO queue
+ * @param elem the element to push
+ * @return APR_SUCCESS on push, APR_EAGAIN on try_push on a full queue,
+ * APR_EEXIST when in set mode and elem already there.
+ */
+apr_status_t h2_proxy_fifo_push(h2_proxy_fifo *fifo, void *elem);
+apr_status_t h2_proxy_fifo_try_push(h2_proxy_fifo *fifo, void *elem);
+
+apr_status_t h2_proxy_fifo_pull(h2_proxy_fifo *fifo, void **pelem);
+apr_status_t h2_proxy_fifo_try_pull(h2_proxy_fifo *fifo, void **pelem);
+
+/**
+ * Remove the elem from the queue, will remove multiple appearances.
+ * @param elem the element to remove
+ * @return APR_SUCCESS iff > 0 elems were removed, APR_EAGAIN otherwise.
+ */
+apr_status_t h2_proxy_fifo_remove(h2_proxy_fifo *fifo, void *elem);
+
+
#endif /* defined(__mod_h2__h2_proxy_util__) */
return NGHTTP2_ERR_PROTO;
}
-static h2_stream *get_stream(h2_session *session, int stream_id)
+h2_stream *h2_session_stream_get(h2_session *session, int stream_id)
{
return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
}
h2_stream * stream;
int rv = 0;
- stream = get_stream(session, stream_id);
+ stream = h2_session_stream_get(session, stream_id);
if (stream) {
status = h2_stream_recv_DATA(stream, flags, data, len);
}
h2_stream *stream;
(void)ngh2;
- stream = get_stream(session, stream_id);
+ stream = h2_session_stream_get(session, stream_id);
if (stream) {
if (error_code) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
/* We may see HEADERs at the start of a stream or after all DATA
* streams to carry trailers. */
(void)ngh2;
- s = get_stream(session, frame->hd.stream_id);
+ s = h2_session_stream_get(session, frame->hd.stream_id);
if (s) {
/* nop */
}
apr_status_t status;
(void)flags;
- stream = get_stream(session, frame->hd.stream_id);
+ stream = h2_session_stream_get(session, frame->hd.stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(02920)
"h2_stream(%ld-%d): on_header unknown stream",
/* This can be HEADERS for a new stream, defining the request,
* or HEADER may come after DATA at the end of a stream as in
* trailers */
- stream = get_stream(session, frame->hd.stream_id);
+ stream = h2_session_stream_get(session, frame->hd.stream_id);
if (stream) {
h2_stream_recv_frame(stream, NGHTTP2_HEADERS, frame->hd.flags);
}
break;
case NGHTTP2_DATA:
- stream = get_stream(session, frame->hd.stream_id);
+ stream = h2_session_stream_get(session, frame->hd.stream_id);
if (stream) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
H2_STRM_LOG(APLOGNO(02923), stream,
"h2_stream(%ld-%d): RST_STREAM by client, errror=%d",
session->id, (int)frame->hd.stream_id,
(int)frame->rst_stream.error_code);
- stream = get_stream(session, frame->hd.stream_id);
+ stream = h2_session_stream_get(session, frame->hd.stream_id);
if (stream && stream->initiated_on) {
++session->pushes_reset;
}
}
padlen = (unsigned char)frame->data.padlen;
- stream = get_stream(session, stream_id);
+ stream = h2_session_stream_get(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
APLOGNO(02924)
(long)session->frames_sent);
}
- stream = get_stream(session, stream_id);
+ stream = h2_session_stream_get(session, stream_id);
if (stream) {
h2_stream_send_frame(stream, frame->hd.type, frame->hd.flags);
}
apr_pstrndup(session->pool, (const char *)name, namelen),
apr_pstrndup(session->pool, (const char *)value, valuelen));
}
- stream = get_stream(session, frame->hd.stream_id);
+ stream = h2_session_stream_get(session, frame->hd.stream_id);
if (stream) {
h2_stream_rst(stream, NGHTTP2_PROTOCOL_ERROR);
}
(void)ng2s;
(void)buf;
(void)source;
- stream = get_stream(session, stream_id);
+ stream = h2_session_stream_get(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
APLOGNO(02937)
int id;
while ((id = h2_iq_shift(session->in_process)) > 0) {
- h2_stream *stream = get_stream(session, id);
+ h2_stream *stream = h2_session_stream_get(session, id);
if (stream) {
ap_assert(!stream->scheduled);
if (h2_stream_prep_processing(stream) == APR_SUCCESS) {
}
while ((id = h2_iq_shift(session->in_pending)) > 0) {
- h2_stream *stream = get_stream(session, id);
+ h2_stream *stream = h2_session_stream_get(session, id);
if (stream) {
h2_stream_flush_input(stream);
}
*/
int h2_session_push_enabled(h2_session *session);
+/**
+ * Look up the stream in this session with the given id.
+ */
+struct h2_stream *h2_session_stream_get(h2_session *session, int stream_id);
+
/**
* Submit a push promise on the stream and schedule the new steam for
* processing..
return NULL;
}
+static apr_status_t add_data(h2_stream *stream, apr_off_t requested,
+ apr_off_t *plen, int *peos, int *complete,
+ h2_headers **pheaders)
+{
+ apr_bucket *b, *e;
+
+ *peos = 0;
+ *plen = 0;
+ *complete = 0;
+ if (pheaders) {
+ *pheaders = NULL;
+ }
+
+ H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "add_data");
+ b = APR_BRIGADE_FIRST(stream->out_buffer);
+ while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
+ e = APR_BUCKET_NEXT(b);
+ if (APR_BUCKET_IS_METADATA(b)) {
+ if (APR_BUCKET_IS_FLUSH(b)) {
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ }
+ else if (APR_BUCKET_IS_EOS(b)) {
+ *peos = 1;
+ return APR_SUCCESS;
+ }
+ else if (H2_BUCKET_IS_HEADERS(b)) {
+ if (*plen > 0) {
+ /* data before the response, can only return up to here */
+ return APR_SUCCESS;
+ }
+ else if (pheaders) {
+ *pheaders = h2_bucket_headers_get(b);
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ H2_STRM_MSG(stream, "prep, -> response %d"),
+ (*pheaders)->status);
+ return APR_SUCCESS;
+ }
+ else {
+ return APR_EAGAIN;
+ }
+ }
+ }
+ else if (b->length == 0) {
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ }
+ else {
+ ap_assert(b->length != (apr_size_t)-1);
+ *plen += b->length;
+ if (*plen >= requested) {
+ *plen = requested;
+ return APR_SUCCESS;
+ }
+ }
+ b = e;
+ }
+ *complete = 1;
+ return APR_SUCCESS;
+}
+
apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
- int *peos, h2_headers **presponse)
+ int *peos, h2_headers **pheaders)
{
apr_status_t status = APR_SUCCESS;
- apr_off_t requested, max_chunk = H2_DATA_CHUNK_SIZE;
- apr_bucket *b, *e;
+ apr_off_t requested, missing, max_chunk = H2_DATA_CHUNK_SIZE;
conn_rec *c;
+ int complete;
- if (presponse) {
- *presponse = NULL;
- }
-
ap_assert(stream);
if (stream->rst_error) {
if (stream->session->io.write_size > 0) {
max_chunk = stream->session->io.write_size - 9; /* header bits */
}
- *plen = requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
+ requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
+
+ /* count the buffered data until eos or a headers bucket */
+ status = add_data(stream, requested, plen, peos, &complete, pheaders);
+
+ if (status == APR_EAGAIN) {
+ /* TODO: ugly, someone needs to retrieve the response first */
+ h2_mplx_keep_active(stream->session->mplx, stream);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ H2_STRM_MSG(stream, "prep, response eagain"));
+ return status;
+ }
+ else if (status != APR_SUCCESS) {
+ return status;
+ }
- h2_util_bb_avail(stream->out_buffer, plen, peos);
- if (!*peos && *plen < requested && *plen < stream->max_mem) {
- H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
+ if (pheaders && *pheaders) {
+ return APR_SUCCESS;
+ }
+
+ missing = H2MIN(requested, stream->max_mem) - *plen;
+ if (complete && !*peos && missing > 0) {
if (stream->output) {
+ H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
status = h2_beam_receive(stream->output, stream->out_buffer,
APR_NONBLOCK_READ,
stream->max_mem - *plen);
+ H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
}
else {
status = APR_EOF;
if (APR_STATUS_IS_EOF(status)) {
apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
+ *peos = 1;
status = APR_SUCCESS;
}
- else if (status == APR_EAGAIN) {
- status = APR_SUCCESS;
- }
- *plen = requested;
- h2_util_bb_avail(stream->out_buffer, plen, peos);
- H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
- }
- else {
- H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "ok");
- }
-
- b = APR_BRIGADE_FIRST(stream->out_buffer);
- while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
- e = APR_BUCKET_NEXT(b);
- if (APR_BUCKET_IS_FLUSH(b)
- || (!APR_BUCKET_IS_METADATA(b) && b->length == 0)) {
- APR_BUCKET_REMOVE(b);
- apr_bucket_destroy(b);
- }
- else {
- break;
+ else if (status == APR_SUCCESS) {
+ /* do it again, now that we have gotten more */
+ status = add_data(stream, requested, plen, peos, &complete, pheaders);
}
- b = e;
}
-
- b = get_first_headers_bucket(stream->out_buffer);
- if (b) {
- /* there are HEADERS to submit */
- *peos = 0;
- *plen = 0;
- if (b == APR_BRIGADE_FIRST(stream->out_buffer)) {
- if (presponse) {
- *presponse = h2_bucket_headers_get(b);
- APR_BUCKET_REMOVE(b);
- apr_bucket_destroy(b);
- status = APR_SUCCESS;
- }
- else {
- /* someone needs to retrieve the response first */
- h2_mplx_keep_active(stream->session->mplx, stream->id);
- status = APR_EAGAIN;
- }
- }
- else {
- apr_bucket *e = APR_BRIGADE_FIRST(stream->out_buffer);
- while (e != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
- if (e == b) {
- break;
- }
- else if (e->length != (apr_size_t)-1) {
- *plen += e->length;
- }
- e = APR_BUCKET_NEXT(e);
- }
- }
- }
-
+
if (status == APR_SUCCESS) {
- if (presponse && *presponse) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- H2_STRM_MSG(stream, "prepare, response %d"),
- (*presponse)->status);
- }
- else if (*peos || *plen) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+ if (*peos || *plen) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
H2_STRM_MSG(stream, "prepare, len=%ld eos=%d"),
(long)*plen, *peos);
}
else {
status = APR_EAGAIN;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
H2_STRM_MSG(stream, "prepare, no data"));
}
}
/* There are cases where we need to parse a serialized http/1.1
* response. One example is a 100-continue answer in serialized mode
* or via a mod_proxy setup */
- while (!task->output.sent_response) {
+ while (bb && !task->output.sent_response) {
status = h2_from_h1_parse_response(task, f, bb);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c,
"h2_task(%s): parsed response", task->id);
}
-void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
+int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
{
int i;
if (h2_iq_contains(q, sid)) {
- return;
+ return 0;
}
if (q->nelts >= q->nalloc) {
iq_grow(q, q->nalloc * 2);
/* bubble it to the front of the queue */
iq_bubble_up(q, i, q->head, cmp, ctx);
}
+ return 1;
}
-void h2_iq_append(h2_iqueue *q, int sid)
+int h2_iq_append(h2_iqueue *q, int sid)
{
- h2_iq_add(q, sid, NULL, NULL);
+ return h2_iq_add(q, sid, NULL, NULL);
}
int h2_iq_remove(h2_iqueue *q, int sid)
struct h2_fifo {
void **elems;
int nelems;
+ int set;
int head;
int count;
int aborted;
return APR_SUCCESS;
}
-apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+static int index_of(h2_fifo *fifo, void *elem)
+{
+ int i;
+
+ for (i = 0; i < fifo->count; ++i) {
+ if (elem == fifo->elems[nth_index(fifo, i)]) {
+ return i;
+ }
+ }
+ return -1;
+}
+
+static apr_status_t create_int(h2_fifo **pfifo, apr_pool_t *pool,
+ int capacity, int as_set)
{
apr_status_t rv;
h2_fifo *fifo;
return APR_ENOMEM;
}
fifo->nelems = capacity;
+ fifo->set = as_set;
*pfifo = fifo;
apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null);
return APR_SUCCESS;
}
+apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ return create_int(pfifo, pool, capacity, 0);
+}
+
+apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ return create_int(pfifo, pool, capacity, 1);
+}
+
apr_status_t h2_fifo_term(h2_fifo *fifo)
{
apr_status_t rv;
}
if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
- if (fifo->count == fifo->nelems) {
+ if (fifo->set && index_of(fifo, elem) >= 0) {
+ /* set mode, elem already member */
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EEXIST;
+ }
+ else if (fifo->count == fifo->nelems) {
if (block) {
while (fifo->count == fifo->nelems) {
if (fifo->aborted) {
return fifo_push(fifo, elem, 0);
}
+static void *pull_head(h2_fifo *fifo)
+{
+ void *elem;
+
+ ap_assert(fifo->count > 0);
+ elem = fifo->elems[fifo->head];
+ --fifo->count;
+ if (fifo->count > 0) {
+ fifo->head = nth_index(fifo, 1);
+ if (fifo->count+1 == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ }
+ return elem;
+}
+
static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block)
{
apr_status_t rv;
}
ap_assert(fifo->count > 0);
- *pelem = fifo->elems[fifo->head];
- --fifo->count;
- if (fifo->count > 0) {
- fifo->head = nth_index(fifo, 1);
- if (fifo->count+1 == fifo->nelems) {
- apr_thread_cond_broadcast(fifo->not_full);
- }
- }
+ *pelem = pull_head(fifo);
+
apr_thread_mutex_unlock(fifo->lock);
}
return rv;
}
ap_assert(fifo->count > 0);
- elem = fifo->elems[fifo->head];
+ elem = pull_head(fifo);
+ apr_thread_mutex_unlock(fifo->lock);
+
switch (fn(elem, ctx)) {
case H2_FIFO_OP_PULL:
- --fifo->count;
- if (fifo->count > 0) {
- fifo->head = nth_index(fifo, 1);
- if (fifo->count+1 == fifo->nelems) {
- apr_thread_cond_broadcast(fifo->not_full);
- }
- }
break;
case H2_FIFO_OP_REPUSH:
- if (fifo->count > 1) {
- fifo->head = nth_index(fifo, 1);
- if (fifo->count < fifo->nelems) {
- fifo->elems[nth_index(fifo, fifo->count-1)] = elem;
- }
- }
+ return h2_fifo_push(fifo, elem);
break;
}
- apr_thread_mutex_unlock(fifo->lock);
}
return rv;
}
* @param q the queue to append the id to
* @param sid the stream id to add
* @param cmp the comparator for sorting
- * @param ctx user data for comparator
+ * @param ctx user data for comparator
+ * @return != 0 iff id was not already there
*/
-void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx);
+int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx);
/**
* Append the id to the queue if not already present.
*
* @param q the queue to append the id to
* @param sid the id to append
+ * @return != 0 iff id was not already there
*/
-void h2_iq_append(h2_iqueue *q, int sid);
+int h2_iq_append(h2_iqueue *q, int sid);
/**
* Remove the stream id from the queue. Return != 0 iff task
*/
typedef struct h2_fifo h2_fifo;
+/**
+ * Create a FIFO queue that can hold up to capacity elements. Elements can
+ * appear several times.
+ */
apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity);
+
+/**
+ * Create a FIFO set that can hold up to capacity elements. Elements only
+ * appear once. Pushing an element already present does not change the
+ * queue and is successful.
+ */
+apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity);
+
apr_status_t h2_fifo_term(h2_fifo *fifo);
apr_status_t h2_fifo_interrupt(h2_fifo *fifo);
int h2_fifo_count(h2_fifo *fifo);
+/**
+ * Push en element into the queue. Blocks if there is no capacity left.
+ *
+ * @param fifo the FIFO queue
+ * @param elem the element to push
+ * @return APR_SUCCESS on push, APR_EAGAIN on try_push on a full queue,
+ * APR_EEXIST when in set mode and elem already there.
+ */
apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem);
apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem);
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.10.0"
+#define MOD_HTTP2_VERSION "1.10.1"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010a00
+#define MOD_HTTP2_VERSION_NUM 0x010a01
#endif /* mod_h2_h2_version_h */
int sticks;
h2_task *task;
apr_thread_t *thread;
+ apr_thread_mutex_t *lock;
apr_thread_cond_t *not_idle;
};
slot->workers = workers;
slot->aborted = 0;
slot->task = NULL;
+
+ if (!slot->lock) {
+ status = apr_thread_mutex_create(&slot->lock,
+ APR_THREAD_MUTEX_DEFAULT,
+ workers->pool);
+ if (status != APR_SUCCESS) {
+ push_slot(&workers->free, slot);
+ return status;
+ }
+ }
+
if (!slot->not_idle) {
status = apr_thread_cond_create(&slot->not_idle, workers->pool);
if (status != APR_SUCCESS) {
return APR_ENOMEM;
}
- ++workers->worker_count;
+ apr_atomic_inc32(&workers->worker_count);
return APR_SUCCESS;
}
{
h2_slot *slot = pop_slot(&workers->idle);
if (slot) {
- apr_thread_mutex_lock(workers->lock);
+ apr_thread_mutex_lock(slot->lock);
apr_thread_cond_signal(slot->not_idle);
- apr_thread_mutex_unlock(workers->lock);
+ apr_thread_mutex_unlock(slot->lock);
}
else if (workers->dynamic) {
add_worker(workers);
apr_thread_join(&status, slot->thread);
slot->thread = NULL;
}
- --workers->worker_count;
+ apr_atomic_dec32(&workers->worker_count);
push_slot(&workers->free, slot);
}
}
return APR_SUCCESS;
}
- apr_thread_mutex_lock(workers->lock);
cleanup_zombies(workers);
- ++workers->idle_workers;
+ apr_thread_mutex_lock(slot->lock);
push_slot(&workers->idle, slot);
- apr_thread_cond_wait(slot->not_idle, workers->lock);
- --workers->idle_workers;
-
- apr_thread_mutex_unlock(workers->lock);
+ apr_thread_cond_wait(slot->not_idle, slot->lock);
+ apr_thread_mutex_unlock(slot->lock);
}
return APR_EOF;
}
h2_slot *slot;
if (!workers->aborted) {
- apr_thread_mutex_lock(workers->lock);
workers->aborted = 1;
- /* before we go, cleanup any zombies and abort the rest */
- cleanup_zombies(workers);
+ /* abort all idle slots */
for (;;) {
slot = pop_slot(&workers->idle);
if (slot) {
+ apr_thread_mutex_lock(slot->lock);
slot->aborted = 1;
apr_thread_cond_signal(slot->not_idle);
+ apr_thread_mutex_unlock(slot->lock);
}
else {
break;
}
}
- apr_thread_mutex_unlock(workers->lock);
h2_fifo_term(workers->mplxs);
h2_fifo_interrupt(workers->mplxs);
+
+ cleanup_zombies(workers);
}
return APR_SUCCESS;
}
int next_worker_id;
int min_workers;
int max_workers;
- int worker_count;
- int idle_workers;
int max_idle_secs;
int aborted;
int nslots;
struct h2_slot *slots;
+ volatile apr_uint32_t worker_count;
+
struct h2_slot *free;
struct h2_slot *idle;
struct h2_slot *zombies;
#include "h2_version.h"
#include "h2_proxy_session.h"
+#define H2MIN(x,y) ((x) < (y) ? (x) : (y))
+
static void register_hook(apr_pool_t *p);
AP_DECLARE_MODULE(proxy_http2) = {
const char *engine_type;
apr_pool_t *engine_pool;
apr_size_t req_buffer_size;
- request_rec *next;
+ h2_proxy_fifo *requests;
int capacity;
unsigned standalone : 1;
{
h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config,
&proxy_http2_module);
- if (ctx) {
- conn_rec *c = ctx->owner;
- h2_proxy_ctx *nctx;
-
- /* we need another lifetime for this. If we do not host
- * an engine, the context lives in r->pool. Since we expect
- * to server more than r, we need to live longer */
- nctx = apr_pcalloc(pool, sizeof(*nctx));
- if (nctx == NULL) {
- return APR_ENOMEM;
- }
- memcpy(nctx, ctx, sizeof(*nctx));
- ctx = nctx;
- ctx->pool = pool;
- ctx->engine = engine;
- ctx->engine_id = id;
- ctx->engine_type = type;
- ctx->engine_pool = pool;
- ctx->req_buffer_size = req_buffer_size;
- ctx->capacity = 100;
-
- ap_set_module_config(c->conn_config, &proxy_http2_module, ctx);
-
- *pconsumed = out_consumed;
- *pctx = ctx;
- return APR_SUCCESS;
+ if (!ctx) {
+ ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, APLOGNO(03368)
+ "h2_proxy_session, engine init, no ctx found");
+ return APR_ENOTIMPL;
}
- ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, APLOGNO(03368)
- "h2_proxy_session, engine init, no ctx found");
- return APR_ENOTIMPL;
+
+ ctx->pool = pool;
+ ctx->engine = engine;
+ ctx->engine_id = id;
+ ctx->engine_type = type;
+ ctx->engine_pool = pool;
+ ctx->req_buffer_size = req_buffer_size;
+ ctx->capacity = H2MIN(100, h2_proxy_fifo_capacity(ctx->requests));
+
+ *pconsumed = out_consumed;
+ *pctx = ctx;
+ return APR_SUCCESS;
}
static apr_status_t add_request(h2_proxy_session *session, request_rec *r)
return status;
}
-static void request_done(h2_proxy_session *session, request_rec *r,
+static void request_done(h2_proxy_ctx *ctx, request_rec *r,
apr_status_t status, int touched)
{
- h2_proxy_ctx *ctx = session->user_data;
const char *task_id = apr_table_get(r->connection->notes, H2_TASK_ID_NOTE);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection,
if (status != APR_SUCCESS) {
if (!touched) {
/* untouched request, need rescheduling */
- if (req_engine_push && is_h2 && is_h2(ctx->owner)) {
- if (req_engine_push(ctx->engine_type, r, NULL) == APR_SUCCESS) {
- /* push to engine */
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
- APLOGNO(03369)
- "h2_proxy_session(%s): rescheduled request %s",
- ctx->engine_id, task_id);
- return;
- }
- }
- else if (!ctx->next) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection,
- "h2_proxy_session(%s): retry untouched request",
- ctx->engine_id);
- ctx->next = r;
- }
+ status = h2_proxy_fifo_push(ctx->requests, r);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection,
+ APLOGNO(03369)
+ "h2_proxy_session(%s): rescheduled request %s",
+ ctx->engine_id, task_id);
+ return;
}
else {
const char *uri;
uri = apr_uri_unparse(r->pool, &r->parsed_uri, 0);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection,
APLOGNO(03471) "h2_proxy_session(%s): request %s -> %s "
- "not complete, was touched",
+ "not complete, cannot repeat",
ctx->engine_id, task_id, uri);
}
}
if (r == ctx->rbase) {
- ctx->r_status = (status == APR_SUCCESS)? APR_SUCCESS : HTTP_SERVICE_UNAVAILABLE;
+ ctx->r_status = ((status == APR_SUCCESS)? APR_SUCCESS
+ : HTTP_SERVICE_UNAVAILABLE);
}
if (req_engine_done && ctx->engine) {
}
}
+static void session_req_done(h2_proxy_session *session, request_rec *r,
+ apr_status_t status, int touched)
+{
+ request_done(session->user_data, r, status, touched);
+}
+
static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave)
{
- if (ctx->next) {
+ if (h2_proxy_fifo_count(ctx->requests) > 0) {
return APR_SUCCESS;
}
else if (req_engine_pull && ctx->engine) {
apr_status_t status;
+ request_rec *r = NULL;
+
status = req_engine_pull(ctx->engine, before_leave?
APR_BLOCK_READ: APR_NONBLOCK_READ,
- ctx->capacity, &ctx->next);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner,
- "h2_proxy_engine(%s): pulled request (%s) %s",
- ctx->engine_id,
- before_leave? "before leave" : "regular",
- (ctx->next? ctx->next->the_request : "NULL"));
+ ctx->capacity, &r);
+ if (status == APR_SUCCESS && r) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner,
+ "h2_proxy_engine(%s): pulled request (%s) %s",
+ ctx->engine_id,
+ before_leave? "before leave" : "regular",
+ r->the_request);
+ h2_proxy_fifo_push(ctx->requests, r);
+ }
return APR_STATUS_IS_EAGAIN(status)? APR_SUCCESS : status;
}
return APR_EOF;
static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
apr_status_t status = OK;
int h2_front;
+ request_rec *r;
/* Step Four: Send the Request in a new HTTP/2 stream and
* loop until we got the response or encounter errors.
ctx->session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf,
h2_front, 30,
h2_proxy_log2((int)ctx->req_buffer_size),
- request_done);
+ session_req_done);
if (!ctx->session) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner,
APLOGNO(03372) "session unavailable");
"eng(%s): run session %s", ctx->engine_id, ctx->session->id);
ctx->session->user_data = ctx;
- while (1) {
- if (ctx->next) {
- add_request(ctx->session, ctx->next);
- ctx->next = NULL;
+ while (!ctx->owner->aborted) {
+ if (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) {
+ add_request(ctx->session, r);
}
status = h2_proxy_session_process(ctx->session);
/* ongoing processing, call again */
if (ctx->session->remote_max_concurrent > 0
&& ctx->session->remote_max_concurrent != ctx->capacity) {
- ctx->capacity = (int)ctx->session->remote_max_concurrent;
+ ctx->capacity = H2MIN((int)ctx->session->remote_max_concurrent,
+ h2_proxy_fifo_capacity(ctx->requests));
}
s2 = next_request(ctx, 0);
if (s2 == APR_ECONNABORTED) {
status = ctx->r_status = APR_SUCCESS;
break;
}
- if (!ctx->next && h2_proxy_ihash_empty(ctx->session->streams)) {
+ if ((h2_proxy_fifo_count(ctx->requests) == 0)
+ && h2_proxy_ihash_empty(ctx->session->streams)) {
break;
}
}
* a) be reopened on the new session iff safe to do so
* b) reported as done (failed) otherwise
*/
- h2_proxy_session_cleanup(ctx->session, request_done);
+ h2_proxy_session_cleanup(ctx->session, session_req_done);
break;
}
}
return status;
}
-static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx)
+static apr_status_t push_request_somewhere(h2_proxy_ctx *ctx, request_rec *r)
{
conn_rec *c = ctx->owner;
const char *engine_type, *hostname;
engine_type = apr_psprintf(ctx->pool, "proxy_http2 %s%s", hostname,
ctx->server_portstr);
- if (c->master && req_engine_push && ctx->next && is_h2 && is_h2(c)) {
+ if (c->master && req_engine_push && r && is_h2 && is_h2(c)) {
/* If we are have req_engine capabilities, push the handling of this
* request (e.g. slave connection) to a proxy_http2 engine which
* uses the same backend. We may be called to create an engine
* ourself. */
- if (req_engine_push(engine_type, ctx->next, proxy_engine_init)
- == APR_SUCCESS) {
- /* to renew the lifetime, we might have set a new ctx */
- ctx = ap_get_module_config(c->conn_config, &proxy_http2_module);
+ if (req_engine_push(engine_type, r, proxy_engine_init) == APR_SUCCESS) {
if (ctx->engine == NULL) {
- /* Another engine instance has taken over processing of this
- * request. */
- ctx->r_status = SUSPENDED;
- ctx->next = NULL;
- return ctx;
+ /* request has been assigned to an engine in another thread */
+ return SUSPENDED;
}
}
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"H2: hosting engine %s", ctx->engine_id);
}
- return ctx;
+
+ return h2_proxy_fifo_push(ctx->requests, r);
}
static int proxy_http2_handler(request_rec *r,
apr_status_t status;
h2_proxy_ctx *ctx;
apr_uri_t uri;
- int reconnected = 0;
+ int reconnects = 0;
/* find the scheme */
if ((url[0] != 'h' && url[0] != 'H') || url[1] != '2') {
default:
return DECLINED;
}
+
ctx = apr_pcalloc(r->pool, sizeof(*ctx));
ctx->owner = r->connection;
ctx->pool = r->pool;
ctx->conf = conf;
ctx->flushall = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0;
ctx->r_status = HTTP_SERVICE_UNAVAILABLE;
- ctx->next = r;
- r = NULL;
+
+ h2_proxy_fifo_set_create(&ctx->requests, ctx->pool, 100);
+
ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, ctx);
/* scheme says, this is for us. */
/* If we are not already hosting an engine, try to push the request
* to an already existing engine or host a new engine here. */
- if (!ctx->engine) {
- ctx = push_request_somewhere(ctx);
+ if (r && !ctx->engine) {
+ ctx->r_status = push_request_somewhere(ctx, r);
+ r = NULL;
if (ctx->r_status == SUSPENDED) {
- /* request was pushed to another engine */
+ /* request was pushed to another thread, leave processing here */
goto cleanup;
}
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(03352)
"H2: failed to make connection to backend: %s",
ctx->p_conn->hostname);
- goto cleanup;
+ goto reconnect;
}
/* Step Three: Create conn_rec for the socket we have open now. */
"setup new connection: is_ssl=%d %s %s %s",
ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname,
locurl, ctx->p_conn->hostname);
- goto cleanup;
+ goto reconnect;
}
if (!ctx->p_conn->data) {
ctx->engine = NULL;
}
-cleanup:
- if (!reconnected && next_request(ctx, 1) == APR_SUCCESS) {
+reconnect:
+ if (next_request(ctx, 1) == APR_SUCCESS) {
/* Still more to do, tear down old conn and start over */
if (ctx->p_conn) {
ctx->p_conn->close = 1;
ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
ctx->p_conn = NULL;
}
- reconnected = 1; /* we do this only once, then fail */
- goto run_connect;
+ ++reconnects;
+ if (reconnects < 5 && !ctx->owner->aborted) {
+ goto run_connect;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(10023)
+ "giving up after %d reconnects, %d requests todo",
+ reconnects, h2_proxy_fifo_count(ctx->requests));
}
+cleanup:
if (ctx->p_conn) {
if (status != APR_SUCCESS) {
/* close socket when errors happened or session shut down (EOF) */
ctx->p_conn = NULL;
}
+ /* Any requests will still have need to fail */
+ while (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) {
+ request_done(ctx, r, HTTP_SERVICE_UNAVAILABLE, 1);
+ }
+
ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, NULL);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner,
APLOGNO(03377) "leaving handler");