]> granicus.if.org Git - apache/commitdiff
sharing bucket_alloc for all streams inside mplx, explicit lifetime handling of EOR...
authorStefan Eissing <icing@apache.org>
Mon, 14 Mar 2016 16:43:52 +0000 (16:43 +0000)
committerStefan Eissing <icing@apache.org>
Mon, 14 Mar 2016 16:43:52 +0000 (16:43 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1734957 13f79535-47bb-0310-9956-ffa450edef68

modules/http2/h2_conn.c
modules/http2/h2_io.c
modules/http2/h2_io.h
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_ngn_shed.c
modules/http2/h2_task_output.c
modules/http2/h2_task_output.h
modules/http2/h2_util.c

index a0cd54e6ac7b4a2d5c646f9826b468f7e5bab59a..60e209492c4a6e0726f560109483c2b70a9dc99a 100644 (file)
@@ -261,7 +261,7 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent,
     }
     apr_pool_create_ex(&pool, parent, NULL, allocator);
     apr_pool_tag(pool, "h2_slave_conn");
-    apr_allocator_owner_set(allocator, parent);
+    apr_allocator_owner_set(allocator, pool);
     
     c = (conn_rec *) apr_palloc(pool, sizeof(conn_rec));
     if (c == NULL) {
@@ -309,15 +309,18 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent,
 
 void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator)
 {
+    apr_pool_t *parent;
     apr_allocator_t *allocator = apr_pool_allocator_get(slave->pool);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave,
                   "h2_slave_conn(%ld): destroy (task=%s)", slave->id,
                   apr_table_get(slave->notes, H2_TASK_ID_NOTE));
-    apr_pool_destroy(slave->pool);
-    if (pallocator) {
+    /* Attache the allocator to the parent pool and return it for
+     * reuse, otherwise the own is still the slave pool and it will
+     * get destroyed with it. */
+    parent = apr_pool_parent_get(slave->pool);
+    if (pallocator && parent) {
+        apr_allocator_owner_set(allocator, parent);
         *pallocator = allocator;
     }
-    else {
-        apr_allocator_destroy(allocator);
-    }
+    apr_pool_destroy(slave->pool);
 }
index d66558e6172f1c44b1cf63355ed986bdca299e49..39ebad3a793bf585a46523c02974da156185a5fd 100644 (file)
@@ -23,6 +23,7 @@
 #include <http_core.h>
 #include <http_log.h>
 #include <http_connection.h>
+#include <http_request.h>
 
 #include "h2_private.h"
 #include "h2_h2.h"
 #include "h2_task.h"
 #include "h2_util.h"
 
-h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request)
+h2_io *h2_io_create(int id, apr_pool_t *pool, 
+                    apr_bucket_alloc_t *bucket_alloc,
+                    const h2_request *request)
 {
     h2_io *io = apr_pcalloc(pool, sizeof(*io));
     if (io) {
         io->id = id;
         io->pool = pool;
-        io->bucket_alloc = apr_bucket_alloc_create(pool);
+        io->bucket_alloc = bucket_alloc;
         io->request = h2_request_clone(pool, request);
     }
     return io;
@@ -413,28 +416,36 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb,
                              apr_size_t *pfile_buckets_allowed)
 {
     apr_status_t status;
+    apr_bucket *b;
     int start_allowed;
     
     if (io->rst_error) {
         return APR_ECONNABORTED;
     }
 
+    if (!io->eor) {
+        /* Filter the EOR bucket and set it aside. We prefer to tear down
+         * the request when the whole h2 stream is done */
+        for (b = APR_BRIGADE_FIRST(bb);
+             b != APR_BRIGADE_SENTINEL(bb);
+             b = APR_BUCKET_NEXT(b))
+        {
+            if (AP_BUCKET_IS_EOR(b)) {
+                APR_BUCKET_REMOVE(b);
+                io->eor = b;
+                break;
+            }
+        }     
+    }
+    
     if (io->eos_out) {
-        apr_off_t len;
+        apr_off_t len = 0;
         /* We have already delivered an EOS bucket to a reader, no
          * sense in storing anything more here.
          */
-        status = apr_brigade_length(bb, 1, &len);
-        if (status == APR_SUCCESS) {
-            if (len > 0) {
-                /* someone tries to write real data after EOS, that
-                 * does not look right. */
-                status = APR_EOF;
-            }
-            /* cleanup, as if we had moved the data */
-            apr_brigade_cleanup(bb);
-        }
-        return status;
+        apr_brigade_length(bb, 0, &len);
+        apr_brigade_cleanup(bb);
+        return (len > 0)? APR_EOF : APR_SUCCESS;
     }
 
     process_trailers(io, trailers);
index a602c0952ef4ed2e7a0191f05f06998362a66c83..90d0cde8f2ea1405c312d166b7ab932b6c966f18 100644 (file)
@@ -44,6 +44,9 @@ struct h2_io {
     struct h2_response *response;    /* response to request */
     int rst_error;                   /* h2 related stream abort error */
 
+    apr_bucket *eor;                 /* the EOR bucket, set aside */
+    struct h2_task *task;            /* the task once started */
+    
     apr_bucket_brigade *bbin;        /* input data for stream */
     apr_bucket_brigade *bbout;       /* output data from stream */
     apr_bucket_brigade *bbtmp;       /* temporary data for chunking */
@@ -77,7 +80,9 @@ struct h2_io {
 /**
  * Creates a new h2_io for the given stream id. 
  */
-h2_io *h2_io_create(int id, apr_pool_t *pool, const struct h2_request *request);
+h2_io *h2_io_create(int id, apr_pool_t *pool, 
+                    apr_bucket_alloc_t *bucket_alloc, 
+                    const struct h2_request *request);
 
 /**
  * Set the response of this stream.
index b60d328959bf7d89bc3b5febba215fa6c3cc54d9..f77a404e7a33d2353eb0ba3665572dfffa64b62c 100644 (file)
@@ -201,6 +201,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
             return NULL;
         }
     
+        m->bucket_alloc = apr_bucket_alloc_create(m->pool);
         m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
         m->q = h2_iq_create(m->pool, m->max_streams);
@@ -266,7 +267,7 @@ static int io_process_events(h2_mplx *m, h2_io *io)
 
 static void io_destroy(h2_mplx *m, h2_io *io, int events)
 {
-    apr_pool_t *pool = io->pool;
+    apr_pool_t *pool;
     
     /* cleanup any buffered input */
     h2_io_in_shutdown(io);
@@ -275,7 +276,6 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events)
         io_process_events(m, io);
     }
     
-    io->pool = NULL;    
     /* The pool is cleared/destroyed which also closes all
      * allocated file handles. Give this count back to our
      * file handle pool. */
@@ -286,7 +286,19 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events)
     if (m->redo_ios) {
         h2_io_set_remove(m->redo_ios, io);
     }
-    
+
+    if (io->task) {
+        if (m->spare_allocator) {
+            apr_allocator_destroy(m->spare_allocator);
+            m->spare_allocator = NULL;
+        }
+        
+        h2_slave_destroy(io->task->c, &m->spare_allocator);
+        io->task = NULL;
+    }
+
+    pool = io->pool;
+    io->pool = NULL;    
     if (pool) {
         apr_pool_clear(pool);
         if (m->spare_pool) {
@@ -856,12 +868,17 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id, apr_table_t *trailers)
                               "h2_mplx(%ld-%d): close, no response, no rst", 
                               m->id, io->id);
             }
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                          "h2_mplx(%ld-%d): close with trailers=%s", 
-                          m->id, io->id, trailers? "yes" : "no");
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+                          "h2_mplx(%ld-%d): close with eor=%s, trailers=%s", 
+                          m->id, io->id, io->eor? "yes" : "no", 
+                          trailers? "yes" : "no");
             status = h2_io_out_close(io, trailers);
             H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
             
+            if (io->eor) {
+                apr_bucket_delete(io->eor);
+                io->eor = NULL;
+            }
             have_out_data_for(m, stream_id);
         }
         else {
@@ -987,7 +1004,7 @@ static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request)
         m->spare_pool = NULL;
     }
     
-    io = h2_io_create(stream_id, io_pool, request);
+    io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request);
     h2_io_set_add(m->stream_ios, io);
     
     return io;
@@ -1044,9 +1061,9 @@ static h2_task *pop_task(h2_mplx *m)
             }
         }
         else if (io) {
-            conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator);
+            conn_rec *slave = h2_slave_create(m->c, io->pool, m->spare_allocator);
             m->spare_allocator = NULL;
-            task = h2_task_create(m->id, io->request, slave, m);
+            io->task = task = h2_task_create(m->id, io->request, slave, m);
             apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
             io->worker_started = 1;
             io->started_at = apr_time_now();
@@ -1119,14 +1136,6 @@ static void task_done(h2_mplx *m, h2_task *task)
                 h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
             }
             
-            if (m->spare_allocator) {
-                apr_allocator_destroy(m->spare_allocator);
-                m->spare_allocator = NULL;
-            }
-            
-            h2_slave_destroy(task->c, &m->spare_allocator);
-            task = NULL;
-            
             if (io) {
                 apr_time_t now = apr_time_now();
                 if (!io->orphaned && m->redo_ios
@@ -1168,9 +1177,14 @@ static void task_done(h2_mplx *m, h2_task *task)
                     }
                 }
                 else {
-                    /* hang around until the stream deregisteres */
+                    /* hang around until the stream deregisters */
                 }
             }
+            else {
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                              "h2_mplx(%ld): task %s without corresp. h2_io",
+                              m->id, task->id);
+            }
         }
     }
 }
@@ -1410,7 +1424,6 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
                 /* cannot report that as done until engine returns */
             }
             else {
-                h2_task_output_close(task->output);
                 task_done(m, task);
             }
             leave_mutex(m, acquired);
index e33d5e5a2ad237283321d05a8fcc43ef59293265..f50239c3a1d9e4b1e47e98c2613539ea4a6681fe 100644 (file)
@@ -67,6 +67,7 @@ struct h2_mplx {
     volatile int refs;
     conn_rec *c;
     apr_pool_t *pool;
+    apr_bucket_alloc_t *bucket_alloc;
 
     unsigned int aborted : 1;
     unsigned int need_registration : 1;
index 5b97cf914d2bf81f2998929321e8b185354d0a43..79ca72e846aa4cc8fd15d245475478923a88c4aa 100644 (file)
@@ -261,8 +261,7 @@ apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed,
 }
                                  
 static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, 
-                                  h2_task *task, int waslive, int aborted, 
-                                  int close)
+                                  h2_task *task, int waslive, int aborted)
 {
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
                   "h2_ngn_shed(%ld): task %s %s by %s", 
@@ -271,16 +270,13 @@ static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn,
     if (waslive) ngn->no_live--;
     ngn->no_assigned--;
 
-    if (close) {
-        h2_task_output_close(task->output);
-    }
     return APR_SUCCESS;
 }
                                 
 apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, 
                                     struct h2_req_engine *ngn, h2_task *task)
 {
-    return ngn_done_task(shed, ngn, task, 1, 0, 0);
+    return ngn_done_task(shed, ngn, task, 1, 0);
 }
                                 
 void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
@@ -308,7 +304,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
                           "h2_ngn_shed(%ld): engine %s has queued task %s, "
                           "frozen=%d, aborting",
                           shed->c->id, ngn->id, task->id, task->frozen);
-            ngn_done_task(shed, ngn, task, 0, 1, 1);
+            ngn_done_task(shed, ngn, task, 0, 1);
         }
     }
     if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
index 3ef884ffd25ec8088f9e62d414cd68d9a0f4e05e..87bbe38c3c2751bac3e73c92d1ce09148a585e06 100644 (file)
@@ -42,9 +42,6 @@ h2_task_output *h2_task_output_create(h2_task *task, conn_rec *c)
         output->task = task;
         output->state = H2_TASK_OUT_INIT;
         output->from_h1 = h2_from_h1_create(task->stream_id, c->pool);
-        if (!output->from_h1) {
-            return NULL;
-        }
     }
     return output;
 }
@@ -66,47 +63,43 @@ static apr_table_t *get_trailers(h2_task_output *output)
     return NULL;
 }
 
-static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f,
-                                   apr_bucket_brigade *bb, const char *caller)
+static apr_status_t open_response(h2_task_output *output, ap_filter_t *f,
+                                  apr_bucket_brigade *bb, const char *caller)
 {
-    if (output->state == H2_TASK_OUT_INIT) {
-        h2_response *response;
-        output->state = H2_TASK_OUT_STARTED;
-        response = h2_from_h1_get_response(output->from_h1);
-        if (!response) {
-            if (f) {
-                /* This happens currently when ap_die(status, r) is invoked
-                 * by a read request filter. */
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204)
-                              "h2_task_output(%s): write without response by %s "
-                              "for %s %s %s",
-                              output->task->id, caller, 
-                              output->task->request->method, 
-                              output->task->request->authority, 
-                              output->task->request->path);
-                output->c->aborted = 1;
-            }
-            if (output->task->io) {
-                apr_thread_cond_broadcast(output->task->io);
-            }
-            return APR_ECONNABORTED;
+    h2_response *response;
+    response = h2_from_h1_get_response(output->from_h1);
+    if (!response) {
+        if (f) {
+            /* This happens currently when ap_die(status, r) is invoked
+             * by a read request filter. */
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204)
+                          "h2_task_output(%s): write without response by %s "
+                          "for %s %s %s",
+                          output->task->id, caller, 
+                          output->task->request->method, 
+                          output->task->request->authority, 
+                          output->task->request->path);
+            output->c->aborted = 1;
         }
-        
-        if (h2_task_logio_add_bytes_out) {
-            /* counter headers as if we'd do a HTTP/1.1 serialization */
-            output->written = h2_util_table_bytes(response->headers, 3)+1;
-            h2_task_logio_add_bytes_out(output->c, output->written);
+        if (output->task->io) {
+            apr_thread_cond_broadcast(output->task->io);
         }
-        get_trailers(output);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348)
-                      "h2_task(%s): open response to %s %s %s",
-                      output->task->id, output->task->request->method, 
-                      output->task->request->authority, 
-                      output->task->request->path);
-        return h2_mplx_out_open(output->task->mplx, output->task->stream_id, 
-                                response, f, bb, output->task->io);
+        return APR_ECONNABORTED;
+    }
+    
+    if (h2_task_logio_add_bytes_out) {
+        /* count headers as if we'd do a HTTP/1.1 serialization */
+        output->written = h2_util_table_bytes(response->headers, 3)+1;
+        h2_task_logio_add_bytes_out(output->c, output->written);
     }
-    return APR_SUCCESS;
+    get_trailers(output);
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348)
+                  "h2_task(%s): open response to %s %s %s",
+                  output->task->id, output->task->request->method, 
+                  output->task->request->authority, 
+                  output->task->request->path);
+    return h2_mplx_out_open(output->task->mplx, output->task->stream_id, 
+                            response, f, bb, output->task->io);
 }
 
 static apr_status_t write_brigade_raw(h2_task_output *output, 
@@ -145,7 +138,7 @@ static apr_status_t write_brigade_raw(h2_task_output *output,
 apr_status_t h2_task_output_write(h2_task_output *output,
                                   ap_filter_t* f, apr_bucket_brigade* bb)
 {
-    apr_status_t status;
+    apr_status_t status = APR_SUCCESS;
     
     if (APR_BRIGADE_EMPTY(bb)) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c,
@@ -159,7 +152,10 @@ apr_status_t h2_task_output_write(h2_task_output *output,
         return APR_SUCCESS;
     }
     
-    status = open_if_needed(output, f, bb, "write");
+    if (output->state == H2_TASK_OUT_INIT) {
+        status = open_response(output, f, bb, "write");
+        output->state = H2_TASK_OUT_STARTED;
+    }
     
     /* Attempt to write saved brigade first */
     if (status == APR_SUCCESS && output->bb 
@@ -188,13 +184,3 @@ apr_status_t h2_task_output_write(h2_task_output *output,
     return status;
 }
 
-void h2_task_output_close(h2_task_output *output)
-{
-    open_if_needed(output, NULL, NULL, "close");
-    if (output->state != H2_TASK_OUT_DONE) {
-        h2_mplx_out_close(output->task->mplx, output->task->stream_id, 
-                          get_trailers(output));
-        output->state = H2_TASK_OUT_DONE;
-    }
-}
-
index 7861039e0dee6fea9065bb6b54e8aee94b5aeab7..76705820725357b15a5cde5b9539dad83dec2289 100644 (file)
@@ -52,8 +52,6 @@ apr_status_t h2_task_output_write(h2_task_output *output,
                                   ap_filter_t* filter,
                                   apr_bucket_brigade* brigade);
 
-void h2_task_output_close(h2_task_output *output);
-
 apr_status_t h2_task_output_freeze(h2_task_output *output);
 apr_status_t h2_task_output_thaw(h2_task_output *output);
 
index 904349658c32fa8e9bb20ea910a3d52f02465d0c..8fe2b26f4a0c2b5a6343a68457a1cd0449858832 100644 (file)
@@ -537,6 +537,7 @@ apr_status_t h2_util_move(apr_bucket_brigade *to, apr_bucket_brigade *from,
                 else {
                     const char *data;
                     apr_size_t len;
+
                     status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
                     if (status == APR_SUCCESS && len > 0) {
                         status = apr_brigade_write(to, NULL, NULL, data, len);