From: Stefan Eissing Date: Tue, 4 Apr 2017 13:45:09 +0000 (+0000) Subject: On the trunk: X-Git-Tag: 2.5.0-alpha~501 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=68b05db12e155cdae64dc4129676ccaf660d96d1;p=apache On the trunk: mod_http2: code cleanup after eliminating nested locks, giving worker slots their own mutex. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1790113 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 04fbbd05cb..d044c19eab 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -55,56 +55,22 @@ typedef struct { apr_time_t now; } stream_iter_ctx; -/* NULL or the mutex hold by this thread, used for recursive calls - */ -static const int nested_lock = 0; - -static apr_threadkey_t *thread_lock; - apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s) { - if (nested_lock) { - return apr_threadkey_private_create(&thread_lock, NULL, pool); - } return APR_SUCCESS; } -static apr_status_t enter_mutex(h2_mplx *m, int *pacquired) -{ - apr_status_t status; - - if (nested_lock) { - void *mutex = NULL; - /* Enter the mutex if this thread already holds the lock or - * if we can acquire it. Only on the later case do we unlock - * onleaving the mutex. - * This allow recursive entering of the mutex from the saem thread, - * which is what we need in certain situations involving callbacks - */ - apr_threadkey_private_get(&mutex, thread_lock); - if (mutex == m->lock) { - *pacquired = 0; - ap_assert(NULL); /* nested, why? */ - return APR_SUCCESS; - } - } - status = apr_thread_mutex_lock(m->lock); - *pacquired = (status == APR_SUCCESS); - if (nested_lock && *pacquired) { - apr_threadkey_private_set(m->lock, thread_lock); - } - return status; -} +#define H2_MPLX_ENTER(m) \ + do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\ + return rv;\ + } } while(0) -static void leave_mutex(h2_mplx *m, int acquired) -{ - if (acquired) { - if (nested_lock) { - apr_threadkey_private_set(NULL, thread_lock); - } - apr_thread_mutex_unlock(m->lock); - } -} +#define H2_MPLX_ENTER_ALWAYS(m) \ + apr_thread_mutex_lock(m->lock) + +#define H2_MPLX_LEAVE(m) \ + apr_thread_mutex_unlock(m->lock) + static void check_data_for(h2_mplx *m, int stream_id); @@ -259,14 +225,15 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, int h2_mplx_shutdown(h2_mplx *m) { - int acquired, max_stream_started = 0; + int max_stream_started = 0; - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - max_stream_started = m->max_stream_started; - /* Clear schedule queue, disabling existing streams from starting */ - h2_iq_clear(m->q); - leave_mutex(m, acquired); - } + H2_MPLX_ENTER(m); + + max_stream_started = m->max_stream_started; + /* Clear schedule queue, disabling existing streams from starting */ + h2_iq_clear(m->q); + + H2_MPLX_LEAVE(m); return max_stream_started; } @@ -363,18 +330,16 @@ static int stream_iter_wrap(void *ctx, void *stream) apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx) { - apr_status_t status; - int acquired; + stream_iter_ctx_t x; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - stream_iter_ctx_t x; - x.cb = cb; - x.ctx = ctx; - h2_ihash_iter(m->streams, stream_iter_wrap, &x); + H2_MPLX_ENTER(m); + + x.cb = cb; + x.ctx = ctx; + h2_ihash_iter(m->streams, stream_iter_wrap, &x); - leave_mutex(m, acquired); - } - return status; + H2_MPLX_LEAVE(m); + return APR_SUCCESS; } static int report_stream_iter(void *ctx, void *val) { @@ -430,14 +395,13 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) { apr_status_t status; int i, wait_secs = 60; - int acquired; /* How to shut down a h2 connection: * 0. abort and tell the workers that no more tasks will come from us */ m->aborted = 1; h2_workers_unregister(m->workers, m); - enter_mutex(m, &acquired); + H2_MPLX_ENTER_ALWAYS(m); /* How to shut down a h2 connection: * 1. cancel all streams still active */ @@ -482,7 +446,7 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) h2_ihash_iter(m->shold, unexpected_stream_iter, m); } - leave_mutex(m, acquired); + H2_MPLX_LEAVE(m); /* 5. unregister again, now that our workers are done */ h2_workers_unregister(m->workers, m); @@ -493,41 +457,39 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream) { - apr_status_t status = APR_SUCCESS; - int acquired; + H2_MPLX_ENTER(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - H2_STRM_MSG(stream, "cleanup")); - stream_cleanup(m, stream); - leave_mutex(m, acquired); - } - return status; + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + H2_STRM_MSG(stream, "cleanup")); + stream_cleanup(m, stream); + + H2_MPLX_LEAVE(m); + return APR_SUCCESS; } h2_stream *h2_mplx_stream_get(h2_mplx *m, int id) { h2_stream *s = NULL; - int acquired; - if ((enter_mutex(m, &acquired)) == APR_SUCCESS) { - s = h2_ihash_get(m->streams, id); - leave_mutex(m, acquired); - } + H2_MPLX_ENTER_ALWAYS(m); + + s = h2_ihash_get(m->streams, id); + + H2_MPLX_LEAVE(m); return s; } static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) { h2_mplx *m = ctx; - int acquired; - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id); - check_data_for(m, beam->id); - leave_mutex(m, acquired); - } + H2_MPLX_ENTER_ALWAYS(m); + + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id); + check_data_for(m, beam->id); + + H2_MPLX_LEAVE(m); } static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) @@ -568,17 +530,17 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) { apr_status_t status; - int acquired; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (m->aborted) { - status = APR_ECONNABORTED; - } - else { - status = out_open(m, stream_id, beam); - } - leave_mutex(m, acquired); + H2_MPLX_ENTER(m); + + if (m->aborted) { + status = APR_ECONNABORTED; + } + else { + status = out_open(m, stream_id, beam); } + + H2_MPLX_LEAVE(m); return status; } @@ -609,29 +571,29 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, apr_thread_cond_t *iowait) { apr_status_t status; - int acquired; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (m->aborted) { - status = APR_ECONNABORTED; - } - else if (apr_atomic_read32(&m->event_pending) > 0) { - status = APR_SUCCESS; - } - else { - purge_streams(m); - h2_ihash_iter(m->streams, report_consumption_iter, m); - m->added_output = iowait; - status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout); - if (APLOGctrace2(m->c)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): trywait on data for %f ms)", - m->id, timeout/1000.0); - } - m->added_output = NULL; + H2_MPLX_ENTER(m); + + if (m->aborted) { + status = APR_ECONNABORTED; + } + else if (apr_atomic_read32(&m->event_pending) > 0) { + status = APR_SUCCESS; + } + else { + purge_streams(m); + h2_ihash_iter(m->streams, report_consumption_iter, m); + m->added_output = iowait; + status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout); + if (APLOGctrace2(m->c)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): trywait on data for %f ms)", + m->id, timeout/1000.0); } - leave_mutex(m, acquired); + m->added_output = NULL; } + + H2_MPLX_LEAVE(m); return status; } @@ -648,19 +610,20 @@ static void check_data_for(h2_mplx *m, int stream_id) apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status; - int acquired; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (m->aborted) { - status = APR_ECONNABORTED; - } - else { - h2_iq_sort(m->q, cmp, ctx); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): reprioritize tasks", m->id); - } - leave_mutex(m, acquired); + H2_MPLX_ENTER(m); + + if (m->aborted) { + status = APR_ECONNABORTED; } + else { + h2_iq_sort(m->q, cmp, ctx); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): reprioritize tasks", m->id); + status = APR_SUCCESS; + } + + H2_MPLX_LEAVE(m); return status; } @@ -682,29 +645,30 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status; - int acquired; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (m->aborted) { - status = APR_ECONNABORTED; + H2_MPLX_ENTER(m); + + if (m->aborted) { + status = APR_ECONNABORTED; + } + else { + status = APR_SUCCESS; + h2_ihash_add(m->streams, stream); + if (h2_stream_is_ready(stream)) { + /* already have a response */ + check_data_for(m, stream->id); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + H2_STRM_MSG(stream, "process, add to readyq")); } else { - h2_ihash_add(m->streams, stream); - if (h2_stream_is_ready(stream)) { - /* already have a response */ - check_data_for(m, stream->id); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - H2_STRM_MSG(stream, "process, add to readyq")); - } - else { - h2_iq_add(m->q, stream->id, cmp, ctx); - register_if_needed(m); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - H2_STRM_MSG(stream, "process, added to q")); - } + h2_iq_add(m->q, stream->id, cmp, ctx); + register_if_needed(m); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + H2_STRM_MSG(stream, "process, added to q")); } - leave_mutex(m, acquired); } + + H2_MPLX_LEAVE(m); return status; } @@ -762,22 +726,21 @@ static h2_task *next_stream_task(h2_mplx *m) h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) { h2_task *task = NULL; - apr_status_t status; - int acquired; + H2_MPLX_ENTER_ALWAYS(m); + *has_more = 0; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (!m->aborted) { - task = next_stream_task(m); - if (task != NULL && !h2_iq_empty(m->q)) { - *has_more = 1; - } - else { - m->is_registered = 0; /* h2_workers will discard this mplx */ - } + if (!m->aborted) { + task = next_stream_task(m); + if (task != NULL && !h2_iq_empty(m->q)) { + *has_more = 1; + } + else { + m->is_registered = 0; /* h2_workers will discard this mplx */ } - leave_mutex(m, acquired); } + + H2_MPLX_LEAVE(m); return task; } @@ -895,21 +858,20 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) { - int acquired; - - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - task_done(m, task, NULL); - --m->tasks_active; - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } - if (ptask) { - /* caller wants another task */ - *ptask = next_stream_task(m); - } - register_if_needed(m); - leave_mutex(m, acquired); + H2_MPLX_ENTER_ALWAYS(m); + + task_done(m, task, NULL); + --m->tasks_active; + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); } + if (ptask) { + /* caller wants another task */ + *ptask = next_stream_task(m); + } + register_if_needed(m); + + H2_MPLX_LEAVE(m); } /******************************************************************************* @@ -1001,52 +963,53 @@ apr_status_t h2_mplx_idle(h2_mplx *m) { apr_status_t status = APR_SUCCESS; apr_time_t now; - int acquired; + apr_size_t scount; - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - apr_size_t scount = h2_ihash_count(m->streams); - if (scount > 0 && m->tasks_active) { - /* If we have streams in connection state 'IDLE', meaning - * all streams are ready to sent data out, but lack - * WINDOW_UPDATEs. - * - * This is ok, unless we have streams that still occupy - * h2 workers. As worker threads are a scarce resource, - * we need to take measures that we do not get DoSed. - * - * This is what we call an 'idle block'. Limit the amount - * of busy workers we allow for this connection until it - * well behaves. - */ - now = apr_time_now(); - m->last_idle_block = now; - if (m->limit_active > 2 - && now - m->last_limit_change >= m->limit_change_interval) { - if (m->limit_active > 16) { - m->limit_active = 16; - } - else if (m->limit_active > 8) { - m->limit_active = 8; - } - else if (m->limit_active > 4) { - m->limit_active = 4; - } - else if (m->limit_active > 2) { - m->limit_active = 2; - } - m->last_limit_change = now; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): decrease worker limit to %d", - m->id, m->limit_active); + H2_MPLX_ENTER(m); + + scount = h2_ihash_count(m->streams); + if (scount > 0 && m->tasks_active) { + /* If we have streams in connection state 'IDLE', meaning + * all streams are ready to sent data out, but lack + * WINDOW_UPDATEs. + * + * This is ok, unless we have streams that still occupy + * h2 workers. As worker threads are a scarce resource, + * we need to take measures that we do not get DoSed. + * + * This is what we call an 'idle block'. Limit the amount + * of busy workers we allow for this connection until it + * well behaves. + */ + now = apr_time_now(); + m->last_idle_block = now; + if (m->limit_active > 2 + && now - m->last_limit_change >= m->limit_change_interval) { + if (m->limit_active > 16) { + m->limit_active = 16; } - - if (m->tasks_active > m->limit_active) { - status = unschedule_slow_tasks(m); + else if (m->limit_active > 8) { + m->limit_active = 8; } + else if (m->limit_active > 4) { + m->limit_active = 4; + } + else if (m->limit_active > 2) { + m->limit_active = 2; + } + m->last_limit_change = now; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): decrease worker limit to %d", + m->id, m->limit_active); + } + + if (m->tasks_active > m->limit_active) { + status = unschedule_slow_tasks(m); } - register_if_needed(m); - leave_mutex(m, acquired); } + register_if_needed(m); + + H2_MPLX_LEAVE(m); return status; } @@ -1090,7 +1053,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, apr_status_t status; h2_mplx *m; h2_task *task; - int acquired; + h2_stream *stream; task = h2_ctx_rget_task(r); if (!task) { @@ -1098,17 +1061,17 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, } m = task->mplx; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); - - if (stream) { - status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit); - } - else { - status = APR_ECONNABORTED; - } - leave_mutex(m, acquired); + H2_MPLX_ENTER(m); + + stream = h2_ihash_get(m->streams, task->stream_id); + if (stream) { + status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit); } + else { + status = APR_ECONNABORTED; + } + + H2_MPLX_LEAVE(m); return status; } @@ -1120,35 +1083,36 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn, h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn); h2_mplx *m = h2_ngn_shed_get_ctx(shed); apr_status_t status; - int acquired; + int want_shutdown; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - int want_shutdown = (block == APR_BLOCK_READ); + H2_MPLX_ENTER(m); - /* Take this opportunity to update output consummation - * for this engine */ - ngn_out_update_windows(m, ngn); - - if (want_shutdown && !h2_iq_empty(m->q)) { - /* For a blocking read, check first if requests are to be - * had and, if not, wait a short while before doing the - * blocking, and if unsuccessful, terminating read. - */ + want_shutdown = (block == APR_BLOCK_READ); + + /* Take this opportunity to update output consummation + * for this engine */ + ngn_out_update_windows(m, ngn); + + if (want_shutdown && !h2_iq_empty(m->q)) { + /* For a blocking read, check first if requests are to be + * had and, if not, wait a short while before doing the + * blocking, and if unsuccessful, terminating read. + */ + status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr); + if (APR_STATUS_IS_EAGAIN(status)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): start block engine pull", m->id); + apr_thread_cond_timedwait(m->task_thawed, m->lock, + apr_time_from_msec(20)); status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr); - if (APR_STATUS_IS_EAGAIN(status)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): start block engine pull", m->id); - apr_thread_cond_timedwait(m->task_thawed, m->lock, - apr_time_from_msec(20)); - status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr); - } } - else { - status = h2_ngn_shed_pull_request(shed, ngn, capacity, - want_shutdown, pr); - } - leave_mutex(m, acquired); } + else { + status = h2_ngn_shed_pull_request(shed, ngn, capacity, + want_shutdown, pr); + } + + H2_MPLX_LEAVE(m); return status; } @@ -1159,29 +1123,28 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn, if (task) { h2_mplx *m = task->mplx; - int acquired; + h2_stream *stream; - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); - - ngn_out_update_windows(m, ngn); - h2_ngn_shed_done_task(m->ngn_shed, ngn, task); - - if (status != APR_SUCCESS && stream - && h2_task_can_redo(task) - && !h2_ihash_get(m->sredo, stream->id)) { - h2_ihash_add(m->sredo, stream); - } - if (task->engine) { - /* cannot report that as done until engine returns */ - } - else { - task_done(m, task, ngn); - } - /* Take this opportunity to update output consummation - * for this engine */ - leave_mutex(m, acquired); + H2_MPLX_ENTER_ALWAYS(m); + + stream = h2_ihash_get(m->streams, task->stream_id); + + ngn_out_update_windows(m, ngn); + h2_ngn_shed_done_task(m->ngn_shed, ngn, task); + + if (status != APR_SUCCESS && stream + && h2_task_can_redo(task) + && !h2_ihash_get(m->sredo, stream->id)) { + h2_ihash_add(m->sredo, stream); + } + if (task->engine) { + /* cannot report that as done until engine returns */ } + else { + task_done(m, task, ngn); + } + + H2_MPLX_LEAVE(m); } } @@ -1198,65 +1161,64 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, stream_ev_callback *on_resume, void *on_ctx) { - apr_status_t status; - int acquired; int ids[100]; h2_stream *stream; size_t i, n; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): dispatch events", m->id); - apr_atomic_set32(&m->event_pending, 0); - purge_streams(m); - - /* update input windows for streams */ - h2_ihash_iter(m->streams, report_consumption_iter, m); - - if (!h2_iq_empty(m->readyq)) { - n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids)); - for (i = 0; i < n; ++i) { - stream = h2_ihash_get(m->streams, ids[i]); - if (stream) { - leave_mutex(m, acquired); - on_resume(on_ctx, stream); - enter_mutex(m, &acquired); - } + H2_MPLX_ENTER(m); + + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): dispatch events", m->id); + apr_atomic_set32(&m->event_pending, 0); + purge_streams(m); + + /* update input windows for streams */ + h2_ihash_iter(m->streams, report_consumption_iter, m); + + if (!h2_iq_empty(m->readyq)) { + n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids)); + for (i = 0; i < n; ++i) { + stream = h2_ihash_get(m->streams, ids[i]); + if (stream) { + H2_MPLX_LEAVE(m); + + on_resume(on_ctx, stream); + + H2_MPLX_ENTER(m); } } - if (!h2_iq_empty(m->readyq)) { - apr_atomic_set32(&m->event_pending, 1); - } - leave_mutex(m, acquired); } - return status; + if (!h2_iq_empty(m->readyq)) { + apr_atomic_set32(&m->event_pending, 1); + } + + H2_MPLX_LEAVE(m); + return APR_SUCCESS; } apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id) { - apr_status_t status; - int acquired; - - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - check_data_for(m, stream_id); - leave_mutex(m, acquired); - } - return status; + H2_MPLX_ENTER(m); + + check_data_for(m, stream_id); + + H2_MPLX_LEAVE(m); + return APR_SUCCESS; } int h2_mplx_awaits_data(h2_mplx *m) { - apr_status_t status; - int acquired, waiting = 1; + int waiting = 1; - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (h2_ihash_empty(m->streams)) { - waiting = 0; - } - if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) { - waiting = 0; - } - leave_mutex(m, acquired); + H2_MPLX_ENTER_ALWAYS(m); + + if (h2_ihash_empty(m->streams)) { + waiting = 0; } + if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) { + waiting = 0; + } + + H2_MPLX_LEAVE(m); return waiting; } diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index fa395255e9..8eea35caea 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -39,6 +39,7 @@ struct h2_slot { int sticks; h2_task *task; apr_thread_t *thread; + apr_thread_mutex_t *lock; apr_thread_cond_t *not_idle; }; @@ -78,6 +79,17 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) slot->workers = workers; slot->aborted = 0; slot->task = NULL; + + if (!slot->lock) { + status = apr_thread_mutex_create(&slot->lock, + APR_THREAD_MUTEX_DEFAULT, + workers->pool); + if (status != APR_SUCCESS) { + push_slot(&workers->free, slot); + return status; + } + } + if (!slot->not_idle) { status = apr_thread_cond_create(&slot->not_idle, workers->pool); if (status != APR_SUCCESS) { @@ -112,9 +124,9 @@ static void wake_idle_worker(h2_workers *workers) { h2_slot *slot = pop_slot(&workers->idle); if (slot) { - apr_thread_mutex_lock(workers->lock); + apr_thread_mutex_lock(slot->lock); apr_thread_cond_signal(slot->not_idle); - apr_thread_mutex_unlock(workers->lock); + apr_thread_mutex_unlock(slot->lock); } else if (workers->dynamic) { add_worker(workers); @@ -185,15 +197,16 @@ static apr_status_t get_next(h2_slot *slot) return APR_SUCCESS; } - apr_thread_mutex_lock(workers->lock); cleanup_zombies(workers); ++workers->idle_workers; + + apr_thread_mutex_lock(slot->lock); push_slot(&workers->idle, slot); - apr_thread_cond_wait(slot->not_idle, workers->lock); - --workers->idle_workers; + apr_thread_cond_wait(slot->not_idle, slot->lock); + apr_thread_mutex_unlock(slot->lock); - apr_thread_mutex_unlock(workers->lock); + --workers->idle_workers; } return APR_EOF; } @@ -239,21 +252,21 @@ static apr_status_t workers_pool_cleanup(void *data) h2_slot *slot; if (!workers->aborted) { - apr_thread_mutex_lock(workers->lock); workers->aborted = 1; /* before we go, cleanup any zombies and abort the rest */ cleanup_zombies(workers); for (;;) { slot = pop_slot(&workers->idle); if (slot) { + apr_thread_mutex_lock(slot->lock); slot->aborted = 1; apr_thread_cond_signal(slot->not_idle); + apr_thread_mutex_unlock(slot->lock); } else { break; } } - apr_thread_mutex_unlock(workers->lock); h2_fifo_term(workers->mplxs); h2_fifo_interrupt(workers->mplxs);