apr_time_t now;
} stream_iter_ctx;
-/* NULL or the mutex hold by this thread, used for recursive calls
- */
-static const int nested_lock = 0;
-
-static apr_threadkey_t *thread_lock;
-
apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
{
- if (nested_lock) {
- return apr_threadkey_private_create(&thread_lock, NULL, pool);
- }
return APR_SUCCESS;
}
-static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
-{
- apr_status_t status;
-
- if (nested_lock) {
- void *mutex = NULL;
- /* Enter the mutex if this thread already holds the lock or
- * if we can acquire it. Only on the later case do we unlock
- * onleaving the mutex.
- * This allow recursive entering of the mutex from the saem thread,
- * which is what we need in certain situations involving callbacks
- */
- apr_threadkey_private_get(&mutex, thread_lock);
- if (mutex == m->lock) {
- *pacquired = 0;
- ap_assert(NULL); /* nested, why? */
- return APR_SUCCESS;
- }
- }
- status = apr_thread_mutex_lock(m->lock);
- *pacquired = (status == APR_SUCCESS);
- if (nested_lock && *pacquired) {
- apr_threadkey_private_set(m->lock, thread_lock);
- }
- return status;
-}
+#define H2_MPLX_ENTER(m) \
+ do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
+ return rv;\
+ } } while(0)
-static void leave_mutex(h2_mplx *m, int acquired)
-{
- if (acquired) {
- if (nested_lock) {
- apr_threadkey_private_set(NULL, thread_lock);
- }
- apr_thread_mutex_unlock(m->lock);
- }
-}
+#define H2_MPLX_ENTER_ALWAYS(m) \
+ apr_thread_mutex_lock(m->lock)
+
+#define H2_MPLX_LEAVE(m) \
+ apr_thread_mutex_unlock(m->lock)
+
static void check_data_for(h2_mplx *m, int stream_id);
int h2_mplx_shutdown(h2_mplx *m)
{
- int acquired, max_stream_started = 0;
+ int max_stream_started = 0;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- max_stream_started = m->max_stream_started;
- /* Clear schedule queue, disabling existing streams from starting */
- h2_iq_clear(m->q);
- leave_mutex(m, acquired);
- }
+ H2_MPLX_ENTER(m);
+
+ max_stream_started = m->max_stream_started;
+ /* Clear schedule queue, disabling existing streams from starting */
+ h2_iq_clear(m->q);
+
+ H2_MPLX_LEAVE(m);
return max_stream_started;
}
apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
{
- apr_status_t status;
- int acquired;
+ stream_iter_ctx_t x;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- stream_iter_ctx_t x;
- x.cb = cb;
- x.ctx = ctx;
- h2_ihash_iter(m->streams, stream_iter_wrap, &x);
+ H2_MPLX_ENTER(m);
+
+ x.cb = cb;
+ x.ctx = ctx;
+ h2_ihash_iter(m->streams, stream_iter_wrap, &x);
- leave_mutex(m, acquired);
- }
- return status;
+ H2_MPLX_LEAVE(m);
+ return APR_SUCCESS;
}
static int report_stream_iter(void *ctx, void *val) {
{
apr_status_t status;
int i, wait_secs = 60;
- int acquired;
/* How to shut down a h2 connection:
* 0. abort and tell the workers that no more tasks will come from us */
m->aborted = 1;
h2_workers_unregister(m->workers, m);
- enter_mutex(m, &acquired);
+ H2_MPLX_ENTER_ALWAYS(m);
/* How to shut down a h2 connection:
* 1. cancel all streams still active */
h2_ihash_iter(m->shold, unexpected_stream_iter, m);
}
- leave_mutex(m, acquired);
+ H2_MPLX_LEAVE(m);
/* 5. unregister again, now that our workers are done */
h2_workers_unregister(m->workers, m);
apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
{
- apr_status_t status = APR_SUCCESS;
- int acquired;
+ H2_MPLX_ENTER(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- H2_STRM_MSG(stream, "cleanup"));
- stream_cleanup(m, stream);
- leave_mutex(m, acquired);
- }
- return status;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ H2_STRM_MSG(stream, "cleanup"));
+ stream_cleanup(m, stream);
+
+ H2_MPLX_LEAVE(m);
+ return APR_SUCCESS;
}
h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
{
h2_stream *s = NULL;
- int acquired;
- if ((enter_mutex(m, &acquired)) == APR_SUCCESS) {
- s = h2_ihash_get(m->streams, id);
- leave_mutex(m, acquired);
- }
+ H2_MPLX_ENTER_ALWAYS(m);
+
+ s = h2_ihash_get(m->streams, id);
+
+ H2_MPLX_LEAVE(m);
return s;
}
static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
h2_mplx *m = ctx;
- int acquired;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id);
- check_data_for(m, beam->id);
- leave_mutex(m, acquired);
- }
+ H2_MPLX_ENTER_ALWAYS(m);
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id);
+ check_data_for(m, beam->id);
+
+ H2_MPLX_LEAVE(m);
}
static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
{
apr_status_t status;
- int acquired;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (m->aborted) {
- status = APR_ECONNABORTED;
- }
- else {
- status = out_open(m, stream_id, beam);
- }
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER(m);
+
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
+ }
+ else {
+ status = out_open(m, stream_id, beam);
}
+
+ H2_MPLX_LEAVE(m);
return status;
}
apr_thread_cond_t *iowait)
{
apr_status_t status;
- int acquired;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (m->aborted) {
- status = APR_ECONNABORTED;
- }
- else if (apr_atomic_read32(&m->event_pending) > 0) {
- status = APR_SUCCESS;
- }
- else {
- purge_streams(m);
- h2_ihash_iter(m->streams, report_consumption_iter, m);
- m->added_output = iowait;
- status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
- if (APLOGctrace2(m->c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): trywait on data for %f ms)",
- m->id, timeout/1000.0);
- }
- m->added_output = NULL;
+ H2_MPLX_ENTER(m);
+
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
+ }
+ else if (apr_atomic_read32(&m->event_pending) > 0) {
+ status = APR_SUCCESS;
+ }
+ else {
+ purge_streams(m);
+ h2_ihash_iter(m->streams, report_consumption_iter, m);
+ m->added_output = iowait;
+ status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
+ if (APLOGctrace2(m->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): trywait on data for %f ms)",
+ m->id, timeout/1000.0);
}
- leave_mutex(m, acquired);
+ m->added_output = NULL;
}
+
+ H2_MPLX_LEAVE(m);
return status;
}
apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
- int acquired;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (m->aborted) {
- status = APR_ECONNABORTED;
- }
- else {
- h2_iq_sort(m->q, cmp, ctx);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): reprioritize tasks", m->id);
- }
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER(m);
+
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
}
+ else {
+ h2_iq_sort(m->q, cmp, ctx);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): reprioritize tasks", m->id);
+ status = APR_SUCCESS;
+ }
+
+ H2_MPLX_LEAVE(m);
return status;
}
h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
- int acquired;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (m->aborted) {
- status = APR_ECONNABORTED;
+ H2_MPLX_ENTER(m);
+
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
+ }
+ else {
+ status = APR_SUCCESS;
+ h2_ihash_add(m->streams, stream);
+ if (h2_stream_is_ready(stream)) {
+ /* already have a response */
+ check_data_for(m, stream->id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ H2_STRM_MSG(stream, "process, add to readyq"));
}
else {
- h2_ihash_add(m->streams, stream);
- if (h2_stream_is_ready(stream)) {
- /* already have a response */
- check_data_for(m, stream->id);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- H2_STRM_MSG(stream, "process, add to readyq"));
- }
- else {
- h2_iq_add(m->q, stream->id, cmp, ctx);
- register_if_needed(m);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- H2_STRM_MSG(stream, "process, added to q"));
- }
+ h2_iq_add(m->q, stream->id, cmp, ctx);
+ register_if_needed(m);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ H2_STRM_MSG(stream, "process, added to q"));
}
- leave_mutex(m, acquired);
}
+
+ H2_MPLX_LEAVE(m);
return status;
}
h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
{
h2_task *task = NULL;
- apr_status_t status;
- int acquired;
+ H2_MPLX_ENTER_ALWAYS(m);
+
*has_more = 0;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (!m->aborted) {
- task = next_stream_task(m);
- if (task != NULL && !h2_iq_empty(m->q)) {
- *has_more = 1;
- }
- else {
- m->is_registered = 0; /* h2_workers will discard this mplx */
- }
+ if (!m->aborted) {
+ task = next_stream_task(m);
+ if (task != NULL && !h2_iq_empty(m->q)) {
+ *has_more = 1;
+ }
+ else {
+ m->is_registered = 0; /* h2_workers will discard this mplx */
}
- leave_mutex(m, acquired);
}
+
+ H2_MPLX_LEAVE(m);
return task;
}
void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
{
- int acquired;
-
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- task_done(m, task, NULL);
- --m->tasks_active;
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
- if (ptask) {
- /* caller wants another task */
- *ptask = next_stream_task(m);
- }
- register_if_needed(m);
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER_ALWAYS(m);
+
+ task_done(m, task, NULL);
+ --m->tasks_active;
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
}
+ if (ptask) {
+ /* caller wants another task */
+ *ptask = next_stream_task(m);
+ }
+ register_if_needed(m);
+
+ H2_MPLX_LEAVE(m);
}
/*******************************************************************************
{
apr_status_t status = APR_SUCCESS;
apr_time_t now;
- int acquired;
+ apr_size_t scount;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- apr_size_t scount = h2_ihash_count(m->streams);
- if (scount > 0 && m->tasks_active) {
- /* If we have streams in connection state 'IDLE', meaning
- * all streams are ready to sent data out, but lack
- * WINDOW_UPDATEs.
- *
- * This is ok, unless we have streams that still occupy
- * h2 workers. As worker threads are a scarce resource,
- * we need to take measures that we do not get DoSed.
- *
- * This is what we call an 'idle block'. Limit the amount
- * of busy workers we allow for this connection until it
- * well behaves.
- */
- now = apr_time_now();
- m->last_idle_block = now;
- if (m->limit_active > 2
- && now - m->last_limit_change >= m->limit_change_interval) {
- if (m->limit_active > 16) {
- m->limit_active = 16;
- }
- else if (m->limit_active > 8) {
- m->limit_active = 8;
- }
- else if (m->limit_active > 4) {
- m->limit_active = 4;
- }
- else if (m->limit_active > 2) {
- m->limit_active = 2;
- }
- m->last_limit_change = now;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): decrease worker limit to %d",
- m->id, m->limit_active);
+ H2_MPLX_ENTER(m);
+
+ scount = h2_ihash_count(m->streams);
+ if (scount > 0 && m->tasks_active) {
+ /* If we have streams in connection state 'IDLE', meaning
+ * all streams are ready to sent data out, but lack
+ * WINDOW_UPDATEs.
+ *
+ * This is ok, unless we have streams that still occupy
+ * h2 workers. As worker threads are a scarce resource,
+ * we need to take measures that we do not get DoSed.
+ *
+ * This is what we call an 'idle block'. Limit the amount
+ * of busy workers we allow for this connection until it
+ * well behaves.
+ */
+ now = apr_time_now();
+ m->last_idle_block = now;
+ if (m->limit_active > 2
+ && now - m->last_limit_change >= m->limit_change_interval) {
+ if (m->limit_active > 16) {
+ m->limit_active = 16;
}
-
- if (m->tasks_active > m->limit_active) {
- status = unschedule_slow_tasks(m);
+ else if (m->limit_active > 8) {
+ m->limit_active = 8;
}
+ else if (m->limit_active > 4) {
+ m->limit_active = 4;
+ }
+ else if (m->limit_active > 2) {
+ m->limit_active = 2;
+ }
+ m->last_limit_change = now;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): decrease worker limit to %d",
+ m->id, m->limit_active);
+ }
+
+ if (m->tasks_active > m->limit_active) {
+ status = unschedule_slow_tasks(m);
}
- register_if_needed(m);
- leave_mutex(m, acquired);
}
+ register_if_needed(m);
+
+ H2_MPLX_LEAVE(m);
return status;
}
apr_status_t status;
h2_mplx *m;
h2_task *task;
- int acquired;
+ h2_stream *stream;
task = h2_ctx_rget_task(r);
if (!task) {
}
m = task->mplx;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
-
- if (stream) {
- status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER(m);
+
+ stream = h2_ihash_get(m->streams, task->stream_id);
+ if (stream) {
+ status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
}
+ else {
+ status = APR_ECONNABORTED;
+ }
+
+ H2_MPLX_LEAVE(m);
return status;
}
h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
h2_mplx *m = h2_ngn_shed_get_ctx(shed);
apr_status_t status;
- int acquired;
+ int want_shutdown;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- int want_shutdown = (block == APR_BLOCK_READ);
+ H2_MPLX_ENTER(m);
- /* Take this opportunity to update output consummation
- * for this engine */
- ngn_out_update_windows(m, ngn);
-
- if (want_shutdown && !h2_iq_empty(m->q)) {
- /* For a blocking read, check first if requests are to be
- * had and, if not, wait a short while before doing the
- * blocking, and if unsuccessful, terminating read.
- */
+ want_shutdown = (block == APR_BLOCK_READ);
+
+ /* Take this opportunity to update output consummation
+ * for this engine */
+ ngn_out_update_windows(m, ngn);
+
+ if (want_shutdown && !h2_iq_empty(m->q)) {
+ /* For a blocking read, check first if requests are to be
+ * had and, if not, wait a short while before doing the
+ * blocking, and if unsuccessful, terminating read.
+ */
+ status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
+ if (APR_STATUS_IS_EAGAIN(status)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): start block engine pull", m->id);
+ apr_thread_cond_timedwait(m->task_thawed, m->lock,
+ apr_time_from_msec(20));
status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
- if (APR_STATUS_IS_EAGAIN(status)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): start block engine pull", m->id);
- apr_thread_cond_timedwait(m->task_thawed, m->lock,
- apr_time_from_msec(20));
- status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
- }
}
- else {
- status = h2_ngn_shed_pull_request(shed, ngn, capacity,
- want_shutdown, pr);
- }
- leave_mutex(m, acquired);
}
+ else {
+ status = h2_ngn_shed_pull_request(shed, ngn, capacity,
+ want_shutdown, pr);
+ }
+
+ H2_MPLX_LEAVE(m);
return status;
}
if (task) {
h2_mplx *m = task->mplx;
- int acquired;
+ h2_stream *stream;
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
-
- ngn_out_update_windows(m, ngn);
- h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
-
- if (status != APR_SUCCESS && stream
- && h2_task_can_redo(task)
- && !h2_ihash_get(m->sredo, stream->id)) {
- h2_ihash_add(m->sredo, stream);
- }
- if (task->engine) {
- /* cannot report that as done until engine returns */
- }
- else {
- task_done(m, task, ngn);
- }
- /* Take this opportunity to update output consummation
- * for this engine */
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER_ALWAYS(m);
+
+ stream = h2_ihash_get(m->streams, task->stream_id);
+
+ ngn_out_update_windows(m, ngn);
+ h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
+
+ if (status != APR_SUCCESS && stream
+ && h2_task_can_redo(task)
+ && !h2_ihash_get(m->sredo, stream->id)) {
+ h2_ihash_add(m->sredo, stream);
+ }
+ if (task->engine) {
+ /* cannot report that as done until engine returns */
}
+ else {
+ task_done(m, task, ngn);
+ }
+
+ H2_MPLX_LEAVE(m);
}
}
stream_ev_callback *on_resume,
void *on_ctx)
{
- apr_status_t status;
- int acquired;
int ids[100];
h2_stream *stream;
size_t i, n;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): dispatch events", m->id);
- apr_atomic_set32(&m->event_pending, 0);
- purge_streams(m);
-
- /* update input windows for streams */
- h2_ihash_iter(m->streams, report_consumption_iter, m);
-
- if (!h2_iq_empty(m->readyq)) {
- n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
- for (i = 0; i < n; ++i) {
- stream = h2_ihash_get(m->streams, ids[i]);
- if (stream) {
- leave_mutex(m, acquired);
- on_resume(on_ctx, stream);
- enter_mutex(m, &acquired);
- }
+ H2_MPLX_ENTER(m);
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): dispatch events", m->id);
+ apr_atomic_set32(&m->event_pending, 0);
+ purge_streams(m);
+
+ /* update input windows for streams */
+ h2_ihash_iter(m->streams, report_consumption_iter, m);
+
+ if (!h2_iq_empty(m->readyq)) {
+ n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
+ for (i = 0; i < n; ++i) {
+ stream = h2_ihash_get(m->streams, ids[i]);
+ if (stream) {
+ H2_MPLX_LEAVE(m);
+
+ on_resume(on_ctx, stream);
+
+ H2_MPLX_ENTER(m);
}
}
- if (!h2_iq_empty(m->readyq)) {
- apr_atomic_set32(&m->event_pending, 1);
- }
- leave_mutex(m, acquired);
}
- return status;
+ if (!h2_iq_empty(m->readyq)) {
+ apr_atomic_set32(&m->event_pending, 1);
+ }
+
+ H2_MPLX_LEAVE(m);
+ return APR_SUCCESS;
}
apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id)
{
- apr_status_t status;
- int acquired;
-
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- check_data_for(m, stream_id);
- leave_mutex(m, acquired);
- }
- return status;
+ H2_MPLX_ENTER(m);
+
+ check_data_for(m, stream_id);
+
+ H2_MPLX_LEAVE(m);
+ return APR_SUCCESS;
}
int h2_mplx_awaits_data(h2_mplx *m)
{
- apr_status_t status;
- int acquired, waiting = 1;
+ int waiting = 1;
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (h2_ihash_empty(m->streams)) {
- waiting = 0;
- }
- if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
- waiting = 0;
- }
- leave_mutex(m, acquired);
+ H2_MPLX_ENTER_ALWAYS(m);
+
+ if (h2_ihash_empty(m->streams)) {
+ waiting = 0;
}
+ if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
+ waiting = 0;
+ }
+
+ H2_MPLX_LEAVE(m);
return waiting;
}