]> granicus.if.org Git - apache/commitdiff
mod_proxy_http2: start of some sort of flow control, mod_http2: keeping spare allocat...
authorStefan Eissing <icing@apache.org>
Fri, 26 Feb 2016 13:26:25 +0000 (13:26 +0000)
committerStefan Eissing <icing@apache.org>
Fri, 26 Feb 2016 13:26:25 +0000 (13:26 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1732477 13f79535-47bb-0310-9956-ffa450edef68

15 files changed:
modules/http2/h2_conn.c
modules/http2/h2_conn.h
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_proxy_session.c
modules/http2/h2_proxy_session.h
modules/http2/h2_push.c
modules/http2/h2_task.c
modules/http2/h2_task.h
modules/http2/h2_task_output.c
modules/http2/h2_util.c
modules/http2/h2_util.h
modules/http2/h2_worker.c
modules/http2/mod_http2.h
modules/http2/mod_proxy_http2.c

index f6f814e6346be81e7a732ab19c8a9fe44f23d593..e7bb1dd1e10a6e6b746e0d1ebabaacafed384783 100644 (file)
@@ -44,6 +44,7 @@ static struct h2_workers *workers;
 static h2_mpm_type_t mpm_type = H2_MPM_UNKNOWN;
 static module *mpm_module;
 static int async_mpm;
+static apr_socket_t *dummy_socket;
 
 static void check_modules(int force) 
 {
@@ -154,7 +155,12 @@ apr_status_t h2_conn_child_init(apr_pool_t *pool, server_rec *s)
                              NULL, AP_FTYPE_CONNECTION);
    
     status = h2_mplx_child_init(pool, s);
-    
+
+    if (status == APR_SUCCESS) {
+        status = apr_socket_create(&dummy_socket, APR_INET, SOCK_STREAM,
+                                   APR_PROTO_TCP, pool);
+    }
+
     return status;
 }
 
@@ -234,22 +240,30 @@ apr_status_t h2_conn_pre_close(struct h2_ctx *ctx, conn_rec *c)
 }
 
 
-conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p
-                          apr_thread_t *thread, apr_socket_t *socket)
+conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent,
+                          apr_allocator_t *allocator)
 {
+    apr_pool_t *pool;
     conn_rec *c;
     void *cfg;
     
     AP_DEBUG_ASSERT(master);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, master,
-                  "h2_conn(%ld): created from master", master->id);
+                  "h2_conn(%ld): create slave", master->id);
     
-    /* This is like the slave connection creation from 2.5-DEV. A
-     * very efficient way - not sure how compatible this is, since
-     * the core hooks are no longer run.
-     * But maybe it's is better this way, not sure yet.
+    /* We create a pool with its own allocator to be used for
+     * processing a request. This is the only way to have the processing
+     * independant of its parent pool in the sense that it can work in
+     * another thread.
      */
-    c = (conn_rec *) apr_palloc(p, sizeof(conn_rec));
+    if (!allocator) {
+        apr_allocator_create(&allocator);
+    }
+    apr_pool_create_ex(&pool, parent, NULL, allocator);
+    apr_pool_tag(pool, "h2_slave_conn");
+    apr_allocator_owner_set(allocator, parent);
+    
+    c = (conn_rec *) apr_palloc(pool, sizeof(conn_rec));
     if (c == NULL) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, master, 
                       APLOGNO(02913) "h2_task: creating conn");
@@ -260,13 +274,12 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p,
            
     /* Replace these */
     c->master                 = master;
-    c->pool                   = p;   
-    c->current_thread         = thread;
-    c->conn_config            = ap_create_conn_config(p);
-    c->notes                  = apr_table_make(p, 5);
+    c->pool                   = pool;   
+    c->conn_config            = ap_create_conn_config(pool);
+    c->notes                  = apr_table_make(pool, 5);
     c->input_filters          = NULL;
     c->output_filters         = NULL;
-    c->bucket_alloc           = apr_bucket_alloc_create(p);
+    c->bucket_alloc           = apr_bucket_alloc_create(pool);
     c->data_in_input_filters  = 0;
     c->data_in_output_filters = 0;
     c->clogging_input_filters = 1;
@@ -274,11 +287,18 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p,
     c->log_id                 = NULL;
     /* Simulate that we had already a request on this connection. */
     c->keepalives             = 1;
-    
+    /* We cannot install the master connection socket on the slaves, as
+     * modules mess with timeouts/blocking of the socket, with
+     * unwanted side effects to the master connection processing.
+     * Fortunately, since we never use the slave socket, we can just install
+     * a single, process-wide dummy and everyone is happy.
+     */
+    ap_set_module_config(c->conn_config, &core_module, dummy_socket);
     /* TODO: these should be unique to this thread */
     c->sbh                    = master->sbh;
-    
-    ap_set_module_config(c->conn_config, &core_module, socket);
+    /* TODO: not all mpm modules have learned about slave connections yet.
+     * copy their config from master to slave.
+     */
     if (h2_conn_mpm_module()) {
         cfg = ap_get_module_config(master->conn_config, h2_conn_mpm_module());
         ap_set_module_config(c->conn_config, h2_conn_mpm_module(), cfg);
@@ -287,3 +307,14 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p,
     return c;
 }
 
+void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator)
+{
+    apr_allocator_t *allocator = apr_pool_allocator_get(slave->pool);
+    apr_pool_destroy(slave->pool);
+    if (pallocator) {
+        *pallocator = allocator;
+    }
+    else {
+        apr_allocator_destroy(allocator);
+    }
+}
index 0ffcf3b08d84d337fb852e7387d86ef8b9ac9f44..023eecaaca075a708bb5603fa18bb3e6ccae0be9 100644 (file)
@@ -66,7 +66,8 @@ typedef enum {
 h2_mpm_type_t h2_conn_mpm_type(void);
 
 
-conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p, 
-                          apr_thread_t *thread, apr_socket_t *socket);
+conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent,
+                          apr_allocator_t *allocator);
+void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator);
 
 #endif /* defined(__mod_h2__h2_conn__) */
index f18b3437aa4b9982d3cde7ed05a4e4a5f93f8afb..2d4c3ae36927c3e7bc1a5b9a2f6273e8ff05d808 100644 (file)
@@ -203,13 +203,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
             return NULL;
         }
     
-        status = apr_socket_create(&m->dummy_socket, APR_INET, SOCK_STREAM,
-                                   APR_PROTO_TCP, m->pool);
-        if (status != APR_SUCCESS) {
-            h2_mplx_destroy(m);
-            return NULL;
-        }
-
         m->q = h2_iq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
         m->stream_ios = h2_io_set_create(m->pool);
         m->ready_ios = h2_io_set_create(m->pool);
@@ -1061,26 +1054,9 @@ static h2_task *pop_task(h2_mplx *m)
         && (sid = h2_iq_shift(m->q)) > 0) {
         h2_io *io = h2_io_set_get(m->stream_ios, sid);
         if (io) {
-            conn_rec *c;
-            apr_pool_t *task_pool;
-            apr_allocator_t *task_allocator = NULL;
-            
-            /* We create a pool with its own allocator to be used for
-             * processing a request. This is the only way to have the processing
-             * independant of the worker pool as the h2_mplx pool as well as
-             * not sensitive to which thread it is in.
-             * In that sense, memory allocation and lifetime is similar to a master
-             * connection.
-             * The main goal in this is that slave connections and requests will
-             * - one day - be suspended and resumed in different threads.
-             */
-            apr_allocator_create(&task_allocator);
-            apr_pool_create_ex(&task_pool, io->pool, NULL, task_allocator);
-            apr_pool_tag(task_pool, "h2_task");
-            apr_allocator_owner_set(task_allocator, task_pool);
-            
-            c = h2_slave_create(m->c, task_pool, m->c->current_thread, m->dummy_socket);
-            task = h2_task_create(m->id, io->request, c, m);
+            conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator);
+            m->spare_allocator = NULL;
+            task = h2_task_create(m->id, io->request, slave, m);
             
             io->processing_started = 1;
             if (sid > m->max_stream_started) {
@@ -1133,10 +1109,15 @@ static void task_done(h2_mplx *m, h2_task *task)
             /* TODO: this will keep a worker attached to this h2_mplx as
              * long as it has requests to handle. Might no be fair to
              * other mplx's. Perhaps leave after n requests? */
+            h2_mplx_out_close(m, task->stream_id, NULL);
+            if (m->spare_allocator) {
+                apr_allocator_destroy(m->spare_allocator);
+                m->spare_allocator = NULL;
+            }
+            h2_slave_destroy(task->c, &m->spare_allocator);
             task = NULL;
             if (io) {
                 io->processing_done = 1;
-                h2_mplx_out_close(m, io->id, NULL);
                 if (io->orphaned) {
                     io_destroy(m, io, 0);
                     if (m->join_wait) {
@@ -1288,6 +1269,8 @@ apr_status_t h2_mplx_engine_push(const char *engine_type,
                                                m->id, m->next_eng_id++);
                 engine->pub.pool = task->c->pool;
                 engine->pub.type = apr_pstrdup(task->c->pool, engine_type);
+                engine->pub.window_bits = 30;
+                engine->pub.req_window_bits = h2_log2(m->stream_max_mem);
                 engine->c = r->connection;
                 APR_RING_INIT(&engine->entries, h2_req_entry, link);
                 engine->m = m;
index 12bb2d39ace719412a093b2b67902ec98e02da55..8dff6e0853770ebadb87f1e1dce85270cdf390ae 100644 (file)
@@ -82,12 +82,13 @@ struct h2_mplx {
     struct apr_thread_cond_t *added_output;
     struct apr_thread_cond_t *task_done;
     struct apr_thread_cond_t *join_wait;
-    apr_socket_t *dummy_socket;
     
     apr_size_t stream_max_mem;
     apr_interval_time_t stream_timeout;
     
     apr_pool_t *spare_pool;           /* spare pool, ready for next io */
+    apr_allocator_t *spare_allocator;
+    
     struct h2_workers *workers;
     apr_size_t tx_handles_reserved;
     apr_size_t tx_chunk_size;
index 9a1f808927f667d3b22fa48d3a7483c43f874130..598de3e3931b8523528cc87550a609e72c622057 100644 (file)
@@ -485,6 +485,8 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
 
 h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
                                          proxy_server_conf *conf,
+                                         unsigned char window_bits_connection,
+                                         unsigned char window_bits_stream,
                                          h2_proxy_request_done *done)
 {
     if (!p_conn->data) {
@@ -503,8 +505,8 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
         session->conf = conf;
         session->pool = p_conn->scpool;
         session->state = H2_PROXYS_ST_INIT;
-        session->window_bits_default    = 30;
-        session->window_bits_connection = 30;
+        session->window_bits_stream = window_bits_stream;
+        session->window_bits_connection = window_bits_connection;
         session->streams = h2_ihash_create(pool, offsetof(h2_proxy_stream, id));
         session->suspended = h2_iq_create(pool, 5);
         session->done = done;
@@ -543,7 +545,7 @@ static apr_status_t session_start(h2_proxy_session *session)
     settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
     settings[0].value = 0;
     settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
-    settings[1].value = (1 << session->window_bits_default) - 1;
+    settings[1].value = (1 << session->window_bits_stream) - 1;
     
     rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings, 
                                  H2_ALEN(settings));
index d4f68b3a19c6797afd8d53f7c111632197d561de..3fad2b6003a0a6b970a9764e9d88d46c8502e575 100644 (file)
@@ -64,8 +64,8 @@ struct h2_proxy_session {
     h2_proxy_request_done *done;
     void *user_data;
     
-    int window_bits_default;
-    int window_bits_connection;
+    unsigned char window_bits_stream;
+    unsigned char window_bits_connection;
 
     h2_proxys_state state;
     apr_interval_time_t wait_timeout;
@@ -81,6 +81,8 @@ struct h2_proxy_session {
 
 h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
                                          proxy_server_conf *conf,
+                                         unsigned char window_bits_connection,
+                                         unsigned char window_bits_stream,
                                          h2_proxy_request_done *done);
 
 apr_status_t h2_proxy_session_submit(h2_proxy_session *s, const char *url,
index a8b7c8591c9fbb4ea823b4abbf288ebf33408557..82615afdbbbd9143b532e5dae8db29ab4ee87f53 100644 (file)
@@ -328,9 +328,7 @@ static int add_push(link_ctx *ctx)
                  * TLS (if any) parameters.
                  */
                 path = apr_uri_unparse(ctx->pool, &uri, APR_URI_UNP_OMITSITEPART);
-                
                 push = apr_pcalloc(ctx->pool, sizeof(*push));
-                
                 switch (ctx->req->push_policy) {
                     case H2_PUSH_HEAD:
                         method = "HEAD";
@@ -701,36 +699,6 @@ apr_array_header_t *h2_push_collect_update(h2_stream *stream,
     return h2_push_diary_update(stream->session, pushes);
 }
 
-/* h2_log2(n) iff n is a power of 2 */
-static unsigned char h2_log2(apr_uint32_t n)
-{
-    int lz = 0;
-    if (!n) {
-        return 0;
-    }
-    if (!(n & 0xffff0000u)) {
-        lz += 16;
-        n = (n << 16);
-    }
-    if (!(n & 0xff000000u)) {
-        lz += 8;
-        n = (n << 8);
-    }
-    if (!(n & 0xf0000000u)) {
-        lz += 4;
-        n = (n << 4);
-    }
-    if (!(n & 0xc0000000u)) {
-        lz += 2;
-        n = (n << 2);
-    }
-    if (!(n & 0x80000000u)) {
-        lz += 1;
-    }
-    
-    return 31 - lz;
-}
-
 static apr_int32_t h2_log2inv(unsigned char log2)
 {
     return log2? (1 << log2) : 1;
index 7299b299cf6f99bc2e66a956e6ff1eacc5d485e8..06543d670c82ef4d1bf111312eccad168e77946c 100644 (file)
@@ -95,8 +95,7 @@ static apr_status_t h2_response_freeze_filter(ap_filter_t* f,
     if (task->frozen) {
         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r,
                       "h2_response_freeze_filter, saving");
-        APR_BRIGADE_CONCAT(task->frozen_out, bb);
-        return APR_SUCCESS;
+        return ap_save_brigade(f, &task->frozen_out, &bb, task->c->pool);
     }
     
     if (APR_BRIGADE_EMPTY(bb)) {
@@ -204,8 +203,7 @@ h2_task *h2_task_create(long session_id, const h2_request *req,
     return task;
 }
 
-apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond, 
-                        apr_socket_t *socket)
+apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond)
 {
     apr_status_t status;
     
@@ -214,7 +212,7 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond,
     task->input = h2_task_input_create(task, task->c);
     task->output = h2_task_output_create(task, task->c);
     
-    ap_process_connection(task->c, socket);
+    ap_process_connection(task->c, ap_get_conn_socket(task->c));
     
     if (task->frozen) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
index f2cc6dfd8acd53df895d1b98dcc0a393898559b0..24bde946f302460fb0c8b448feedc0c87aac6bae 100644 (file)
@@ -69,7 +69,7 @@ struct h2_task {
 h2_task *h2_task_create(long session_id, const struct h2_request *req, 
                         conn_rec *c, struct h2_mplx *mplx);
 
-apr_status_t h2_task_do(h2_task *task, struct apr_thread_cond_t *cond, apr_socket_t *socket);
+apr_status_t h2_task_do(h2_task *task, struct apr_thread_cond_t *cond);
 
 void h2_task_register_hooks(void);
 /*
index b717fc3d6ecbc1a2497f15866ff6037b4ef6bb24..0cf3d355e06325790fd3a6d517040b3895b44f31 100644 (file)
@@ -145,8 +145,8 @@ apr_status_t h2_task_output_write(h2_task_output *output,
     if (output->task->frozen) {
         h2_util_bb_log(output->c, output->task->stream_id, APLOG_TRACE2,
                        "frozen task output write", bb);
-        APR_BRIGADE_CONCAT(output->task->frozen_out, bb);
-        return APR_SUCCESS;
+        return ap_save_brigade(f, &output->task->frozen_out, &bb, 
+                               output->c->pool);
     }
     
     status = open_if_needed(output, f, bb, "write");
index 52c858e609583d8d37f682b0422305b06328e762..54e6a2ab0bebfda61038bda263bdcf7608e1ace1 100644 (file)
 #include "h2_request.h"
 #include "h2_util.h"
 
+/* h2_log2(n) iff n is a power of 2 */
+unsigned char h2_log2(apr_uint32_t n)
+{
+    int lz = 0;
+    if (!n) {
+        return 0;
+    }
+    if (!(n & 0xffff0000u)) {
+        lz += 16;
+        n = (n << 16);
+    }
+    if (!(n & 0xff000000u)) {
+        lz += 8;
+        n = (n << 8);
+    }
+    if (!(n & 0xf0000000u)) {
+        lz += 4;
+        n = (n << 4);
+    }
+    if (!(n & 0xc0000000u)) {
+        lz += 2;
+        n = (n << 2);
+    }
+    if (!(n & 0x80000000u)) {
+        lz += 1;
+    }
+    
+    return 31 - lz;
+}
+
 size_t h2_util_hex_dump(char *buffer, size_t maxlen,
                         const char *data, size_t datalen)
 {
index cd2d8a12e363e2873c01f40690968a7acc400a34..97417f72616e1259f27c86400e499fb3b7617609 100644 (file)
@@ -66,6 +66,9 @@ void h2_ihash_clear(h2_ihash_t *ih);
 /*******************************************************************************
  * common helpers
  ******************************************************************************/
+/* h2_log2(n) iff n is a power of 2 */
+unsigned char h2_log2(apr_uint32_t n);
+
 /**
  * Count the bytes that all key/value pairs in a table have
  * in length (exlucding terminating 0s), plus additional extra per pair.
index 75d0ead91670c1f652bc5845c4453b225fec1ba4..23466e864ff81716ca69ae653e83ce1c1fd3dfce 100644 (file)
@@ -43,7 +43,7 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
         /* Get a h2_task from the main workers queue. */
         status = worker->get_next(worker, worker->ctx, &task, &sticky);
         while (task) {
-            h2_task_do(task, worker->io, task->mplx->dummy_socket);
+            h2_task_do(task, worker->io);
             
             /* if someone was waiting on this task, time to wake up */
             apr_thread_cond_signal(worker->io);
index edacd0f134963f0454e29f8dc4990456153964bb..ae135293101ec029eadd5ecebf03e8a87db8e22f 100644 (file)
@@ -56,7 +56,12 @@ struct h2_req_engine {
     const char *id;        /* identifier */
     apr_pool_t *pool;      /* pool for engine specific allocations */
     const char *type;      /* name of the engine type */
-    apr_size_t capacity;   /* number of max assigned requests */
+    unsigned char window_bits;/* preferred size of overall response data
+                            * mod_http2 is willing to buffer as log2 */
+    unsigned char req_window_bits;/* preferred size of response body data
+                            * mod_http2 is willing to buffer per request,
+                            * as log2 */
+    apr_size_t capacity;   /* maximum concurrent requests */
     void *user_data;       /* user specific data */
 };
 
index a2d80d32e8bdd9b2012f7b455c402e85fccc9eb0..4a3c4cc95eeb6a1d61b16434ed42ddcb19287ccb 100644 (file)
@@ -300,8 +300,10 @@ setup_backend:
      */
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner, 
                   "eng(%s): setup session", ctx->engine->id);
-    session = h2_proxy_session_setup(ctx->engine->id, ctx->p_conn, 
-                                     ctx->conf, request_done);
+    session = h2_proxy_session_setup(ctx->engine->id, ctx->p_conn, ctx->conf, 
+                                     ctx->engine->window_bits, 
+                                     ctx->engine->req_window_bits, 
+                                     request_done);
     if (!session) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->p_conn->connection, 
                       "session unavailable");
@@ -470,6 +472,8 @@ static int proxy_http2_handler(request_rec *r,
         engine->type = engine_type;
         engine->pool = p;
         engine->capacity = 1;
+        engine->window_bits = 30;
+        engine->req_window_bits = 16;
         ctx->engine = engine;
         ctx->standalone = 1;
         ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,