]> granicus.if.org Git - apache/commitdiff
On the trunk:
authorStefan Eissing <icing@apache.org>
Wed, 5 Apr 2017 14:49:25 +0000 (14:49 +0000)
committerStefan Eissing <icing@apache.org>
Wed, 5 Apr 2017 14:49:25 +0000 (14:49 +0000)
mod_http2: less and more granular mutex use for improved performance.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1790284 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_stream.c
modules/http2/h2_util.c
modules/http2/h2_util.h
modules/http2/h2_workers.c
modules/http2/h2_workers.h

diff --git a/CHANGES b/CHANGES
index 716eaa5cfcef2f85f0cfdb4440c0176dd1982a26..5f05476f64313e2348df16a143b9c67310782c59 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,8 +1,10 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
-  *) mod_http2/mod_proxy_http2: less read attempts on bucket beams that already
-     delivered EOS/headers. Fixed bug in re-attempting proxy request after 
+  *) mod_http2: less and more granular mutex use for improved performance.
+     [Stefan Eissing]
+     
+  *) mod_proxy_http2: Fixed bug in re-attempting proxy requests after 
      connection error. [Stefan Eissing]
   
   *) core: Disallow multiple Listen on the same IP:port when listener buckets
index d044c19eabb2a230706b015727d5f60f7dd11dbc..40af1c60001a2f7e63a9730e7a6ebc5cb0d21ae9 100644 (file)
@@ -65,14 +65,19 @@ apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
         return rv;\
     } } while(0)
 
-#define H2_MPLX_ENTER_ALWAYS(m)    \
-    apr_thread_mutex_lock(m->lock)
-
 #define H2_MPLX_LEAVE(m)    \
     apr_thread_mutex_unlock(m->lock)
  
+#define H2_MPLX_ENTER_ALWAYS(m)    \
+    apr_thread_mutex_lock(m->lock)
+
+#define H2_MPLX_ENTER_MAYBE(m, lock)    \
+    if (lock) apr_thread_mutex_lock(m->lock)
 
-static void check_data_for(h2_mplx *m, int stream_id);
+#define H2_MPLX_LEAVE_MAYBE(m, lock)    \
+    if (lock) apr_thread_mutex_unlock(m->lock)
+
+static void check_data_for(h2_mplx *m, h2_stream *stream, int lock);
 
 static void stream_output_consumed(void *ctx, 
                                    h2_bucket_beam *beam, apr_off_t length)
@@ -121,6 +126,7 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream)
     h2_stream_cleanup(stream);
 
     h2_iq_remove(m->q, stream->id);
+    h2_fifo_remove(m->readyq, stream);
     h2_ihash_remove(m->streams, stream->id);
     h2_ihash_add(m->shold, stream);
     
@@ -206,7 +212,12 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
         m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->q = h2_iq_create(m->pool, m->max_streams);
-        m->readyq = h2_iq_create(m->pool, m->max_streams);
+
+        status = h2_fifo_set_create(&m->readyq, m->pool, m->max_streams);
+        if (status != APR_SUCCESS) {
+            apr_pool_destroy(m->pool);
+            return NULL;
+        }
 
         m->workers = workers;
         m->max_active = workers->max_workers;
@@ -481,15 +492,10 @@ h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
 
 static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
 {
-    h2_mplx *m = ctx;
+    h2_stream *stream = ctx;
+    h2_mplx *m = stream->session->mplx;
     
-    H2_MPLX_ENTER_ALWAYS(m);
-
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                  "h2_mplx(%ld-%d): output_produced", m->c->id, beam->id);
-    check_data_for(m, beam->id);
-
-    H2_MPLX_LEAVE(m);
+    check_data_for(m, stream, 1);
 }
 
 static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
@@ -513,7 +519,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
     }
     
     h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream);
-    h2_beam_on_produced(stream->output, output_produced, m);
+    h2_beam_on_produced(stream->output, output_produced, stream);
     if (stream->task->output.copy_files) {
         h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL);
     }
@@ -523,7 +529,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
     
     /* we might see some file buckets in the output, see
      * if we have enough handles reserved. */
-    check_data_for(m, stream->id);
+    check_data_for(m, stream, 0);
     return status;
 }
 
@@ -563,7 +569,7 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task)
     status = h2_beam_close(task->output.beam);
     h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close");
     output_consumed_signal(m, task);
-    check_data_for(m, task->stream_id);
+    check_data_for(m, stream, 0);
     return status;
 }
 
@@ -577,7 +583,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
     if (m->aborted) {
         status = APR_ECONNABORTED;
     }
-    else if (apr_atomic_read32(&m->event_pending) > 0) {
+    else if (h2_mplx_has_master_events(m)) {
         status = APR_SUCCESS;
     }
     else {
@@ -597,13 +603,15 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
     return status;
 }
 
-static void check_data_for(h2_mplx *m, int stream_id)
+static void check_data_for(h2_mplx *m, h2_stream *stream, int lock)
 {
-    ap_assert(m);
-    h2_iq_append(m->readyq, stream_id);
-    apr_atomic_set32(&m->event_pending, 1);
-    if (m->added_output) {
-        apr_thread_cond_signal(m->added_output);
+    if (h2_fifo_push(m->readyq, stream) == APR_SUCCESS) {
+        apr_atomic_set32(&m->event_pending, 1);
+        H2_MPLX_ENTER_MAYBE(m, lock);
+        if (m->added_output) {
+            apr_thread_cond_signal(m->added_output);
+        }
+        H2_MPLX_LEAVE_MAYBE(m, lock);
     }
 }
 
@@ -656,7 +664,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
         h2_ihash_add(m->streams, stream);
         if (h2_stream_is_ready(stream)) {
             /* already have a response */
-            check_data_for(m, stream->id);
+            check_data_for(m, stream, 0);
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                           H2_STRM_MSG(stream, "process, add to readyq")); 
         }
@@ -828,7 +836,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
         if (stream->output) {
             h2_beam_mutex_disable(stream->output);
         }
-        check_data_for(m, stream->id);
+        check_data_for(m, stream, 0);
     }
     else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
@@ -1161,48 +1169,33 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
                                             stream_ev_callback *on_resume, 
                                             void *on_ctx)
 {
-    int ids[100];
     h2_stream *stream;
-    size_t i, n;
+    int n;
     
-    H2_MPLX_ENTER(m);
-
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                   "h2_mplx(%ld): dispatch events", m->id);        
     apr_atomic_set32(&m->event_pending, 0);
-    purge_streams(m);
-    
+
     /* update input windows for streams */
     h2_ihash_iter(m->streams, report_consumption_iter, m);
     
-    if (!h2_iq_empty(m->readyq)) {
-        n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
-        for (i = 0; i < n; ++i) {
-            stream = h2_ihash_get(m->streams, ids[i]);
-            if (stream) {
-                H2_MPLX_LEAVE(m);
-
-                on_resume(on_ctx, stream);
-
-                H2_MPLX_ENTER(m);
-            }
-        }
+    n = h2_fifo_count(m->readyq);
+    while (n > 0 
+           && (h2_fifo_try_pull(m->readyq, (void**)&stream) == APR_SUCCESS)) {
+        --n;
+        on_resume(on_ctx, stream);
     }
-    if (!h2_iq_empty(m->readyq)) {
-        apr_atomic_set32(&m->event_pending, 1);
-    } 
-
+    
+    H2_MPLX_ENTER(m);
+    purge_streams(m);
     H2_MPLX_LEAVE(m);
+    
     return APR_SUCCESS;
 }
 
-apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id)
+apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream)
 {
-    H2_MPLX_ENTER(m);
-
-    check_data_for(m, stream_id);
-
-    H2_MPLX_LEAVE(m);
+    check_data_for(m, stream, 1);
     return APR_SUCCESS;
 }
 
@@ -1215,7 +1208,8 @@ int h2_mplx_awaits_data(h2_mplx *m)
     if (h2_ihash_empty(m->streams)) {
         waiting = 0;
     }
-    if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
+    if ((h2_fifo_count(m->readyq) == 0) 
+        && h2_iq_empty(m->q) && !m->tasks_active) {
         waiting = 0;
     }
 
index 82a98fce0a83ea3942d53a3adc7ca3ae26971a30..ed332c8bc3d16efb55d99c9b71404df4ce9a353a 100644 (file)
@@ -68,7 +68,7 @@ struct h2_mplx {
     struct h2_ihash_t *spurge;      /* all streams done, ready for destroy */
     
     struct h2_iqueue *q;            /* all stream ids that need to be started */
-    struct h2_iqueue *readyq;       /* all stream ids ready for output */
+    struct h2_fifo *readyq;         /* all streams ready for output */
         
     struct h2_ihash_t *redo_tasks;  /* all tasks that need to be redone */
     
@@ -158,7 +158,7 @@ apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, struct h2_stream *stream);
 apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                  struct apr_thread_cond_t *iowait);
 
-apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id);
+apr_status_t h2_mplx_keep_active(h2_mplx *m, struct h2_stream *stream);
 
 /*******************************************************************************
  * Stream processing.
index 9d416cb7d7709cc6d0f52d3660f433149220bbc7..9784b4ec28d9feee5fefdc88e8c6351baa92b95b 100644 (file)
@@ -859,7 +859,7 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
     
     if (status == APR_EAGAIN) {
         /* TODO: ugly, someone needs to retrieve the response first */
-        h2_mplx_keep_active(stream->session->mplx, stream->id);
+        h2_mplx_keep_active(stream->session->mplx, stream);
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       H2_STRM_MSG(stream, "prep, response eagain"));
         return status;
index 0389193e88f9752fe9c773d838113e90f50efffc..a0b81fa0a8cd4a7827b599aa546ff7fd52d05f36 100644 (file)
@@ -438,12 +438,12 @@ int h2_iq_count(h2_iqueue *q)
 }
 
 
-void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
+int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
 {
     int i;
     
     if (h2_iq_contains(q, sid)) {
-        return;
+        return 0;
     }
     if (q->nelts >= q->nalloc) {
         iq_grow(q, q->nalloc * 2);
@@ -456,11 +456,12 @@ void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
         /* bubble it to the front of the queue */
         iq_bubble_up(q, i, q->head, cmp, ctx);
     }
+    return 1;
 }
 
-void h2_iq_append(h2_iqueue *q, int sid)
+int h2_iq_append(h2_iqueue *q, int sid)
 {
-    h2_iq_add(q, sid, NULL, NULL);
+    return h2_iq_add(q, sid, NULL, NULL);
 }
 
 int h2_iq_remove(h2_iqueue *q, int sid)
@@ -612,6 +613,7 @@ int h2_iq_contains(h2_iqueue *q, int sid)
 struct h2_fifo {
     void **elems;
     int nelems;
+    int set;
     int head;
     int count;
     int aborted;
@@ -636,7 +638,20 @@ static apr_status_t fifo_destroy(void *data)
     return APR_SUCCESS;
 }
 
-apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+static int index_of(h2_fifo *fifo, void *elem)
+{
+    int i;
+    
+    for (i = 0; i < fifo->count; ++i) {
+        if (elem == fifo->elems[nth_index(fifo, i)]) {
+            return i;
+        }
+    }
+    return -1;
+}
+
+static apr_status_t create_int(h2_fifo **pfifo, apr_pool_t *pool, 
+                               int capacity, int as_set)
 {
     apr_status_t rv;
     h2_fifo *fifo;
@@ -667,6 +682,7 @@ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
         return APR_ENOMEM;
     }
     fifo->nelems = capacity;
+    fifo->set = as_set;
     
     *pfifo = fifo;
     apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null);
@@ -674,6 +690,16 @@ apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
     return APR_SUCCESS;
 }
 
+apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+    return create_int(pfifo, pool, capacity, 0);
+}
+
+apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+    return create_int(pfifo, pool, capacity, 1);
+}
+
 apr_status_t h2_fifo_term(h2_fifo *fifo)
 {
     apr_status_t rv;
@@ -725,7 +751,12 @@ static apr_status_t fifo_push(h2_fifo *fifo, void *elem, int block)
     }
 
     if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
-        if (fifo->count == fifo->nelems) {
+        if (fifo->set && index_of(fifo, elem) >= 0) {
+            /* set mode, elem already member */
+            apr_thread_mutex_unlock(fifo->lock);
+            return APR_EEXIST;
+        }
+        else if (fifo->count == fifo->nelems) {
             if (block) {
                 while (fifo->count == fifo->nelems) {
                     if (fifo->aborted) {
index f6a4b9a43dfed5f261c46811fa9055f0ea3cd4a3..9b408fad3df47fb8f406d4ce3dd57ed48b4f1e36 100644 (file)
@@ -119,17 +119,19 @@ int h2_iq_count(h2_iqueue *q);
  * @param q the queue to append the id to
  * @param sid the stream id to add
  * @param cmp the comparator for sorting
- * @param ctx user data for comparator 
+ * @param ctx user data for comparator
+ * @return != 0 iff id was not already there 
  */
-void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx);
+int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx);
 
 /**
  * Append the id to the queue if not already present. 
  *
  * @param q the queue to append the id to
  * @param sid the id to append
+ * @return != 0 iff id was not already there 
  */
-void h2_iq_append(h2_iqueue *q, int sid);
+int h2_iq_append(h2_iqueue *q, int sid);
 
 /**
  * Remove the stream id from the queue. Return != 0 iff task
@@ -193,12 +195,32 @@ int h2_iq_contains(h2_iqueue *q, int sid);
  */
 typedef struct h2_fifo h2_fifo;
 
+/**
+ * Create a FIFO queue that can hold up to capacity elements. Elements can
+ * appear several times.
+ */
 apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity);
+
+/**
+ * Create a FIFO set that can hold up to capacity elements. Elements only
+ * appear once. Pushing an element already present does not change the
+ * queue and is successful.
+ */
+apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity);
+
 apr_status_t h2_fifo_term(h2_fifo *fifo);
 apr_status_t h2_fifo_interrupt(h2_fifo *fifo);
 
 int h2_fifo_count(h2_fifo *fifo);
 
+/**
+ * Push en element into the queue. Blocks if there is no capacity left.
+ * 
+ * @param fifo the FIFO queue
+ * @param elem the element to push
+ * @return APR_SUCCESS on push, APR_EAGAIN on try_push on a full queue,
+ *         APR_EEXIST when in set mode and elem already there.
+ */
 apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem);
 apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem);
 
index 8eea35caeaf8c23fc21f817b1d8f72f3d8d0c719..d14502341a011bff773703b5760f3a8fe07354db 100644 (file)
@@ -107,7 +107,7 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
         return APR_ENOMEM;
     }
     
-    ++workers->worker_count;
+    apr_atomic_inc32(&workers->worker_count);
     return APR_SUCCESS;
 }
 
@@ -142,7 +142,7 @@ static void cleanup_zombies(h2_workers *workers)
             apr_thread_join(&status, slot->thread);
             slot->thread = NULL;
         }
-        --workers->worker_count;
+        apr_atomic_dec32(&workers->worker_count);
         push_slot(&workers->free, slot);
     }
 }
@@ -199,14 +199,10 @@ static apr_status_t get_next(h2_slot *slot)
         
         cleanup_zombies(workers);
 
-        ++workers->idle_workers;
-
         apr_thread_mutex_lock(slot->lock);
         push_slot(&workers->idle, slot);
         apr_thread_cond_wait(slot->not_idle, slot->lock);
         apr_thread_mutex_unlock(slot->lock);
-
-        --workers->idle_workers;
     }
     return APR_EOF;
 }
index 30a7514cd0cdf2028dc420b0902140f09a60ddfe..7964b3c3aa6a130f168c5fd5f9304a42136a24c1 100644 (file)
@@ -40,8 +40,6 @@ struct h2_workers {
     int next_worker_id;
     int min_workers;
     int max_workers;
-    int worker_count;
-    int idle_workers;
     int max_idle_secs;
     
     int aborted;
@@ -51,6 +49,8 @@ struct h2_workers {
     int nslots;
     struct h2_slot *slots;
     
+    volatile apr_uint32_t worker_count;
+    
     struct h2_slot *free;
     struct h2_slot *idle;
     struct h2_slot *zombies;