granicus.if.org Git - apache/commitdiff
On the trunk:
authorStefan Eissing <icing@apache.org>
Tue, 4 Apr 2017 13:45:09 +0000 (13:45 +0000)
committerStefan Eissing <icing@apache.org>
Tue, 4 Apr 2017 13:45:09 +0000 (13:45 +0000)
mod_http2: code cleanup after eliminating nested locks, giving worker slots their own mutex.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1790113 13f79535-47bb-0310-9956-ffa450edef68

modules/http2/h2_mplx.c
modules/http2/h2_workers.c

index 04fbbd05cbce638a631cb21014d65c34eeff169b..d044c19eabb2a230706b015727d5f60f7dd11dbc 100644 (file)
@@ -55,56 +55,22 @@ typedef struct {
     apr_time_t now;
 } stream_iter_ctx;
 
-/* NULL or the mutex hold by this thread, used for recursive calls
- */
-static const int nested_lock = 0;
-
-static apr_threadkey_t *thread_lock;
-
 apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
 {
-    if (nested_lock) {
-        return apr_threadkey_private_create(&thread_lock, NULL, pool);
-    }
     return APR_SUCCESS;
 }
 
-static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
-{
-    apr_status_t status;
-    
-    if (nested_lock) {
-        void *mutex = NULL;
-        /* Enter the mutex if this thread already holds the lock or
-         * if we can acquire it. Only on the later case do we unlock
-         * onleaving the mutex.
-         * This allow recursive entering of the mutex from the saem thread,
-         * which is what we need in certain situations involving callbacks
-         */
-        apr_threadkey_private_get(&mutex, thread_lock);
-        if (mutex == m->lock) {
-            *pacquired = 0;
-            ap_assert(NULL); /* nested, why? */
-            return APR_SUCCESS;
-        }
-    }
-    status = apr_thread_mutex_lock(m->lock);
-    *pacquired = (status == APR_SUCCESS);
-    if (nested_lock && *pacquired) {
-        apr_threadkey_private_set(m->lock, thread_lock);
-    }
-    return status;
-}
+#define H2_MPLX_ENTER(m)    \
+    do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
+        return rv;\
+    } } while(0)
 
-static void leave_mutex(h2_mplx *m, int acquired)
-{
-    if (acquired) {
-        if (nested_lock) {
-            apr_threadkey_private_set(NULL, thread_lock);
-        }
-        apr_thread_mutex_unlock(m->lock);
-    }
-}
+#define H2_MPLX_ENTER_ALWAYS(m)    \
+    apr_thread_mutex_lock(m->lock)
+
+#define H2_MPLX_LEAVE(m)    \
+    apr_thread_mutex_unlock(m->lock)
 
 static void check_data_for(h2_mplx *m, int stream_id);
 
@@ -259,14 +225,15 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
 
 int h2_mplx_shutdown(h2_mplx *m)
 {
-    int acquired, max_stream_started = 0;
+    int max_stream_started = 0;
     
-    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        max_stream_started = m->max_stream_started;
-        /* Clear schedule queue, disabling existing streams from starting */ 
-        h2_iq_clear(m->q);
-        leave_mutex(m, acquired);
-    }
+    H2_MPLX_ENTER(m);
+
+    max_stream_started = m->max_stream_started;
+    /* Clear schedule queue, disabling existing streams from starting */ 
+    h2_iq_clear(m->q);
+
+    H2_MPLX_LEAVE(m);
     return max_stream_started;
 }
 
@@ -363,18 +330,16 @@ static int stream_iter_wrap(void *ctx, void *stream)
 
 apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
 {
-    apr_status_t status;
-    int acquired;
+    stream_iter_ctx_t x;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        stream_iter_ctx_t x;
-        x.cb = cb;
-        x.ctx = ctx;
-        h2_ihash_iter(m->streams, stream_iter_wrap, &x);
+    H2_MPLX_ENTER(m);
+
+    x.cb = cb;
+    x.ctx = ctx;
+    h2_ihash_iter(m->streams, stream_iter_wrap, &x);
         
-        leave_mutex(m, acquired);
-    }
-    return status;
+    H2_MPLX_LEAVE(m);
+    return APR_SUCCESS;
 }
 
 static int report_stream_iter(void *ctx, void *val) {
@@ -430,14 +395,13 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 {
     apr_status_t status;
     int i, wait_secs = 60;
-    int acquired;
 
     /* How to shut down a h2 connection:
      * 0. abort and tell the workers that no more tasks will come from us */
     m->aborted = 1;
     h2_workers_unregister(m->workers, m);
     
-    enter_mutex(m, &acquired);
+    H2_MPLX_ENTER_ALWAYS(m);
 
     /* How to shut down a h2 connection:
      * 1. cancel all streams still active */
@@ -482,7 +446,7 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
         h2_ihash_iter(m->shold, unexpected_stream_iter, m);
     }
     
-    leave_mutex(m, acquired);
+    H2_MPLX_LEAVE(m);
 
     /* 5. unregister again, now that our workers are done */
     h2_workers_unregister(m->workers, m);
@@ -493,41 +457,39 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 
 apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
 {
-    apr_status_t status = APR_SUCCESS;
-    int acquired;
+    H2_MPLX_ENTER(m);
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                      H2_STRM_MSG(stream, "cleanup"));
-        stream_cleanup(m, stream);        
-        leave_mutex(m, acquired);
-    }
-    return status;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+                  H2_STRM_MSG(stream, "cleanup"));
+    stream_cleanup(m, stream);        
+    
+    H2_MPLX_LEAVE(m);
+    return APR_SUCCESS;
 }
 
 h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
 {
     h2_stream *s = NULL;
-    int acquired;
     
-    if ((enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        s = h2_ihash_get(m->streams, id);
-        leave_mutex(m, acquired);
-    }
+    H2_MPLX_ENTER_ALWAYS(m);
+
+    s = h2_ihash_get(m->streams, id);
+
+    H2_MPLX_LEAVE(m);
     return s;
 }
 
 static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
 {
     h2_mplx *m = ctx;
-    int acquired;
     
-    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                      "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id);
-        check_data_for(m, beam->id);
-        leave_mutex(m, acquired);
-    }
+    H2_MPLX_ENTER_ALWAYS(m);
+
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                  "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id);
+    check_data_for(m, beam->id);
+
+    H2_MPLX_LEAVE(m);
 }
 
 static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
@@ -568,17 +530,17 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
 apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
 {
     apr_status_t status;
-    int acquired;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (m->aborted) {
-            status = APR_ECONNABORTED;
-        }
-        else {
-            status = out_open(m, stream_id, beam);
-        }
-        leave_mutex(m, acquired);
+    H2_MPLX_ENTER(m);
+
+    if (m->aborted) {
+        status = APR_ECONNABORTED;
+    }
+    else {
+        status = out_open(m, stream_id, beam);
     }
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
 
@@ -609,29 +571,29 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                  apr_thread_cond_t *iowait)
 {
     apr_status_t status;
-    int acquired;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (m->aborted) {
-            status = APR_ECONNABORTED;
-        }
-        else if (apr_atomic_read32(&m->event_pending) > 0) {
-            status = APR_SUCCESS;
-        }
-        else {
-            purge_streams(m);
-            h2_ihash_iter(m->streams, report_consumption_iter, m);
-            m->added_output = iowait;
-            status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
-            if (APLOGctrace2(m->c)) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                              "h2_mplx(%ld): trywait on data for %f ms)",
-                              m->id, timeout/1000.0);
-            }
-            m->added_output = NULL;
+    H2_MPLX_ENTER(m);
+
+    if (m->aborted) {
+        status = APR_ECONNABORTED;
+    }
+    else if (apr_atomic_read32(&m->event_pending) > 0) {
+        status = APR_SUCCESS;
+    }
+    else {
+        purge_streams(m);
+        h2_ihash_iter(m->streams, report_consumption_iter, m);
+        m->added_output = iowait;
+        status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
+        if (APLOGctrace2(m->c)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): trywait on data for %f ms)",
+                          m->id, timeout/1000.0);
         }
-        leave_mutex(m, acquired);
+        m->added_output = NULL;
     }
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
 
@@ -648,19 +610,20 @@ static void check_data_for(h2_mplx *m, int stream_id)
 apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
 {
     apr_status_t status;
-    int acquired;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (m->aborted) {
-            status = APR_ECONNABORTED;
-        }
-        else {
-            h2_iq_sort(m->q, cmp, ctx);
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): reprioritize tasks", m->id);
-        }
-        leave_mutex(m, acquired);
+    H2_MPLX_ENTER(m);
+
+    if (m->aborted) {
+        status = APR_ECONNABORTED;
     }
+    else {
+        h2_iq_sort(m->q, cmp, ctx);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                      "h2_mplx(%ld): reprioritize tasks", m->id);
+        status = APR_SUCCESS;
+    }
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
 
@@ -682,29 +645,30 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
                              h2_stream_pri_cmp *cmp, void *ctx)
 {
     apr_status_t status;
-    int acquired;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (m->aborted) {
-            status = APR_ECONNABORTED;
+    H2_MPLX_ENTER(m);
+
+    if (m->aborted) {
+        status = APR_ECONNABORTED;
+    }
+    else {
+        status = APR_SUCCESS;
+        h2_ihash_add(m->streams, stream);
+        if (h2_stream_is_ready(stream)) {
+            /* already have a response */
+            check_data_for(m, stream->id);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          H2_STRM_MSG(stream, "process, add to readyq")); 
         }
         else {
-            h2_ihash_add(m->streams, stream);
-            if (h2_stream_is_ready(stream)) {
-                /* already have a response */
-                check_data_for(m, stream->id);
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                              H2_STRM_MSG(stream, "process, add to readyq")); 
-            }
-            else {
-                h2_iq_add(m->q, stream->id, cmp, ctx);
-                register_if_needed(m);                
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                              H2_STRM_MSG(stream, "process, added to q")); 
-            }
+            h2_iq_add(m->q, stream->id, cmp, ctx);
+            register_if_needed(m);                
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          H2_STRM_MSG(stream, "process, added to q")); 
         }
-        leave_mutex(m, acquired);
     }
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
 
@@ -762,22 +726,21 @@ static h2_task *next_stream_task(h2_mplx *m)
 h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
 {
     h2_task *task = NULL;
-    apr_status_t status;
-    int acquired;
     
+    H2_MPLX_ENTER_ALWAYS(m);
+
     *has_more = 0;
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (!m->aborted) {
-            task = next_stream_task(m);
-            if (task != NULL && !h2_iq_empty(m->q)) {
-                *has_more = 1;
-            }
-            else {
-                m->is_registered = 0; /* h2_workers will discard this mplx */
-            }
+    if (!m->aborted) {
+        task = next_stream_task(m);
+        if (task != NULL && !h2_iq_empty(m->q)) {
+            *has_more = 1;
+        }
+        else {
+            m->is_registered = 0; /* h2_workers will discard this mplx */
         }
-        leave_mutex(m, acquired);
     }
+
+    H2_MPLX_LEAVE(m);
     return task;
 }
 
@@ -895,21 +858,20 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
 
 void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
 {
-    int acquired;
-    
-    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        task_done(m, task, NULL);
-        --m->tasks_active;
-        if (m->join_wait) {
-            apr_thread_cond_signal(m->join_wait);
-        }
-        if (ptask) {
-            /* caller wants another task */
-            *ptask = next_stream_task(m);
-        }
-        register_if_needed(m);
-        leave_mutex(m, acquired);
+    H2_MPLX_ENTER_ALWAYS(m);
+
+    task_done(m, task, NULL);
+    --m->tasks_active;
+    if (m->join_wait) {
+        apr_thread_cond_signal(m->join_wait);
     }
+    if (ptask) {
+        /* caller wants another task */
+        *ptask = next_stream_task(m);
+    }
+    register_if_needed(m);
+
+    H2_MPLX_LEAVE(m);
 }
 
 /*******************************************************************************
@@ -1001,52 +963,53 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
 {
     apr_status_t status = APR_SUCCESS;
     apr_time_t now;            
-    int acquired;
+    apr_size_t scount;
     
-    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        apr_size_t scount = h2_ihash_count(m->streams);
-        if (scount > 0 && m->tasks_active) {
-            /* If we have streams in connection state 'IDLE', meaning
-             * all streams are ready to sent data out, but lack
-             * WINDOW_UPDATEs. 
-             * 
-             * This is ok, unless we have streams that still occupy
-             * h2 workers. As worker threads are a scarce resource, 
-             * we need to take measures that we do not get DoSed.
-             * 
-             * This is what we call an 'idle block'. Limit the amount 
-             * of busy workers we allow for this connection until it
-             * well behaves.
-             */
-            now = apr_time_now();
-            m->last_idle_block = now;
-            if (m->limit_active > 2 
-                && now - m->last_limit_change >= m->limit_change_interval) {
-                if (m->limit_active > 16) {
-                    m->limit_active = 16;
-                }
-                else if (m->limit_active > 8) {
-                    m->limit_active = 8;
-                }
-                else if (m->limit_active > 4) {
-                    m->limit_active = 4;
-                }
-                else if (m->limit_active > 2) {
-                    m->limit_active = 2;
-                }
-                m->last_limit_change = now;
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                              "h2_mplx(%ld): decrease worker limit to %d",
-                              m->id, m->limit_active);
+    H2_MPLX_ENTER(m);
+
+    scount = h2_ihash_count(m->streams);
+    if (scount > 0 && m->tasks_active) {
+        /* If we have streams in connection state 'IDLE', meaning
+         * all streams are ready to sent data out, but lack
+         * WINDOW_UPDATEs. 
+         * 
+         * This is ok, unless we have streams that still occupy
+         * h2 workers. As worker threads are a scarce resource, 
+         * we need to take measures that we do not get DoSed.
+         * 
+         * This is what we call an 'idle block'. Limit the amount 
+         * of busy workers we allow for this connection until it
+         * well behaves.
+         */
+        now = apr_time_now();
+        m->last_idle_block = now;
+        if (m->limit_active > 2 
+            && now - m->last_limit_change >= m->limit_change_interval) {
+            if (m->limit_active > 16) {
+                m->limit_active = 16;
             }
-            
-            if (m->tasks_active > m->limit_active) {
-                status = unschedule_slow_tasks(m);
+            else if (m->limit_active > 8) {
+                m->limit_active = 8;
             }
+            else if (m->limit_active > 4) {
+                m->limit_active = 4;
+            }
+            else if (m->limit_active > 2) {
+                m->limit_active = 2;
+            }
+            m->last_limit_change = now;
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): decrease worker limit to %d",
+                          m->id, m->limit_active);
+        }
+        
+        if (m->tasks_active > m->limit_active) {
+            status = unschedule_slow_tasks(m);
         }
-        register_if_needed(m);
-        leave_mutex(m, acquired);
     }
+    register_if_needed(m);
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
 
@@ -1090,7 +1053,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
     apr_status_t status;
     h2_mplx *m;
     h2_task *task;
-    int acquired;
+    h2_stream *stream;
     
     task = h2_ctx_rget_task(r);
     if (!task) {
@@ -1098,17 +1061,17 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
     }
     m = task->mplx;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
-        
-        if (stream) {
-            status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
-        }
-        else {
-            status = APR_ECONNABORTED;
-        }
-        leave_mutex(m, acquired);
+    H2_MPLX_ENTER(m);
+
+    stream = h2_ihash_get(m->streams, task->stream_id);
+    if (stream) {
+        status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
     }
+    else {
+        status = APR_ECONNABORTED;
+    }
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
 
@@ -1120,35 +1083,36 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
     h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
     h2_mplx *m = h2_ngn_shed_get_ctx(shed);
     apr_status_t status;
-    int acquired;
+    int want_shutdown;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        int want_shutdown = (block == APR_BLOCK_READ);
+    H2_MPLX_ENTER(m);
 
-        /* Take this opportunity to update output consummation 
-         * for this engine */
-        ngn_out_update_windows(m, ngn);
-        
-        if (want_shutdown && !h2_iq_empty(m->q)) {
-            /* For a blocking read, check first if requests are to be
-             * had and, if not, wait a short while before doing the
-             * blocking, and if unsuccessful, terminating read.
-             */
+    want_shutdown = (block == APR_BLOCK_READ);
+
+    /* Take this opportunity to update output consummation 
+     * for this engine */
+    ngn_out_update_windows(m, ngn);
+    
+    if (want_shutdown && !h2_iq_empty(m->q)) {
+        /* For a blocking read, check first if requests are to be
+         * had and, if not, wait a short while before doing the
+         * blocking, and if unsuccessful, terminating read.
+         */
+        status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
+        if (APR_STATUS_IS_EAGAIN(status)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): start block engine pull", m->id);
+            apr_thread_cond_timedwait(m->task_thawed, m->lock, 
+                                      apr_time_from_msec(20));
             status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
-            if (APR_STATUS_IS_EAGAIN(status)) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                              "h2_mplx(%ld): start block engine pull", m->id);
-                apr_thread_cond_timedwait(m->task_thawed, m->lock, 
-                                          apr_time_from_msec(20));
-                status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
-            }
         }
-        else {
-            status = h2_ngn_shed_pull_request(shed, ngn, capacity,
-                                              want_shutdown, pr);
-        }
-        leave_mutex(m, acquired);
     }
+    else {
+        status = h2_ngn_shed_pull_request(shed, ngn, capacity,
+                                          want_shutdown, pr);
+    }
+
+    H2_MPLX_LEAVE(m);
     return status;
 }
  
@@ -1159,29 +1123,28 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
     
     if (task) {
         h2_mplx *m = task->mplx;
-        int acquired;
+        h2_stream *stream;
 
-        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-            h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
-            
-            ngn_out_update_windows(m, ngn);
-            h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
-            
-            if (status != APR_SUCCESS && stream 
-                && h2_task_can_redo(task) 
-                && !h2_ihash_get(m->sredo, stream->id)) {
-                h2_ihash_add(m->sredo, stream);
-            }
-            if (task->engine) { 
-                /* cannot report that as done until engine returns */
-            }
-            else {
-                task_done(m, task, ngn);
-            }
-            /* Take this opportunity to update output consummation 
-             * for this engine */
-            leave_mutex(m, acquired);
+        H2_MPLX_ENTER_ALWAYS(m);
+
+        stream = h2_ihash_get(m->streams, task->stream_id);
+        
+        ngn_out_update_windows(m, ngn);
+        h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
+        
+        if (status != APR_SUCCESS && stream 
+            && h2_task_can_redo(task) 
+            && !h2_ihash_get(m->sredo, stream->id)) {
+            h2_ihash_add(m->sredo, stream);
+        }
+        if (task->engine) { 
+            /* cannot report that as done until engine returns */
         }
+        else {
+            task_done(m, task, ngn);
+        }
+
+        H2_MPLX_LEAVE(m);
     }
 }
 
@@ -1198,65 +1161,64 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
                                             stream_ev_callback *on_resume, 
                                             void *on_ctx)
 {
-    apr_status_t status;
-    int acquired;
     int ids[100];
     h2_stream *stream;
     size_t i, n;
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                      "h2_mplx(%ld): dispatch events", m->id);        
-        apr_atomic_set32(&m->event_pending, 0);
-        purge_streams(m);
-        
-        /* update input windows for streams */
-        h2_ihash_iter(m->streams, report_consumption_iter, m);
-        
-        if (!h2_iq_empty(m->readyq)) {
-            n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
-            for (i = 0; i < n; ++i) {
-                stream = h2_ihash_get(m->streams, ids[i]);
-                if (stream) {
-                    leave_mutex(m, acquired);
-                    on_resume(on_ctx, stream);
-                    enter_mutex(m, &acquired);
-                }
+    H2_MPLX_ENTER(m);
+
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+                  "h2_mplx(%ld): dispatch events", m->id);        
+    apr_atomic_set32(&m->event_pending, 0);
+    purge_streams(m);
+    
+    /* update input windows for streams */
+    h2_ihash_iter(m->streams, report_consumption_iter, m);
+    
+    if (!h2_iq_empty(m->readyq)) {
+        n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
+        for (i = 0; i < n; ++i) {
+            stream = h2_ihash_get(m->streams, ids[i]);
+            if (stream) {
+                H2_MPLX_LEAVE(m);
+
+                on_resume(on_ctx, stream);
+
+                H2_MPLX_ENTER(m);
             }
         }
-        if (!h2_iq_empty(m->readyq)) {
-            apr_atomic_set32(&m->event_pending, 1);
-        } 
-        leave_mutex(m, acquired);
     }
-    return status;
+    if (!h2_iq_empty(m->readyq)) {
+        apr_atomic_set32(&m->event_pending, 1);
+    } 
+
+    H2_MPLX_LEAVE(m);
+    return APR_SUCCESS;
 }
 
 apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id)
 {
-    apr_status_t status;
-    int acquired;
-    
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        check_data_for(m, stream_id);
-        leave_mutex(m, acquired);
-    }
-    return status;
+    H2_MPLX_ENTER(m);
+
+    check_data_for(m, stream_id);
+
+    H2_MPLX_LEAVE(m);
+    return APR_SUCCESS;
 }
 
 int h2_mplx_awaits_data(h2_mplx *m)
 {
-    apr_status_t status;
-    int acquired, waiting = 1;
+    int waiting = 1;
      
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (h2_ihash_empty(m->streams)) {
-            waiting = 0;
-        }
-        if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
-            waiting = 0;
-        }
-        leave_mutex(m, acquired);
+    H2_MPLX_ENTER_ALWAYS(m);
+
+    if (h2_ihash_empty(m->streams)) {
+        waiting = 0;
     }
+    if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
+        waiting = 0;
+    }
+
+    H2_MPLX_LEAVE(m);
     return waiting;
 }
index fa395255e9f7a695b557af91d52a5021e14b0d43..8eea35caeaf8c23fc21f817b1d8f72f3d8d0c719 100644 (file)
@@ -39,6 +39,7 @@ struct h2_slot {
     int sticks;
     h2_task *task;
     apr_thread_t *thread;
+    apr_thread_mutex_t *lock;
     apr_thread_cond_t *not_idle;
 };
 
@@ -78,6 +79,17 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
     slot->workers = workers;
     slot->aborted = 0;
     slot->task = NULL;
+
+    if (!slot->lock) {
+        status = apr_thread_mutex_create(&slot->lock,
+                                         APR_THREAD_MUTEX_DEFAULT,
+                                         workers->pool);
+        if (status != APR_SUCCESS) {
+            push_slot(&workers->free, slot);
+            return status;
+        }
+    }
+
     if (!slot->not_idle) {
         status = apr_thread_cond_create(&slot->not_idle, workers->pool);
         if (status != APR_SUCCESS) {
@@ -112,9 +124,9 @@ static void wake_idle_worker(h2_workers *workers)
 {
     h2_slot *slot = pop_slot(&workers->idle);
     if (slot) {
-        apr_thread_mutex_lock(workers->lock);
+        apr_thread_mutex_lock(slot->lock);
         apr_thread_cond_signal(slot->not_idle);
-        apr_thread_mutex_unlock(workers->lock);
+        apr_thread_mutex_unlock(slot->lock);
     }
     else if (workers->dynamic) {
         add_worker(workers);
@@ -185,15 +197,16 @@ static apr_status_t get_next(h2_slot *slot)
             return APR_SUCCESS;
         }
         
-        apr_thread_mutex_lock(workers->lock);
         cleanup_zombies(workers);
 
         ++workers->idle_workers;
+
+        apr_thread_mutex_lock(slot->lock);
         push_slot(&workers->idle, slot);
-        apr_thread_cond_wait(slot->not_idle, workers->lock);
-        --workers->idle_workers;
+        apr_thread_cond_wait(slot->not_idle, slot->lock);
+        apr_thread_mutex_unlock(slot->lock);
 
-        apr_thread_mutex_unlock(workers->lock);
+        --workers->idle_workers;
     }
     return APR_EOF;
 }
@@ -239,21 +252,21 @@ static apr_status_t workers_pool_cleanup(void *data)
     h2_slot *slot;
     
     if (!workers->aborted) {
-        apr_thread_mutex_lock(workers->lock);
         workers->aborted = 1;
         /* before we go, cleanup any zombies and abort the rest */
         cleanup_zombies(workers);
         for (;;) {
             slot = pop_slot(&workers->idle);
             if (slot) {
+                apr_thread_mutex_lock(slot->lock);
                 slot->aborted = 1;
                 apr_thread_cond_signal(slot->not_idle);
+                apr_thread_mutex_unlock(slot->lock);
             }
             else {
                 break;
             }
         }
-        apr_thread_mutex_unlock(workers->lock);
 
         h2_fifo_term(workers->mplxs);
         h2_fifo_interrupt(workers->mplxs);