]> granicus.if.org Git - apache/commitdiff
backport of mod_http2 v1.4.2
authorStefan Eissing <icing@apache.org>
Wed, 16 Mar 2016 15:16:00 +0000 (15:16 +0000)
committerStefan Eissing <icing@apache.org>
Wed, 16 Mar 2016 15:16:00 +0000 (15:16 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1735239 13f79535-47bb-0310-9956-ffa450edef68

25 files changed:
CHANGES
modules/http2/h2_bucket_eoc.h
modules/http2/h2_bucket_eos.h
modules/http2/h2_conn.c
modules/http2/h2_conn_io.c
modules/http2/h2_conn_io.h
modules/http2/h2_io.c
modules/http2/h2_io.h
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_ngn_shed.c
modules/http2/h2_ngn_shed.h
modules/http2/h2_session.c
modules/http2/h2_session.h
modules/http2/h2_stream.c
modules/http2/h2_stream.h
modules/http2/h2_task.c
modules/http2/h2_task.h
modules/http2/h2_task_output.c
modules/http2/h2_task_output.h
modules/http2/h2_util.c
modules/http2/h2_util.h
modules/http2/h2_version.h
modules/http2/mod_http2.c
modules/http2/mod_http2.h

diff --git a/CHANGES b/CHANGES
index 6a0115224ecf693a6256bcb00acf6a653922b910..a76dcaf545c2b41d0277206941058aa169b1123e 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,9 @@
 
 Changes with Apache 2.4.19
 
+  *) mod_http2: fixes problem with wrong lifetime of file buckets on main
+     connection. [Stefan Eissing]
+     
   *) mod_http2: fixes incorrect denial of requests without :authority header.
      [Stefan Eissing]
      
index f1cd6f813547cbb26c6340179c621e0dd8e1d229..2d46691995d56beab4a2e0f73852e3eb9ec473d6 100644 (file)
@@ -21,6 +21,7 @@ struct h2_session;
 /** End Of HTTP/2 SESSION (H2EOC) bucket */
 extern const apr_bucket_type_t h2_bucket_type_eoc;
 
+#define H2_BUCKET_IS_H2EOC(e)     (e->type == &h2_bucket_type_eoc)
 
 apr_bucket * h2_bucket_eoc_make(apr_bucket *b, 
                                 struct h2_session *session);
index bd3360db5a53bc52aa7b2f307dd60d84e5428154..27b501dad393a9d59825b23a372d4865c3f54d2a 100644 (file)
@@ -21,6 +21,7 @@ struct h2_stream;
 /** End Of HTTP/2 STREAM (H2EOS) bucket */
 extern const apr_bucket_type_t h2_bucket_type_eos;
 
+#define H2_BUCKET_IS_H2EOS(e)     (e->type == &h2_bucket_type_eos)
 
 apr_bucket *h2_bucket_eos_make(apr_bucket *b, struct h2_stream *stream);
 
index a0cd54e6ac7b4a2d5c646f9826b468f7e5bab59a..60e209492c4a6e0726f560109483c2b70a9dc99a 100644 (file)
@@ -261,7 +261,7 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent,
     }
     apr_pool_create_ex(&pool, parent, NULL, allocator);
     apr_pool_tag(pool, "h2_slave_conn");
-    apr_allocator_owner_set(allocator, parent);
+    apr_allocator_owner_set(allocator, pool);
     
     c = (conn_rec *) apr_palloc(pool, sizeof(conn_rec));
     if (c == NULL) {
@@ -309,15 +309,18 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent,
 
 void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator)
 {
+    apr_pool_t *parent;
     apr_allocator_t *allocator = apr_pool_allocator_get(slave->pool);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave,
                   "h2_slave_conn(%ld): destroy (task=%s)", slave->id,
                   apr_table_get(slave->notes, H2_TASK_ID_NOTE));
-    apr_pool_destroy(slave->pool);
-    if (pallocator) {
+    /* Attache the allocator to the parent pool and return it for
+     * reuse, otherwise the own is still the slave pool and it will
+     * get destroyed with it. */
+    parent = apr_pool_parent_get(slave->pool);
+    if (pallocator && parent) {
+        apr_allocator_owner_set(allocator, parent);
         *pallocator = allocator;
     }
-    else {
-        apr_allocator_destroy(allocator);
-    }
+    apr_pool_destroy(slave->pool);
 }
index 56d01e6732c523d33a462d7d98120208f036fe58..59561ecd61982667a354b87fcda00c93d49e3743 100644 (file)
  */
 
 #include <assert.h>
-
+#include <apr_strings.h>
 #include <ap_mpm.h>
 
 #include <httpd.h>
 #include <http_core.h>
 #include <http_log.h>
 #include <http_connection.h>
+#include <http_request.h>
 
 #include "h2_private.h"
 #include "h2_bucket_eoc.h"
+#include "h2_bucket_eos.h"
 #include "h2_config.h"
 #include "h2_conn_io.h"
 #include "h2_h2.h"
  * which seems to create less TCP packets overall
  */
 #define WRITE_SIZE_MAX        (TLS_DATA_MAX - 100) 
-
 #define WRITE_BUFFER_SIZE     (5*WRITE_SIZE_MAX)
 
+
+static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level, 
+                              const char *tag, apr_bucket_brigade *bb)
+{
+    char buffer[16 * 1024];
+    const char *line = "(null)";
+    apr_size_t bmax = sizeof(buffer)/sizeof(buffer[0]);
+    int off = 0;
+    apr_bucket *b;
+    
+    if (bb) {
+        memset(buffer, 0, bmax--);
+        for (b = APR_BRIGADE_FIRST(bb); 
+             bmax && (b != APR_BRIGADE_SENTINEL(bb));
+             b = APR_BUCKET_NEXT(b)) {
+            
+            if (APR_BUCKET_IS_METADATA(b)) {
+                if (APR_BUCKET_IS_EOS(b)) {
+                    off += apr_snprintf(buffer+off, bmax-off, "eos ");
+                }
+                else if (APR_BUCKET_IS_FLUSH(b)) {
+                    off += apr_snprintf(buffer+off, bmax-off, "flush ");
+                }
+                else if (AP_BUCKET_IS_EOR(b)) {
+                    off += apr_snprintf(buffer+off, bmax-off, "eor ");
+                }
+                else if (H2_BUCKET_IS_H2EOC(b)) {
+                    off += apr_snprintf(buffer+off, bmax-off, "h2eoc ");
+                }
+                else if (H2_BUCKET_IS_H2EOS(b)) {
+                    off += apr_snprintf(buffer+off, bmax-off, "h2eos ");
+                }
+                else {
+                    off += apr_snprintf(buffer+off, bmax-off, "meta(unknown) ");
+                }
+            }
+            else {
+                const char *btype = "data";
+                if (APR_BUCKET_IS_FILE(b)) {
+                    btype = "file";
+                }
+                else if (APR_BUCKET_IS_PIPE(b)) {
+                    btype = "pipe";
+                }
+                else if (APR_BUCKET_IS_SOCKET(b)) {
+                    btype = "socket";
+                }
+                else if (APR_BUCKET_IS_HEAP(b)) {
+                    btype = "heap";
+                }
+                else if (APR_BUCKET_IS_TRANSIENT(b)) {
+                    btype = "transient";
+                }
+                else if (APR_BUCKET_IS_IMMORTAL(b)) {
+                    btype = "immortal";
+                }
+#if APR_HAS_MMAP
+                else if (APR_BUCKET_IS_MMAP(b)) {
+                    btype = "mmap";
+                }
+#endif
+                else if (APR_BUCKET_IS_POOL(b)) {
+                    btype = "pool";
+                }
+                
+                off += apr_snprintf(buffer+off, bmax-off, "%s[%ld] ", 
+                                    btype, 
+                                    (long)(b->length == ((apr_size_t)-1)? 
+                                           -1 : b->length));
+            }
+        }
+        line = *buffer? buffer : "(empty)";
+    }
+    /* Intentional no APLOGNO */
+    ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s", 
+                  c->id, stream_id, tag, line);
+
+}
+
 apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, 
                              const h2_config *cfg, 
                              apr_pool_t *pool)
 {
-    io->connection         = c;
-    io->output             = apr_brigade_create(pool, c->bucket_alloc);
-    io->buflen             = 0;
-    io->is_tls             = h2_h2_is_tls(c);
-    io->buffer_output      = io->is_tls;
+    io->c             = c;
+    io->output        = apr_brigade_create(pool, c->bucket_alloc);
+    io->buflen        = 0;
+    io->is_tls        = h2_h2_is_tls(c);
+    io->buffer_output = io->is_tls;
     
     if (io->buffer_output) {
         io->bufsize = WRITE_BUFFER_SIZE;
@@ -65,8 +145,9 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
     }
     
     if (io->is_tls) {
-        /* That is where we start with, 
-         * see https://issues.apache.org/jira/browse/TS-2503 */
+        /* This is what we start with, 
+         * see https://issues.apache.org/jira/browse/TS-2503 
+         */
         io->warmup_size    = h2_config_geti64(cfg, H2_CONF_TLS_WARMUP_SIZE);
         io->cooldown_usecs = (h2_config_geti(cfg, H2_CONF_TLS_COOLDOWN_SECS) 
                               * APR_USEC_PER_SEC);
@@ -79,9 +160,10 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
     }
 
     if (APLOGctrace1(c)) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection,
-                      "h2_conn_io(%ld): init, buffering=%d, warmup_size=%ld, cd_secs=%f",
-                      io->connection->id, io->buffer_output, (long)io->warmup_size,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
+                      "h2_conn_io(%ld): init, buffering=%d, warmup_size=%ld, "
+                      "cd_secs=%f", io->c->id, io->buffer_output, 
+                      (long)io->warmup_size,
                       ((float)io->cooldown_usecs/APR_USEC_PER_SEC));
     }
 
@@ -110,16 +192,17 @@ static apr_status_t pass_out(apr_bucket_brigade *bb, void *ctx)
     }
     
     ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_WRITE, c);
-    status = apr_brigade_length(bb, 0, &bblen);
-    if (status == APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03044)
+    apr_brigade_length(bb, 0, &bblen);
+    h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", bb);
+    status = ap_pass_brigade(c->output_filters, bb);
+    if (status == APR_SUCCESS && pctx->io) {
+        pctx->io->bytes_written += (apr_size_t)bblen;
+        pctx->io->last_write = apr_time_now();
+    }
+    if (status != APR_SUCCESS) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044)
                       "h2_conn_io(%ld): pass_out brigade %ld bytes",
                       c->id, (long)bblen);
-        status = ap_pass_brigade(c->output_filters, bb);
-        if (status == APR_SUCCESS && pctx->io) {
-            pctx->io->bytes_written += (apr_size_t)bblen;
-            pctx->io->last_write = apr_time_now();
-        }
     }
     apr_brigade_cleanup(bb);
     return status;
@@ -141,17 +224,17 @@ static apr_status_t bucketeer_buffer(h2_conn_io *io)
         /* long time not written, reset write size */
         io->write_size = WRITE_SIZE_INITIAL;
         io->bytes_written = 0;
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
                       "h2_conn_io(%ld): timeout write size reset to %ld", 
-                      (long)io->connection->id, (long)io->write_size);
+                      (long)io->c->id, (long)io->write_size);
     }
     else if (io->write_size < WRITE_SIZE_MAX 
              && io->bytes_written >= io->warmup_size) {
         /* connection is hot, use max size */
         io->write_size = WRITE_SIZE_MAX;
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
                       "h2_conn_io(%ld): threshold reached, write size now %ld", 
-                      (long)io->connection->id, (long)io->write_size);
+                      (long)io->c->id, (long)io->write_size);
     }
     
     bcount = (int)(remaining / io->write_size);
@@ -177,50 +260,41 @@ apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b)
     return APR_SUCCESS;
 }
 
-static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int force, int eoc)
+static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
 {
-    if (io->buflen > 0 || !APR_BRIGADE_EMPTY(io->output)) {
-        pass_out_ctx ctx;
-        
-        if (io->buflen > 0) {
-            /* something in the buffer, put it in the output brigade */
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection,
-                          "h2_conn_io: flush, flushing %ld bytes", (long)io->buflen);
-            bucketeer_buffer(io);
-        }
-        
-        if (force) {
-            APR_BRIGADE_INSERT_TAIL(io->output,
-                                    apr_bucket_flush_create(io->output->bucket_alloc));
-        }
-        
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection,
-                      "h2_conn_io: flush");
-        /* Send it out */
-        io->buflen = 0;
-        ctx.c = io->connection;
-        ctx.io = eoc? NULL : io;
+    pass_out_ctx ctx;
+    apr_bucket *b;
+    
+    if (io->buflen == 0 && APR_BRIGADE_EMPTY(io->output)) {
+        return APR_SUCCESS;
+    }
         
-        return pass_out(io->output, &ctx);
-        /* no more access after this, as we might have flushed an EOC bucket
-         * that de-allocated us all. */
+    if (io->buflen > 0) {
+        /* something in the buffer, put it in the output brigade */
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
+                      "h2_conn_io: flush, flushing %ld bytes", 
+                      (long)io->buflen);
+        bucketeer_buffer(io);
     }
-    return APR_SUCCESS;
-}
-
-apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush)
-{
-    return h2_conn_io_flush_int(io, flush, 0);
+    
+    if (flush) {
+        b = apr_bucket_flush_create(io->c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(io->output, b);
+    }
+    
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush");
+    io->buflen = 0;
+    ctx.c = io->c;
+    ctx.io = eoc? NULL : io;
+    
+    return pass_out(io->output, &ctx);
+    /* no more access after this, as we might have flushed an EOC bucket
+     * that de-allocated us all. */
 }
 
 apr_status_t h2_conn_io_flush(h2_conn_io *io)
 {
-    /* make sure we always write a flush, even if our buffers are empty.
-     * We want to flush not only our buffers, but alse ones further down
-     * the connection filters. */
-    apr_bucket *b = apr_bucket_flush_create(io->connection->bucket_alloc);
-    APR_BRIGADE_INSERT_TAIL(io->output, b);
-    return h2_conn_io_flush_int(io, 0, 0);
+    return h2_conn_io_flush_int(io, 1, 0);
 }
 
 apr_status_t h2_conn_io_consider_pass(h2_conn_io *io)
@@ -228,20 +302,18 @@ apr_status_t h2_conn_io_consider_pass(h2_conn_io *io)
     apr_off_t len = 0;
     
     if (!APR_BRIGADE_EMPTY(io->output)) {
-        apr_brigade_length(io->output, 0, &len);
+        len = h2_brigade_mem_size(io->output);
     }
     len += io->buflen;
     if (len >= WRITE_BUFFER_SIZE) {
-        return h2_conn_io_pass(io, 0);
+        return h2_conn_io_flush_int(io, 1, 0);
     }
     return APR_SUCCESS;
 }
 
 apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, h2_session *session)
 {
-    apr_bucket *b = h2_bucket_eoc_create(io->connection->bucket_alloc, session);
-    APR_BRIGADE_INSERT_TAIL(io->output, b);
-    b = apr_bucket_flush_create(io->connection->bucket_alloc);
+    apr_bucket *b = h2_bucket_eoc_create(io->c->bucket_alloc, session);
     APR_BRIGADE_INSERT_TAIL(io->output, b);
     return h2_conn_io_flush_int(io, 0, 1);
 }
@@ -252,20 +324,20 @@ apr_status_t h2_conn_io_write(h2_conn_io *io,
     apr_status_t status = APR_SUCCESS;
     pass_out_ctx ctx;
     
-    ctx.c = io->connection;
+    ctx.c = io->c;
     ctx.io = io;
     if (io->bufsize > 0) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->connection,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
                       "h2_conn_io: buffering %ld bytes", (long)length);
                       
         if (!APR_BRIGADE_EMPTY(io->output)) {
-            status = h2_conn_io_pass(io, 0);
+            status = h2_conn_io_flush_int(io, 0, 0);
         }
         
         while (length > 0 && (status == APR_SUCCESS)) {
             apr_size_t avail = io->bufsize - io->buflen;
             if (avail <= 0) {
-                h2_conn_io_pass(io, 0);
+                status = h2_conn_io_flush_int(io, 0, 0);
             }
             else if (length > avail) {
                 memcpy(io->buffer + io->buflen, buf, avail);
@@ -283,7 +355,7 @@ apr_status_t h2_conn_io_write(h2_conn_io *io,
         
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, status, io->connection,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, status, io->c,
                       "h2_conn_io: writing %ld bytes to brigade", (long)length);
         status = apr_brigade_write(io->output, pass_out, &ctx, buf, length);
     }
index 8d71fffcd7b1c92eb3f5686a6428a24d1bb0b183..b8be671d38e1214628f96f87720795a015fa1198 100644 (file)
@@ -26,7 +26,7 @@ struct h2_session;
  * directly without copying.
  */
 typedef struct {
-    conn_rec *connection;
+    conn_rec *c;
     apr_bucket_brigade *output;
 
     int is_tls;
@@ -77,7 +77,6 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, struct h2_session *session);
  * @param io the connection io
  * @param flush if a flush bucket should be appended to any output
  */
-apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush);
 apr_status_t h2_conn_io_flush(h2_conn_io *io);
 
 /**
index 3f82c60f1029db7578b2cf5c9aead2c5dcf6956a..0beb85606daf50c48ec90640a676aba3664e1de3 100644 (file)
@@ -23,6 +23,7 @@
 #include <http_core.h>
 #include <http_log.h>
 #include <http_connection.h>
+#include <http_request.h>
 
 #include "h2_private.h"
 #include "h2_h2.h"
 #include "h2_task.h"
 #include "h2_util.h"
 
-h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request)
+h2_io *h2_io_create(int id, apr_pool_t *pool, 
+                    apr_bucket_alloc_t *bucket_alloc,
+                    const h2_request *request)
 {
     h2_io *io = apr_pcalloc(pool, sizeof(*io));
     if (io) {
         io->id = id;
         io->pool = pool;
-        io->bucket_alloc = apr_bucket_alloc_create(pool);
+        io->bucket_alloc = bucket_alloc;
         io->request = h2_request_clone(pool, request);
     }
     return io;
 }
 
+static void check_bbin(h2_io *io)
+{
+    if (!io->bbin) {
+        io->bbin = apr_brigade_create(io->pool, io->bucket_alloc);
+    }
+}
+
+static void check_bbout(h2_io *io)
+{
+    if (!io->bbout) {
+        io->bbout = apr_brigade_create(io->pool, io->bucket_alloc);
+    }
+}
+
+static void check_bbtmp(h2_io *io)
+{
+    if (!io->bbtmp) {
+        io->bbtmp = apr_brigade_create(io->pool, io->bucket_alloc);
+    }
+}
+
+static void append_eos(h2_io *io, apr_bucket_brigade *bb)
+{
+    APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(io->bucket_alloc));
+}
+
 void h2_io_redo(h2_io *io)
 {
     io->worker_started = 0;
@@ -56,8 +85,8 @@ void h2_io_redo(h2_io *io)
     if (io->bbout) {
         apr_brigade_cleanup(io->bbout);
     }
-    if (io->tmp) {
-        apr_brigade_cleanup(io->tmp);
+    if (io->bbtmp) {
+        apr_brigade_cleanup(io->bbtmp);
     }
     io->started_at = io->done_at = 0;
 }
@@ -85,23 +114,12 @@ void h2_io_set_response(h2_io *io, h2_response *response)
     }
 }
 
-
 void h2_io_rst(h2_io *io, int error)
 {
     io->rst_error = error;
     io->eos_in = 1;
 }
 
-int h2_io_in_has_eos_for(h2_io *io)
-{
-    return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1));
-}
-
-int h2_io_in_has_data(h2_io *io)
-{
-    return io->bbin && h2_util_bb_has_data_or_eos(io->bbin);
-}
-
 int h2_io_out_has_data(h2_io *io)
 {
     return io->bbout && h2_util_bb_has_data_or_eos(io->bbout);
@@ -199,8 +217,8 @@ static int add_trailer(void *ctx, const char *key, const char *value)
     return (status == APR_SUCCESS);
 }
 
-static apr_status_t append_eos(h2_io *io, apr_bucket_brigade *bb, 
-                               apr_table_t *trailers)
+static apr_status_t in_append_eos(h2_io *io, apr_bucket_brigade *bb, 
+                                  apr_table_t *trailers)
 {
     apr_status_t status = APR_SUCCESS;
     apr_table_t *t = io->request->trailers;
@@ -222,7 +240,7 @@ static apr_status_t append_eos(h2_io *io, apr_bucket_brigade *bb,
             status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
         }
     }
-    APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(io->bucket_alloc));
+    append_eos(io, bb);
     return status;
 }
 
@@ -239,7 +257,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
     if (!io->bbin || APR_BRIGADE_EMPTY(io->bbin)) {
         if (io->eos_in) {
             if (!io->eos_in_written) {
-                status = append_eos(io, bb, trailers);
+                status = in_append_eos(io, bb, trailers);
                 io->eos_in_written = 1;
                 return status;
             }
@@ -250,26 +268,27 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
     
     if (io->request->chunked) {
         /* the reader expects HTTP/1.1 chunked encoding */
-        status = h2_util_move(io->tmp, io->bbin, maxlen, NULL, "h2_io_in_read_chunk");
+        check_bbtmp(io);
+        status = h2_util_move(io->bbtmp, io->bbin, maxlen, NULL, "h2_io_in_read_chunk");
         if (status == APR_SUCCESS) {
             apr_off_t tmp_len = 0;
             
-            apr_brigade_length(io->tmp, 1, &tmp_len);
+            apr_brigade_length(io->bbtmp, 1, &tmp_len);
             if (tmp_len > 0) {
                 io->input_consumed += tmp_len;
                 status = apr_brigade_printf(bb, NULL, NULL, "%lx\r\n", 
                                             (unsigned long)tmp_len);
                 if (status == APR_SUCCESS) {
-                    status = h2_util_move(bb, io->tmp, -1, NULL, "h2_io_in_read_tmp1");
+                    status = h2_util_move(bb, io->bbtmp, -1, NULL, "h2_io_in_read_tmp1");
                     if (status == APR_SUCCESS) {
                         status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
                     }
                 }
             }
             else {
-                status = h2_util_move(bb, io->tmp, -1, NULL, "h2_io_in_read_tmp2");
+                status = h2_util_move(bb, io->bbtmp, -1, NULL, "h2_io_in_read_tmp2");
             }
-            apr_brigade_cleanup(io->tmp);
+            apr_brigade_cleanup(io->bbtmp);
         }
     }
     else {
@@ -286,7 +305,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
     if (status == APR_SUCCESS && (!io->bbin || APR_BRIGADE_EMPTY(io->bbin))) {
         if (io->eos_in) {
             if (!io->eos_in_written) {
-                status = append_eos(io, bb, trailers);
+                status = in_append_eos(io, bb, trailers);
                 io->eos_in_written = 1;
             }
         }
@@ -298,7 +317,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
     return status;
 }
 
-apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb)
+apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos)
 {
     if (io->rst_error) {
         return APR_ECONNABORTED;
@@ -307,13 +326,12 @@ apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb)
     if (io->eos_in) {
         return APR_EOF;
     }
-    io->eos_in = h2_util_has_eos(bb, -1);
-    if (!APR_BRIGADE_EMPTY(bb)) {
-        if (!io->bbin) {
-            io->bbin = apr_brigade_create(io->pool, io->bucket_alloc);
-            io->tmp = apr_brigade_create(io->pool, io->bucket_alloc);
-        }
-        return h2_util_move(io->bbin, bb, -1, NULL, "h2_io_in_write");
+    if (eos) {
+        io->eos_in = 1;
+    }
+    if (len > 0) {
+        check_bbin(io);
+        return apr_brigade_write(io->bbin, NULL, NULL, d, len);
     }
     return APR_SUCCESS;
 }
@@ -328,27 +346,36 @@ apr_status_t h2_io_in_close(h2_io *io)
     return APR_SUCCESS;
 }
 
-apr_status_t h2_io_out_readx(h2_io *io,  
-                             h2_io_data_cb *cb, void *ctx, 
-                             apr_off_t *plen, int *peos)
+static int is_out_readable(h2_io *io, apr_off_t *plen, int *peos, 
+                           apr_status_t *ps)
 {
-    apr_status_t status;
-    
     if (io->rst_error) {
-        return APR_ECONNABORTED;
+        *ps = APR_ECONNABORTED;
+        return 0;
     }
-    
     if (io->eos_out_read) {
         *plen = 0;
         *peos = 1;
-        return APR_SUCCESS;
+        *ps = APR_SUCCESS;
+        return 0;
     }
     else if (!io->bbout) {
         *plen = 0;
         *peos = 0;
-        return APR_EAGAIN;
+        *ps = APR_EAGAIN;
+        return 0;
+    }
+    return 1;
+}
+
+apr_status_t h2_io_out_readx(h2_io *io,  
+                             h2_io_data_cb *cb, void *ctx, 
+                             apr_off_t *plen, int *peos)
+{
+    apr_status_t status;
+    if (!is_out_readable(io, plen, peos, &status)) {
+        return status;
     }
-    
     if (cb == NULL) {
         /* just checking length available */
         status = h2_util_bb_avail(io->bbout, plen, peos);
@@ -360,7 +387,6 @@ apr_status_t h2_io_out_readx(h2_io *io,
             io->output_consumed += *plen;
         }
     }
-    
     return status;
 }
 
@@ -368,24 +394,13 @@ apr_status_t h2_io_out_read_to(h2_io *io, apr_bucket_brigade *bb,
                                apr_off_t *plen, int *peos)
 {
     apr_status_t status;
-    
-    if (io->rst_error) {
-        return APR_ECONNABORTED;
-    }
-    
-    if (io->eos_out_read) {
-        *plen = 0;
-        *peos = 1;
-        return APR_SUCCESS;
-    }
-    else if (!io->bbout) {
-        *plen = 0;
-        *peos = 0;
-        return APR_EAGAIN;
+    if (!is_out_readable(io, plen, peos, &status)) {
+        return status;
     }
-
-    io->eos_out_read = *peos = h2_util_has_eos(io->bbout, *plen);
     status = h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to");
+    if (status == APR_SUCCESS && io->eos_out && APR_BRIGADE_EMPTY(io->bbout)) {
+        io->eos_out_read = *peos = 1;
+    }
     io->output_consumed += *plen;
     return status;
 }
@@ -403,6 +418,7 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb,
                              apr_size_t *pfile_buckets_allowed)
 {
     apr_status_t status;
+    apr_bucket *b;
     int start_allowed;
     
     if (io->rst_error) {
@@ -410,27 +426,33 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb,
     }
 
     if (io->eos_out) {
-        apr_off_t len;
+        apr_off_t len = 0;
         /* We have already delivered an EOS bucket to a reader, no
          * sense in storing anything more here.
          */
-        status = apr_brigade_length(bb, 1, &len);
-        if (status == APR_SUCCESS) {
-            if (len > 0) {
-                /* someone tries to write real data after EOS, that
-                 * does not look right. */
-                status = APR_EOF;
-            }
-            /* cleanup, as if we had moved the data */
-            apr_brigade_cleanup(bb);
+        apr_brigade_length(bb, 0, &len);
+        apr_brigade_cleanup(bb);
+        return (len > 0)? APR_EOF : APR_SUCCESS;
+    }
+
+    /* Filter the EOR bucket and set it aside. We prefer to tear down
+     * the request when the whole h2 stream is done */
+    for (b = APR_BRIGADE_FIRST(bb);
+         b != APR_BRIGADE_SENTINEL(bb);
+         b = APR_BUCKET_NEXT(b))
+    {
+        if (AP_BUCKET_IS_EOR(b)) {
+            APR_BUCKET_REMOVE(b);
+            io->eor = b;
+            break;
         }
-        return status;
-    }
-
+        else if (APR_BUCKET_IS_EOS(b)) {
+            io->eos_out = 1;
+            break;
+        }
+    }     
+    
     process_trailers(io, trailers);
-    if (!io->bbout) {
-        io->bbout = apr_brigade_create(io->pool, io->bucket_alloc);
-    }
     
     /* Let's move the buckets from the request processing in here, so
      * that the main thread can read them when it has time/capacity.
@@ -442,6 +464,7 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb,
      * many open files already buffered. Otherwise we will run out of
      * file handles.
      */
+    check_bbout(io);
     start_allowed = *pfile_buckets_allowed;
     status = h2_util_move(io->bbout, bb, maxlen, pfile_buckets_allowed, 
                           "h2_io_out_write");
@@ -460,14 +483,11 @@ apr_status_t h2_io_out_close(h2_io *io, apr_table_t *trailers)
     }
     if (!io->eos_out_read) { /* EOS has not been read yet */
         process_trailers(io, trailers);
-        if (!io->bbout) {
-            io->bbout = apr_brigade_create(io->pool, io->bucket_alloc);
-        }
         if (!io->eos_out) {
+            check_bbout(io);
             io->eos_out = 1;
             if (!h2_util_has_eos(io->bbout, -1)) {
-                APR_BRIGADE_INSERT_TAIL(io->bbout, 
-                                        apr_bucket_eos_create(io->bucket_alloc));
+                append_eos(io, io->bbout);
             }
         }
     }
index d92b7eb0d423f68b14778d8b4c21e05b784ec18d..90d0cde8f2ea1405c312d166b7ab932b6c966f18 100644 (file)
@@ -44,9 +44,12 @@ struct h2_io {
     struct h2_response *response;    /* response to request */
     int rst_error;                   /* h2 related stream abort error */
 
+    apr_bucket *eor;                 /* the EOR bucket, set aside */
+    struct h2_task *task;            /* the task once started */
+    
     apr_bucket_brigade *bbin;        /* input data for stream */
     apr_bucket_brigade *bbout;       /* output data from stream */
-    apr_bucket_brigade *tmp;         /* temporary data for chunking */
+    apr_bucket_brigade *bbtmp;       /* temporary data for chunking */
 
     unsigned int orphaned       : 1; /* h2_stream is gone for this io */    
     unsigned int worker_started : 1; /* h2_worker started processing for this io */
@@ -77,7 +80,9 @@ struct h2_io {
 /**
  * Creates a new h2_io for the given stream id. 
  */
-h2_io *h2_io_create(int id, apr_pool_t *pool, const struct h2_request *request);
+h2_io *h2_io_create(int id, apr_pool_t *pool, 
+                    apr_bucket_alloc_t *bucket_alloc, 
+                    const struct h2_request *request);
 
 /**
  * Set the response of this stream.
@@ -92,19 +97,10 @@ void h2_io_rst(h2_io *io, int error);
 int h2_io_is_repeatable(h2_io *io);
 void h2_io_redo(h2_io *io);
 
-/**
- * The input data is completely queued. Blocked reads will return immediately
- * and give either data or EOF.
- */
-int h2_io_in_has_eos_for(h2_io *io);
 /**
  * Output data is available.
  */
 int h2_io_out_has_data(h2_io *io);
-/**
- * Input data is available.
- */
-int h2_io_in_has_data(h2_io *io);
 
 void h2_io_signal(h2_io *io, h2_io_op op);
 void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout, 
@@ -127,7 +123,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
 /**
  * Appends given bucket to the input.
  */
-apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb);
+apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos);
 
 /**
  * Closes the input. After existing data has been read, APR_EOF will
index 4d7f63bb52e981f718f1b801478e66b85234af3d..1284c432556be3955b4c9f212d380087404b4f39 100644 (file)
@@ -195,12 +195,13 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
             return NULL;
         }
         
-        status = apr_thread_cond_create(&m->req_added, m->pool);
+        status = apr_thread_cond_create(&m->task_thawed, m->pool);
         if (status != APR_SUCCESS) {
             h2_mplx_destroy(m);
             return NULL;
         }
     
+        m->bucket_alloc = apr_bucket_alloc_create(m->pool);
         m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
         m->q = h2_iq_create(m->pool, m->max_streams);
@@ -253,7 +254,7 @@ static void workers_register(h2_mplx *m)
     h2_workers_register(m->workers, m);
 }
 
-static int io_process_events(h2_mplx *m, h2_io *io)
+static int io_in_consumed_signal(h2_mplx *m, h2_io *io)
 {
     if (io->input_consumed && m->input_consumed) {
         m->input_consumed(m->input_consumed_ctx, 
@@ -264,18 +265,28 @@ static int io_process_events(h2_mplx *m, h2_io *io)
     return 0;
 }
 
+static int io_out_consumed_signal(h2_mplx *m, h2_io *io)
+{
+    if (io->output_consumed && io->task && io->task->assigned) {
+        h2_req_engine_out_consumed(io->task->assigned, io->task->c, 
+                                   io->output_consumed);
+        io->output_consumed = 0;
+        return 1;
+    }
+    return 0;
+}
+
 static void io_destroy(h2_mplx *m, h2_io *io, int events)
 {
-    apr_pool_t *pool = io->pool;
+    apr_pool_t *pool;
     
     /* cleanup any buffered input */
     h2_io_in_shutdown(io);
     if (events) {
         /* Process outstanding events before destruction */
-        io_process_events(m, io);
+        io_in_consumed_signal(m, io);
     }
     
-    io->pool = NULL;    
     /* The pool is cleared/destroyed which also closes all
      * allocated file handles. Give this count back to our
      * file handle pool. */
@@ -286,8 +297,20 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events)
     if (m->redo_ios) {
         h2_io_set_remove(m->redo_ios, io);
     }
-    
-    if (pool) {
+
+    if (io->task) {
+        if (m->spare_allocator) {
+            apr_allocator_destroy(m->spare_allocator);
+            m->spare_allocator = NULL;
+        }
+        
+        h2_slave_destroy(io->task->c, &m->spare_allocator);
+        io->task = NULL;
+    }
+
+    pool = io->pool;
+    io->pool = NULL;    
+    if (0 && pool) {
         apr_pool_clear(pool);
         if (m->spare_pool) {
             apr_pool_destroy(m->spare_pool);
@@ -365,7 +388,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
         h2_mplx_set_consumed_cb(m, NULL, NULL);
         
         h2_iq_clear(m->q);
-        apr_thread_cond_broadcast(m->req_added);
+        apr_thread_cond_broadcast(m->task_thawed);
         while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
             /* iterate until all ios have been orphaned or destroyed */
         }
@@ -401,7 +424,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
                     }
                 }
                 h2_mplx_abort(m);
-                apr_thread_cond_broadcast(m->req_added);
+                apr_thread_cond_broadcast(m->task_thawed);
             }
         }
         
@@ -448,7 +471,7 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
          * for processing, e.g. when we received all HEADERs. But when
          * a stream is cancelled very early, it will not exist. */
         if (io) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, 
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                           "h2_mplx(%ld-%d): marking stream as done.", 
                           m->id, stream_id);
             io_stream_done(m, io, rst_error);
@@ -498,7 +521,7 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
 }
 
 apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, 
-                              apr_bucket_brigade *bb)
+                              const char *data, apr_size_t len, int eos)
 {
     apr_status_t status;
     int acquired;
@@ -508,10 +531,10 @@ apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
-            status = h2_io_in_write(io, bb);
+            status = h2_io_in_write(io, data, len, eos);
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
             h2_io_signal(io, H2_IO_READ);
-            io_process_events(m, io);
+            io_in_consumed_signal(m, io);
         }
         else {
             status = APR_ECONNABORTED;
@@ -533,7 +556,7 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
             status = h2_io_in_close(io);
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close");
             h2_io_signal(io, H2_IO_READ);
-            io_process_events(m, io);
+            io_in_consumed_signal(m, io);
         }
         else {
             status = APR_ECONNABORTED;
@@ -543,6 +566,12 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
     return status;
 }
 
+void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
+{
+    m->input_consumed = cb;
+    m->input_consumed_ctx = ctx;
+}
+
 typedef struct {
     h2_mplx * m;
     int streams_updated;
@@ -551,18 +580,12 @@ typedef struct {
 static int update_window(void *ctx, h2_io *io)
 {
     update_ctx *uctx = (update_ctx*)ctx;
-    if (io_process_events(uctx->m, io)) {
+    if (io_in_consumed_signal(uctx->m, io)) {
         ++uctx->streams_updated;
     }
     return 1;
 }
 
-void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
-{
-    m->input_consumed = cb;
-    m->input_consumed_ctx = ctx;
-}
-
 apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
 {
     apr_status_t status;
@@ -690,7 +713,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
                      * shutdown input and send out any events (e.g. window
                      * updates) asap. */
                     h2_io_in_shutdown(io);
-                    io_process_events(m, io);
+                    io_in_consumed_signal(m, io);
                 }
             }
             
@@ -717,8 +740,10 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io,
            && !APR_BRIGADE_EMPTY(bb) 
            && !is_aborted(m, &status)) {
         
-        status = h2_io_out_write(io, bb, m->stream_max_mem, trailers,
-                                 &m->tx_handles_reserved);
+        status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX, 
+                                 trailers, &m->tx_handles_reserved);
+        io_out_consumed_signal(m, io);
+        
         /* Wait for data to drain until there is room again or
          * stream timeout expires */
         h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait);
@@ -728,6 +753,9 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io,
                && (m->stream_max_mem <= h2_io_out_length(io))
                && !is_aborted(m, &status)) {
             if (!blocking) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+                              "h2_mplx(%ld-%d): incomplete write", 
+                              m->id, io->id);
                 return APR_INCOMPLETE;
             }
             trailers = NULL;
@@ -856,11 +884,13 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id, apr_table_t *trailers)
                               "h2_mplx(%ld-%d): close, no response, no rst", 
                               m->id, io->id);
             }
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                          "h2_mplx(%ld-%d): close with trailers=%s", 
-                          m->id, io->id, trailers? "yes" : "no");
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+                          "h2_mplx(%ld-%d): close with eor=%s, trailers=%s", 
+                          m->id, io->id, io->eor? "yes" : "no", 
+                          trailers? "yes" : "no");
             status = h2_io_out_close(io, trailers);
             H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
+            io_out_consumed_signal(m, io);
             
             have_out_data_for(m, stream_id);
         }
@@ -898,46 +928,6 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
     return status;
 }
 
-int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
-{
-    int has_eos = 0;
-    int acquired;
-    
-    apr_status_t status;
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            has_eos = h2_io_in_has_eos_for(io);
-        }
-        else {
-            has_eos = 1;
-        }
-        leave_mutex(m, acquired);
-    }
-    return has_eos;
-}
-
-int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id)
-{
-    apr_status_t status;
-    int has_data = 0;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            has_data = h2_io_in_has_data(io);
-        }
-        else {
-            has_data = 0;
-        }
-        leave_mutex(m, acquired);
-    }
-    return has_data;
-}
-
 int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
 {
     apr_status_t status;
@@ -1027,7 +1017,7 @@ static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request)
         m->spare_pool = NULL;
     }
     
-    io = h2_io_create(stream_id, io_pool, request);
+    io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request);
     h2_io_set_add(m->stream_ios, io);
     
     return io;
@@ -1086,7 +1076,7 @@ static h2_task *pop_task(h2_mplx *m)
         else if (io) {
             conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator);
             m->spare_allocator = NULL;
-            task = h2_task_create(m->id, io->request, slave, m);
+            io->task = task = h2_task_create(m->id, io->request, slave, m);
             apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
             io->worker_started = 1;
             io->started_at = apr_time_now();
@@ -1123,7 +1113,7 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
     return task;
 }
 
-static void task_done(h2_mplx *m, h2_task *task)
+static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
 {
     if (task) {
         if (task->frozen) {
@@ -1135,7 +1125,7 @@ static void task_done(h2_mplx *m, h2_task *task)
              * bodies into the mplx. */
             /* FIXME: this implementation is incomplete. */
             h2_task_set_io_blocking(task, 0);
-            apr_thread_cond_broadcast(m->req_added);
+            apr_thread_cond_broadcast(m->task_thawed);
         }
         else {
             h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
@@ -1149,6 +1139,18 @@ static void task_done(h2_mplx *m, h2_task *task)
              * other mplx's. Perhaps leave after n requests? */
             h2_mplx_out_close(m, task->stream_id, NULL);
             
+            if (ngn && io) {
+                apr_off_t bytes = io->output_consumed + h2_io_out_length(io);
+                if (bytes > 0) {
+                    /* we need to report consumed and current buffered output
+                     * to the engine. The request will be streamed out or cancelled,
+                     * no more data is coming from it and the engine should update
+                     * its calculations before we destroy this information. */
+                    h2_req_engine_out_consumed(ngn, task->c, bytes);
+                    io->output_consumed = 0;
+                }
+            }
+            
             if (task->engine) {
                 if (!h2_req_engine_is_shutdown(task->engine)) {
                     ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
@@ -1159,14 +1161,6 @@ static void task_done(h2_mplx *m, h2_task *task)
                 h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
             }
             
-            if (m->spare_allocator) {
-                apr_allocator_destroy(m->spare_allocator);
-                m->spare_allocator = NULL;
-            }
-            
-            h2_slave_destroy(task->c, &m->spare_allocator);
-            task = NULL;
-            
             if (io) {
                 apr_time_t now = apr_time_now();
                 if (!io->orphaned && m->redo_ios
@@ -1208,9 +1202,14 @@ static void task_done(h2_mplx *m, h2_task *task)
                     }
                 }
                 else {
-                    /* hang around until the stream deregisteres */
+                    /* hang around until the stream deregisters */
                 }
             }
+            else {
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                              "h2_mplx(%ld): task %s without corresp. h2_io",
+                              m->id, task->id);
+            }
         }
     }
 }
@@ -1220,7 +1219,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
     int acquired;
     
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        task_done(m, task);
+        task_done(m, task, NULL);
         --m->workers_busy;
         if (ptask) {
             /* caller wants another task */
@@ -1373,8 +1372,37 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
  * HTTP/2 request engines
  ******************************************************************************/
 
+typedef struct {
+    h2_mplx * m;
+    h2_req_engine *ngn;
+    int streams_updated;
+} ngn_update_ctx;
+
+static int ngn_update_window(void *ctx, h2_io *io)
+{
+    ngn_update_ctx *uctx = ctx;
+    if (io && io->task && io->task->assigned == uctx->ngn
+        && io_out_consumed_signal(uctx->m, io)) {
+        ++uctx->streams_updated;
+    }
+    return 1;
+}
+
+static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
+{
+    ngn_update_ctx ctx;
+        
+    ctx.m = m;
+    ctx.ngn = ngn;
+    ctx.streams_updated = 0;
+    h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx);
+    
+    return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
+}
+
 apr_status_t h2_mplx_req_engine_push(const char *ngn_type, 
-                                     request_rec *r, h2_req_engine_init *einit)
+                                     request_rec *r,
+                                     http2_req_engine_init *einit)
 {
     apr_status_t status;
     h2_mplx *m;
@@ -1386,6 +1414,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
         return APR_ECONNABORTED;
     }
     m = task->mplx;
+    task->r = r;
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
@@ -1393,8 +1422,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
             status = APR_ECONNABORTED;
         }
         else {
-            status = h2_ngn_shed_push_req(m->ngn_shed, ngn_type, 
-                                          task, r, einit);
+            status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
         }
         leave_mutex(m, acquired);
     }
@@ -1409,30 +1437,37 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
     h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
     h2_mplx *m = h2_ngn_shed_get_ctx(shed);
     apr_status_t status;
+    h2_task *task = NULL;
     int acquired;
     
-    *pr = NULL;
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         int want_shutdown = (block == APR_BLOCK_READ);
+
+        /* Take this opportunity to update output consummation 
+         * for this engine */
+        ngn_out_update_windows(m, ngn);
+        
         if (want_shutdown && !h2_iq_empty(m->q)) {
             /* For a blocking read, check first if requests are to be
              * had and, if not, wait a short while before doing the
              * blocking, and if unsuccessful, terminating read.
              */
-            status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
+            status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
             if (APR_STATUS_IS_EAGAIN(status)) {
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                               "h2_mplx(%ld): start block engine pull", m->id);
-                apr_thread_cond_timedwait(m->req_added, m->lock, 
+                apr_thread_cond_timedwait(m->task_thawed, m->lock, 
                                           apr_time_from_msec(20));
-                status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
+                status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
             }
         }
         else {
-            status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr);
+            status = h2_ngn_shed_pull_task(shed, ngn, capacity,
+                                           want_shutdown, &task);
         }
         leave_mutex(m, acquired);
     }
+    *pr = task? task->r : NULL;
     return status;
 }
  
@@ -1445,14 +1480,16 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
         int acquired;
 
         if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+            ngn_out_update_windows(m, ngn);
             h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
             if (task->engine) { 
                 /* cannot report that as done until engine returns */
             }
             else {
-                h2_task_output_close(task->output);
-                task_done(m, task);
+                task_done(m, task, ngn);
             }
+            /* Take this opportunity to update output consummation 
+             * for this engine */
             leave_mutex(m, acquired);
         }
     }
index a61a63891ae63b6dbb3b39c8083f163b5de8aeac..840f34b464efe90d1df0d5e86068619b80d89153 100644 (file)
@@ -67,6 +67,7 @@ struct h2_mplx {
     volatile int refs;
     conn_rec *c;
     apr_pool_t *pool;
+    apr_bucket_alloc_t *bucket_alloc;
 
     unsigned int aborted : 1;
     unsigned int need_registration : 1;
@@ -89,7 +90,7 @@ struct h2_mplx {
 
     apr_thread_mutex_t *lock;
     struct apr_thread_cond_t *added_output;
-    struct apr_thread_cond_t *req_added;
+    struct apr_thread_cond_t *task_thawed;
     struct apr_thread_cond_t *join_wait;
     
     apr_size_t stream_max_mem;
@@ -171,10 +172,6 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
  */
 int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
 
-/* Return != 0 iff the multiplexer has input data for the given stream. 
- */
-int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id);
-
 /**
  * Waits on output data from any stream in this session to become available. 
  * Returns APR_TIMEUP if no data arrived in the given time.
@@ -238,20 +235,14 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
  * Appends data to the input of the given stream. Storage of input data is
  * not subject to flow control.
  */
-apr_status_t h2_mplx_in_write(h2_mplx *mplx, int stream_id, 
-                              apr_bucket_brigade *bb);
+apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, 
+                              const char *data, apr_size_t len, int eos);
 
 /**
  * Closes the input for the given stream_id.
  */
 apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id);
 
-/**
- * Returns != 0 iff the input for the given stream has been closed. There
- * could still be data queued, but it can be read without blocking.
- */
-int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id);
-
 /**
  * Invoke the consumed callback for all streams that had bytes read since the 
  * last call to this function. If no stream had input data consumed, the 
@@ -414,12 +405,15 @@ apr_status_t h2_mplx_idle(h2_mplx *m);
  * h2_req_engine handling
  ******************************************************************************/
 
+typedef void h2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed);
 typedef apr_status_t h2_mplx_req_engine_init(struct h2_req_engine *engine, 
                                              const char *id, 
                                              const char *type,
                                              apr_pool_t *pool, 
                                              apr_uint32_t req_buffer_size,
-                                             request_rec *r);
+                                             request_rec *r,
+                                             h2_output_consumed **pconsumed,
+                                             void **pbaton);
 
 apr_status_t h2_mplx_req_engine_push(const char *ngn_type, 
                                      request_rec *r, 
index 5b97cf914d2bf81f2998929321e8b185354d0a43..3e8667aa23a205dfe06d4dfbcd1a3b4036407c45 100644 (file)
@@ -34,6 +34,7 @@
 #include "h2_ctx.h"
 #include "h2_h2.h"
 #include "h2_int_queue.h"
+#include "h2_mplx.h"
 #include "h2_response.h"
 #include "h2_request.h"
 #include "h2_task.h"
@@ -46,7 +47,6 @@ typedef struct h2_ngn_entry h2_ngn_entry;
 struct h2_ngn_entry {
     APR_RING_ENTRY(h2_ngn_entry) link;
     h2_task *task;
-    request_rec *r;
 };
 
 #define H2_NGN_ENTRY_NEXT(e)   APR_RING_NEXT((e), link)
@@ -84,6 +84,9 @@ struct h2_req_engine {
     apr_uint32_t no_assigned;  /* # of assigned requests */
     apr_uint32_t no_live;      /* # of live */
     apr_uint32_t no_finished;  /* # of finished */
+    
+    h2_output_consumed *out_consumed;
+    void *out_consumed_ctx;
 };
 
 const char *h2_req_engine_get_id(h2_req_engine *engine)
@@ -96,6 +99,14 @@ int h2_req_engine_is_shutdown(h2_req_engine *engine)
     return engine->shutdown;
 }
 
+void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c, 
+                                apr_off_t bytes)
+{
+    if (engine->out_consumed) {
+        engine->out_consumed(engine->out_consumed_ctx, c, bytes);
+    }
+}
+
 h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
                                 apr_uint32_t default_capacity, 
                                 apr_uint32_t req_buffer_size)
@@ -132,26 +143,25 @@ void h2_ngn_shed_abort(h2_ngn_shed *shed)
     shed->aborted = 1;
 }
 
-static void ngn_add_req(h2_req_engine *ngn, h2_task *task, request_rec *r)
+static void ngn_add_task(h2_req_engine *ngn, h2_task *task)
 {
     h2_ngn_entry *entry = apr_pcalloc(task->c->pool, sizeof(*entry));
     APR_RING_ELEM_INIT(entry, link);
     entry->task = task;
-    entry->r = r;
     H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
 }
 
 
-apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, 
-                                  h2_task *task, request_rec *r, 
-                                  h2_req_engine_init *einit){
+apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, 
+                                   h2_task *task, http2_req_engine_init *einit) 
+{
     h2_req_engine *ngn;
 
     AP_DEBUG_ASSERT(shed);
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
                   "h2_ngn_shed(%ld): PUSHing request (task=%s)", shed->c->id, 
-                  apr_table_get(r->connection->notes, H2_TASK_ID_NOTE));
+                  task->id);
     if (task->ser_headers) {
         /* Max compatibility, deny processing of this */
         return APR_EOF;
@@ -165,10 +175,10 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
                       "h2_ngn_shed(%ld): pushing request %s to %s", 
                       shed->c->id, task->id, ngn->id);
         if (!h2_task_is_detached(task)) {
-            h2_task_freeze(task, r);
+            h2_task_freeze(task);
         }
         /* FIXME: sometimes ngn is garbage, probly alread freed */
-        ngn_add_req(ngn, task, r);
+        ngn_add_task(ngn, task);
         ngn->no_assigned++;
         return APR_SUCCESS;
     }
@@ -191,7 +201,8 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
         APR_RING_INIT(&newngn->entries, h2_ngn_entry, link);
         
         status = einit(newngn, newngn->id, newngn->type, newngn->pool,
-                       shed->req_buffer_size, r);
+                       shed->req_buffer_size, task->r, 
+                       &newngn->out_consumed, &newngn->out_consumed_ctx);
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c,
                       "h2_ngn_shed(%ld): create engine %s (%s)", 
                       shed->c->id, newngn->id, newngn->type);
@@ -199,6 +210,7 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
             AP_DEBUG_ASSERT(task->engine == NULL);
             newngn->task = task;
             task->engine = newngn;
+            task->assigned = newngn;
             apr_hash_set(shed->ngns, newngn->type, APR_HASH_KEY_STRING, newngn);
         }
         return status;
@@ -206,13 +218,17 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
     return APR_EOF;
 }
 
-static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn)
+static h2_ngn_entry *pop_detached(h2_req_engine *ngn)
 {
     h2_ngn_entry *entry;
     for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
          entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
          entry = H2_NGN_ENTRY_NEXT(entry)) {
-        if (!entry->task->frozen) {
+        if (h2_task_is_detached(entry->task) 
+            || (entry->task->engine == ngn)) {
+            /* The task hosting this engine can always be pulled by it.
+             * For other task, they need to become detached, e.g. no longer
+             * assigned to another worker. */
             H2_NGN_ENTRY_REMOVE(entry);
             return entry;
         }
@@ -220,16 +236,19 @@ static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn)
     return NULL;
 }
 
-apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, 
-                                  h2_req_engine *ngn, 
-                                  apr_uint32_t capacity, 
-                                  int want_shutdown,
-                                  request_rec **pr)
+apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, 
+                                   h2_req_engine *ngn, 
+                                   apr_uint32_t capacity, 
+                                   int want_shutdown,
+                                   h2_task **ptask)
 {   
     h2_ngn_entry *entry;
     
     AP_DEBUG_ASSERT(ngn);
-    *pr = NULL;
+    *ptask = NULL;
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
+                  "h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d", 
+                  shed->c->id, ngn->id, want_shutdown);
     if (shed->aborted) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, shed->c,
                       "h2_ngn_shed(%ld): abort while pulling requests %s", 
@@ -249,20 +268,27 @@ apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed,
         return ngn->shutdown? APR_EOF : APR_EAGAIN;
     }
     
-    if ((entry = pop_non_frozen(ngn))) {
+    if ((entry = pop_detached(ngn))) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, entry->task->c,
                       "h2_ngn_shed(%ld): pulled request %s for engine %s", 
                       shed->c->id, entry->task->id, ngn->id);
         ngn->no_live++;
-        *pr = entry->r;
+        *ptask = entry->task;
+        entry->task->assigned = ngn;
         return APR_SUCCESS;
     }
+    
+    if (1) {
+        h2_ngn_entry *entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
+                      "h2_ngn_shed(%ld): pull task, nothing, first task %s", 
+                      shed->c->id, entry->task->id);
+    }
     return APR_EAGAIN;
 }
                                  
 static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, 
-                                  h2_task *task, int waslive, int aborted, 
-                                  int close)
+                                  h2_task *task, int waslive, int aborted)
 {
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
                   "h2_ngn_shed(%ld): task %s %s by %s", 
@@ -271,16 +297,13 @@ static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn,
     if (waslive) ngn->no_live--;
     ngn->no_assigned--;
 
-    if (close) {
-        h2_task_output_close(task->output);
-    }
     return APR_SUCCESS;
 }
                                 
 apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, 
                                     struct h2_req_engine *ngn, h2_task *task)
 {
-    return ngn_done_task(shed, ngn, task, 1, 0, 0);
+    return ngn_done_task(shed, ngn, task, 1, 0);
 }
                                 
 void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
@@ -302,13 +325,12 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
         for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
              entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
              entry = H2_NGN_ENTRY_NEXT(entry)) {
-            request_rec *r = entry->r;
-            h2_task *task = h2_ctx_rget_task(r);
+            h2_task *task = entry->task;
             ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
                           "h2_ngn_shed(%ld): engine %s has queued task %s, "
                           "frozen=%d, aborting",
                           shed->c->id, ngn->id, task->id, task->frozen);
-            ngn_done_task(shed, ngn, task, 0, 1, 1);
+            ngn_done_task(shed, ngn, task, 0, 1);
         }
     }
     if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
index 3dc9e375ef10eb4d223deda589358e3671740eb3..832dbd3a8e95b53994bb4d2969b183f40b25ac55 100644 (file)
@@ -35,12 +35,17 @@ struct h2_ngn_shed {
 const char *h2_req_engine_get_id(h2_req_engine *engine);
 int h2_req_engine_is_shutdown(h2_req_engine *engine);
 
+void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c, 
+                                apr_off_t bytes);
+
 typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine, 
                                       const char *id, 
                                       const char *type,
                                       apr_pool_t *pool, 
                                       apr_uint32_t req_buffer_size,
-                                      request_rec *r);
+                                      request_rec *r,
+                                      h2_output_consumed **pconsumed,
+                                      void **pbaton);
 
 h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
                                 apr_uint32_t default_capactiy, 
@@ -53,13 +58,13 @@ h2_ngn_shed *h2_ngn_shed_get_shed(struct h2_req_engine *ngn);
 
 void h2_ngn_shed_abort(h2_ngn_shed *shed);
 
-apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, 
-                                  struct h2_task *task, request_rec *r, 
+apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, 
+                                  struct h2_task *task, 
                                   h2_shed_ngn_init *init_cb);
 
-apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, h2_req_engine *pub_ngn, 
-                                  apr_uint32_t capacity, 
-                                  int want_shutdown, request_rec **pr);
+apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, h2_req_engine *pub_ngn, 
+                                   apr_uint32_t capacity, 
+                                   int want_shutdown, struct h2_task **ptask);
 
 apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, 
                                    struct h2_req_engine *ngn, 
index d99573850d5d50743b13ab9061b50ab4ddc13f89..4372353c134c895af0f793d43f0d28e69d6fc0f4 100644 (file)
@@ -235,8 +235,11 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
         }
         return 0;
     }
-    
-    status = h2_stream_write_data(stream, (const char *)data, len);
+
+    /* FIXME: enabling setting EOS this way seems to break input handling
+     * in mod_proxy_http2. why? */
+    status = h2_stream_write_data(stream, (const char *)data, len,
+                                  0 /*flags & NGHTTP2_FLAG_END_STREAM*/);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
                   "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
                   session->id, stream_id, (long)len);
@@ -683,7 +686,9 @@ static apr_status_t h2_session_shutdown(h2_session *session, int reason,
                           h2_mplx_get_max_stream_started(session->mplx), 
                           reason, (uint8_t*)err, err? strlen(err):0);
     status = nghttp2_session_send(session->ngh2);
-    h2_conn_io_pass(&session->io, 1);
+    if (status == APR_SUCCESS) {
+        status = h2_conn_io_flush(&session->io);
+    }
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03069)
                   "session(%ld): sent GOAWAY, err=%d, msg=%s", 
                   session->id, reason, err? err : "");
@@ -1015,7 +1020,6 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
         }
     }
     
-    h2_conn_io_pass(&session->io, 1);
     return status;
 }
 
@@ -1430,6 +1434,9 @@ apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream)
 {
     apr_pool_t *pool = h2_stream_detach_pool(stream);
 
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                  "h2_stream(%ld-%d): cleanup by EOS bucket destroy", 
+                  session->id, stream->id);
     /* this may be called while the session has already freed
      * some internal structures or even when the mplx is locked. */
     if (session->mplx) {
@@ -1702,6 +1709,7 @@ static void h2_session_ev_init(h2_session *session, int arg, const char *msg)
 
 static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg)
 {
+    session->local_shutdown = 1;
     switch (session->state) {
         case H2_SESSION_ST_LOCAL_SHUTDOWN:
             /* already did that? */
@@ -1954,15 +1962,20 @@ static const int MAX_WAIT_MICROS = 200 * 1000;
 
 static void update_child_status(h2_session *session, int status, const char *msg)
 {
-    apr_snprintf(session->status, sizeof(session->status),
-                 "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)", 
-                 msg? msg : "-",
-                 (int)h2_ihash_count(session->streams), 
-                 (int)session->requests_received,
-                 (int)session->responses_submitted,
-                 (int)session->pushes_submitted,
-                 (int)session->pushes_reset + session->streams_reset);
-    ap_update_child_status_descr(session->c->sbh, status, session->status);
+    /* Assume that we also change code/msg when something really happened and
+     * avoid updating the scoreboard in between */
+    if (session->last_status_code != status 
+        || session->last_status_msg != msg) {
+        apr_snprintf(session->status, sizeof(session->status),
+                     "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)", 
+                     msg? msg : "-",
+                     (int)h2_ihash_count(session->streams), 
+                     (int)session->requests_received,
+                     (int)session->responses_submitted,
+                     (int)session->pushes_submitted,
+                     (int)session->pushes_reset + session->streams_reset);
+        ap_update_child_status_descr(session->c->sbh, status, session->status);
+    }
 }
 
 apr_status_t h2_session_process(h2_session *session, int async)
@@ -2016,7 +2029,6 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 update_child_status(session, (no_streams? SERVER_BUSY_KEEPALIVE
                                               : SERVER_BUSY_READ), "idle");
                 /* make certain, the client receives everything before we idle */
-                h2_conn_io_flush(&session->io);
                 if (!session->keep_sync_until 
                     && async && no_streams && !session->r && session->requests_received) {
                     ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
@@ -2093,10 +2105,12 @@ apr_status_t h2_session_process(h2_session *session, int async)
             case H2_SESSION_ST_LOCAL_SHUTDOWN:
             case H2_SESSION_ST_REMOTE_SHUTDOWN:
                 if (nghttp2_session_want_read(session->ngh2)) {
+                    ap_update_child_status(session->c->sbh, SERVER_BUSY_READ, NULL);
                     h2_filter_cin_timeout_set(session->cin, session->s->timeout);
                     status = h2_session_read(session, 0);
                     if (status == APR_SUCCESS) {
                         have_read = 1;
+                        update_child_status(session, SERVER_BUSY_READ, "busy");
                         dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                     }
                     else if (status == APR_EAGAIN) {
@@ -2133,7 +2147,7 @@ apr_status_t h2_session_process(h2_session *session, int async)
                     }
                 }
                 
-                while (nghttp2_session_want_write(session->ngh2)) {
+                if (nghttp2_session_want_write(session->ngh2)) {
                     ap_update_child_status(session->c->sbh, SERVER_BUSY_WRITE, NULL);
                     status = h2_session_send(session);
                     if (status == APR_SUCCESS) {
@@ -2149,7 +2163,6 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 if (have_read || have_written) {
                     if (session->wait_us) {
                         session->wait_us = 0;
-                        update_child_status(session, SERVER_BUSY_READ, "busy");
                     }
                 }
                 else if (!nghttp2_session_want_write(session->ngh2)) {
@@ -2180,8 +2193,6 @@ apr_status_t h2_session_process(h2_session *session, int async)
                                   "h2_session: wait for data, %ld micros", 
                                   (long)session->wait_us);
                 }
-                /* make certain, the client receives everything before we idle */
-                h2_conn_io_flush(&session->io);
                 status = h2_mplx_out_trywait(session->mplx, session->wait_us, 
                                              session->iowait);
                 if (status == APR_SUCCESS) {
@@ -2190,7 +2201,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 }
                 else if (status == APR_TIMEUP) {
                     /* go back to checking all inputs again */
-                    transit(session, "wait cycle", H2_SESSION_ST_BUSY);
+                    transit(session, "wait cycle", session->local_shutdown? 
+                            H2_SESSION_ST_LOCAL_SHUTDOWN : H2_SESSION_ST_BUSY);
                 }
                 else {
                     ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
@@ -2214,7 +2226,10 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 break;
         }
 
-        h2_conn_io_pass(&session->io, 1);
+        status = h2_conn_io_flush(&session->io);
+        if (status != APR_SUCCESS) {
+            dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+        }
         if (!nghttp2_session_want_read(session->ngh2) 
                  && !nghttp2_session_want_write(session->ngh2)) {
             dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL); 
index fa98bf91869cc31c4c7a31e4f96a255f985998ea..566e79dee2a2932943c819a875074bdea4e99cd1 100644 (file)
@@ -85,6 +85,7 @@ typedef struct h2_session {
     unsigned int reprioritize  : 1; /* scheduled streams priority changed */
     unsigned int eoc_written   : 1; /* h2 eoc bucket written */
     unsigned int flush         : 1; /* flushing output necessary */
+    unsigned int local_shutdown: 1; /* GOAWAY has been sent by us */
     apr_interval_time_t  wait_us;   /* timout during BUSY_WAIT state, micro secs */
     
     int unsent_submits;             /* number of submitted, but not yet written responses. */
@@ -130,6 +131,8 @@ typedef struct h2_session {
     struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
     
     char status[64];                /* status message for scoreboard */
+    int last_status_code;           /* the one already reported */
+    const char *last_status_msg;    /* the one already reported */
 } h2_session;
 
 
index 29df7afd82352d84990bbdaf89dda5d97cd12847..2b368b67cf04f2b6fd49bc827812b6f3ec8bb101 100644 (file)
 #include "h2_util.h"
 
 
-#define H2_STREAM_IN(lvl,s,msg) \
-    do { \
-        if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
-        h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbin); \
-    } while(0)
-    
-
 static int state_transition[][7] = {
     /*  ID OP RL RR CI CO CL */
 /*ID*/{  1, 0, 0, 0, 0, 0, 0 },
@@ -144,19 +137,13 @@ static int output_open(h2_stream *stream)
 
 static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response);
 
-h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session)
+h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
 {
     h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
     stream->id        = id;
     stream->state     = H2_STREAM_ST_IDLE;
     stream->pool      = pool;
     stream->session   = session;
-    return stream;
-}
-
-h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
-{
-    h2_stream *stream = h2_stream_create(id, pool, session);
     set_state(stream, H2_STREAM_ST_OPEN);
     stream->request   = h2_request_create(id, pool, 
         h2_config_geti(session->config, H2_CONF_SER_HEADERS));
@@ -296,8 +283,6 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
     if (status == APR_SUCCESS) {
         if (!eos) {
             stream->request->body = 1;
-            stream->bbin = apr_brigade_create(stream->pool, 
-                                              stream->session->c->bucket_alloc);
         }
         stream->input_remaining = stream->request->content_length;
         
@@ -328,33 +313,6 @@ int h2_stream_is_scheduled(const h2_stream *stream)
     return stream->scheduled;
 }
 
-static apr_status_t h2_stream_input_flush(h2_stream *stream)
-{
-    apr_status_t status = APR_SUCCESS;
-    if (stream->bbin && !APR_BRIGADE_EMPTY(stream->bbin)) {
-
-        status = h2_mplx_in_write(stream->session->mplx, stream->id, stream->bbin);
-        if (status != APR_SUCCESS) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->mplx->c,
-                          "h2_stream(%ld-%d): flushing input data",
-                          stream->session->id, stream->id);
-        }
-    }
-    return status;
-}
-
-static apr_status_t input_flush(apr_bucket_brigade *bb, void *ctx) 
-{
-    (void)bb;
-    return h2_stream_input_flush(ctx);
-}
-
-static apr_status_t input_add_data(h2_stream *stream,
-                                   const char *data, size_t len)
-{
-    return apr_brigade_write(stream->bbin, input_flush, stream, data, len);
-}
-
 apr_status_t h2_stream_close_input(h2_stream *stream)
 {
     apr_status_t status = APR_SUCCESS;
@@ -368,28 +326,23 @@ apr_status_t h2_stream_close_input(h2_stream *stream)
         return APR_ECONNRESET;
     }
     
-    H2_STREAM_IN(APLOG_TRACE2, stream, "close_pre");
-    if (close_input(stream) && stream->bbin) {
-        status = h2_stream_input_flush(stream);
-        if (status == APR_SUCCESS) {
-            status = h2_mplx_in_close(stream->session->mplx, stream->id);
-        }
+    if (close_input(stream)) {
+        status = h2_mplx_in_close(stream->session->mplx, stream->id);
     }
-    H2_STREAM_IN(APLOG_TRACE2, stream, "close_post");
     return status;
 }
 
 apr_status_t h2_stream_write_data(h2_stream *stream,
-                                  const char *data, size_t len)
+                                  const char *data, size_t len, int eos)
 {
     apr_status_t status = APR_SUCCESS;
     
     AP_DEBUG_ASSERT(stream);
-    if (input_closed(stream) || !stream->request->eoh || !stream->bbin) {
+    if (input_closed(stream) || !stream->request->eoh) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
-                      "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d, bbin=%d", 
+                      "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d", 
                       stream->session->id, stream->id, input_closed(stream),
-                      stream->request->eoh, !!stream->bbin);
+                      stream->request->eoh);
         return APR_EINVAL;
     }
 
@@ -397,7 +350,6 @@ apr_status_t h2_stream_write_data(h2_stream *stream,
                   "h2_stream(%ld-%d): add %ld input bytes", 
                   stream->session->id, stream->id, (long)len);
 
-    H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_pre");
     if (!stream->request->chunked) {
         stream->input_remaining -= len;
         if (stream->input_remaining < 0) {
@@ -413,11 +365,10 @@ apr_status_t h2_stream_write_data(h2_stream *stream,
         }
     }
     
-    status = input_add_data(stream, data, len);
-    if (status == APR_SUCCESS) {
-        status = h2_stream_input_flush(stream);
+    status = h2_mplx_in_write(stream->session->mplx, stream->id, data, len, eos);
+    if (eos) {
+        close_input(stream);
     }
-    H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_post");
     return status;
 }
 
index 7d724259fa51c1989c7e128ce94ae2809a63342e..b7df632502c0fa79748f78ee99b89b35c18c65d6 100644 (file)
@@ -57,7 +57,6 @@ struct h2_stream {
     unsigned int submitted : 1; /* response HEADER has been sent */
     
     apr_off_t input_remaining;  /* remaining bytes on input as advertised via content-length */
-    apr_bucket_brigade *bbin;   /* input DATA */
 
     struct h2_sos *sos;         /* stream output source, e.g. to read output from */
     apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */
@@ -66,15 +65,6 @@ struct h2_stream {
 
 #define H2_STREAM_RST(s, def)    (s->rst_error? s->rst_error : (def))
 
-/**
- * Create a stream in IDLE state.
- * @param id      the stream identifier
- * @param pool    the memory pool to use for this stream
- * @param session the session this stream belongs to
- * @return the newly created IDLE stream
- */
-h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_session *session);
-
 /**
  * Create a stream in OPEN state.
  * @param id      the stream identifier
@@ -155,7 +145,7 @@ apr_status_t h2_stream_close_input(h2_stream *stream);
  * @param len the number of bytes to write
  */
 apr_status_t h2_stream_write_data(h2_stream *stream,
-                                  const char *data, size_t len);
+                                  const char *data, size_t len, int eos);
 
 /**
  * Reset the stream. Stream write/reads will return errors afterwards.
index 7b1aa8df67eeab362e67b5abd3497cd6213b9aa7..b722f5281e5850bec902031aa11e8774a222fbf1 100644 (file)
@@ -86,27 +86,6 @@ static apr_status_t h2_filter_read_response(ap_filter_t* f,
     return h2_from_h1_read_response(task->output->from_h1, f, bb);
 }
 
-static apr_status_t h2_response_freeze_filter(ap_filter_t* f,
-                                              apr_bucket_brigade* bb)
-{
-    h2_task *task = f->ctx;
-    AP_DEBUG_ASSERT(task);
-    
-    if (task->frozen) {
-        ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r,
-                      "h2_response_freeze_filter, saving");
-        return ap_save_brigade(f, &task->output->frozen_bb, &bb, task->c->pool);
-    }
-    
-    if (APR_BRIGADE_EMPTY(bb)) {
-        return APR_SUCCESS;
-    }
-
-    ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r,
-                  "h2_response_freeze_filter, passing");
-    return ap_pass_brigade(f->next, bb);
-}
-
 /*******************************************************************************
  * Register various hooks
  */
@@ -141,8 +120,6 @@ void h2_task_register_hooks(void)
                               NULL, AP_FTYPE_PROTOCOL);
     ap_register_output_filter("H2_TRAILERS", h2_response_trailers_filter,
                               NULL, AP_FTYPE_PROTOCOL);
-    ap_register_output_filter("H2_RESPONSE_FREEZE", h2_response_freeze_filter,
-                              NULL, AP_FTYPE_RESOURCE);
 }
 
 /* post config init */
@@ -314,15 +291,11 @@ static int h2_task_process_conn(conn_rec* c)
     return DECLINED;
 }
 
-apr_status_t h2_task_freeze(h2_task *task, request_rec *r)
+apr_status_t h2_task_freeze(h2_task *task)
 {   
     if (!task->frozen) {
-        conn_rec *c = task->c;
-        
         task->frozen = 1;
-        task->output->frozen_bb = apr_brigade_create(c->pool, c->bucket_alloc);
-        ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, 
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, 
                       "h2_task(%s), frozen", task->id);
     }
     return APR_SUCCESS;
index c4c1c13d1dc0f3a3afe0a5c4c10aec94fe27ce0f..fd3e8c9b39cbedae9a005b4c837294f3821db5cf 100644 (file)
@@ -66,7 +66,9 @@ struct h2_task {
     struct h2_task_output *output;
     struct apr_thread_cond_t *io;   /* used to wait for events on */
     
-    struct h2_req_engine *engine;
+    struct h2_req_engine *engine;   /* engine hosted by this task */
+    struct h2_req_engine *assigned; /* engine that task has been assigned to */
+    request_rec *r;                 /* request being processed in this task */
 };
 
 h2_task *h2_task_create(long session_id, const struct h2_request *req, 
@@ -83,7 +85,7 @@ apr_status_t h2_task_init(apr_pool_t *pool, server_rec *s);
 extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_in) *h2_task_logio_add_bytes_in;
 extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out;
 
-apr_status_t h2_task_freeze(h2_task *task, request_rec *r);
+apr_status_t h2_task_freeze(h2_task *task);
 apr_status_t h2_task_thaw(h2_task *task);
 int h2_task_is_detached(h2_task *task);
 
index 025c13987321cf573dc008951a50775fb6a766b3..1ff08484c9e42b5d1d0a3830a2bac5bbcee3066f 100644 (file)
@@ -42,9 +42,6 @@ h2_task_output *h2_task_output_create(h2_task *task, conn_rec *c)
         output->task = task;
         output->state = H2_TASK_OUT_INIT;
         output->from_h1 = h2_from_h1_create(task->stream_id, c->pool);
-        if (!output->from_h1) {
-            return NULL;
-        }
     }
     return output;
 }
@@ -66,47 +63,43 @@ static apr_table_t *get_trailers(h2_task_output *output)
     return NULL;
 }
 
-static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f,
-                                   apr_bucket_brigade *bb, const char *caller)
+static apr_status_t open_response(h2_task_output *output, ap_filter_t *f,
+                                  apr_bucket_brigade *bb, const char *caller)
 {
-    if (output->state == H2_TASK_OUT_INIT) {
-        h2_response *response;
-        output->state = H2_TASK_OUT_STARTED;
-        response = h2_from_h1_get_response(output->from_h1);
-        if (!response) {
-            if (f) {
-                /* This happens currently when ap_die(status, r) is invoked
-                 * by a read request filter. */
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204)
-                              "h2_task_output(%s): write without response by %s "
-                              "for %s %s %s",
-                              output->task->id, caller, 
-                              output->task->request->method, 
-                              output->task->request->authority, 
-                              output->task->request->path);
-                output->c->aborted = 1;
-            }
-            if (output->task->io) {
-                apr_thread_cond_broadcast(output->task->io);
-            }
-            return APR_ECONNABORTED;
+    h2_response *response;
+    response = h2_from_h1_get_response(output->from_h1);
+    if (!response) {
+        if (f) {
+            /* This happens currently when ap_die(status, r) is invoked
+             * by a read request filter. */
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204)
+                          "h2_task_output(%s): write without response by %s "
+                          "for %s %s %s",
+                          output->task->id, caller, 
+                          output->task->request->method, 
+                          output->task->request->authority, 
+                          output->task->request->path);
+            output->c->aborted = 1;
         }
-        
-        if (h2_task_logio_add_bytes_out) {
-            /* counter headers as if we'd do a HTTP/1.1 serialization */
-            output->written = h2_util_table_bytes(response->headers, 3)+1;
-            h2_task_logio_add_bytes_out(output->c, output->written);
+        if (output->task->io) {
+            apr_thread_cond_broadcast(output->task->io);
         }
-        get_trailers(output);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348)
-                      "h2_task(%s): open response to %s %s %s",
-                      output->task->id, output->task->request->method, 
-                      output->task->request->authority, 
-                      output->task->request->path);
-        return h2_mplx_out_open(output->task->mplx, output->task->stream_id, 
-                                response, f, bb, output->task->io);
+        return APR_ECONNABORTED;
+    }
+    
+    if (h2_task_logio_add_bytes_out) {
+        /* count headers as if we'd do a HTTP/1.1 serialization */
+        output->written = h2_util_table_bytes(response->headers, 3)+1;
+        h2_task_logio_add_bytes_out(output->c, output->written);
     }
-    return APR_SUCCESS;
+    get_trailers(output);
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348)
+                  "h2_task(%s): open response to %s %s %s",
+                  output->task->id, output->task->request->method, 
+                  output->task->request->authority, 
+                  output->task->request->path);
+    return h2_mplx_out_open(output->task->mplx, output->task->stream_id, 
+                            response, f, bb, output->task->io);
 }
 
 static apr_status_t write_brigade_raw(h2_task_output *output, 
@@ -145,7 +138,7 @@ static apr_status_t write_brigade_raw(h2_task_output *output,
 apr_status_t h2_task_output_write(h2_task_output *output,
                                   ap_filter_t* f, apr_bucket_brigade* bb)
 {
-    apr_status_t status;
+    apr_status_t status = APR_SUCCESS;
     
     if (APR_BRIGADE_EMPTY(bb)) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c,
@@ -155,15 +148,17 @@ apr_status_t h2_task_output_write(h2_task_output *output,
     
     if (output->task->frozen) {
         h2_util_bb_log(output->c, output->task->stream_id, APLOG_TRACE2,
-                       "frozen task output write", bb);
-        return ap_save_brigade(f, &output->frozen_bb, &bb, output->c->pool);
+                       "frozen task output write, ignored", bb);
+        return APR_SUCCESS;
     }
     
-    status = open_if_needed(output, f, bb, "write");
+    if (output->state == H2_TASK_OUT_INIT) {
+        status = open_response(output, f, bb, "write");
+        output->state = H2_TASK_OUT_STARTED;
+    }
     
     /* Attempt to write saved brigade first */
-    if (status == APR_SUCCESS && output->bb 
-        && !APR_BRIGADE_EMPTY(output->bb)) {
+    if (status == APR_SUCCESS && output->bb && !APR_BRIGADE_EMPTY(output->bb)) {
         status = write_brigade_raw(output, f, output->bb);
     }
     
@@ -188,20 +183,3 @@ apr_status_t h2_task_output_write(h2_task_output *output,
     return status;
 }
 
-void h2_task_output_close(h2_task_output *output)
-{
-    if (output->task->frozen) {
-        return;
-    }
-    open_if_needed(output, NULL, NULL, "close");
-    if (output->state != H2_TASK_OUT_DONE) {
-        if (output->frozen_bb && !APR_BRIGADE_EMPTY(output->frozen_bb)) {
-            h2_mplx_out_write(output->task->mplx, output->task->stream_id, 
-                NULL, 1, output->frozen_bb, NULL, NULL);
-        }
-        output->state = H2_TASK_OUT_DONE;
-        h2_mplx_out_close(output->task->mplx, output->task->stream_id, 
-                          get_trailers(output));
-    }
-}
-
index 26326f0908b33e8f4e4459a4bd783cc7630823db..76705820725357b15a5cde5b9539dad83dec2289 100644 (file)
@@ -44,7 +44,6 @@ struct h2_task_output {
 
     apr_off_t written;
     apr_bucket_brigade *bb;
-    apr_bucket_brigade *frozen_bb;
 };
 
 h2_task_output *h2_task_output_create(struct h2_task *task, conn_rec *c);
@@ -53,8 +52,6 @@ apr_status_t h2_task_output_write(h2_task_output *output,
                                   ap_filter_t* filter,
                                   apr_bucket_brigade* brigade);
 
-void h2_task_output_close(h2_task_output *output);
-
 apr_status_t h2_task_output_freeze(h2_task_output *output);
 apr_status_t h2_task_output_thaw(h2_task_output *output);
 
index 904349658c32fa8e9bb20ea910a3d52f02465d0c..e84a4aa72f3f9473c6c8821528c834ec3df8cd19 100644 (file)
@@ -14,7 +14,6 @@
  */
 
 #include <assert.h>
-
 #include <apr_strings.h>
 
 #include <httpd.h>
@@ -537,6 +536,7 @@ apr_status_t h2_util_move(apr_bucket_brigade *to, apr_bucket_brigade *from,
                 else {
                     const char *data;
                     apr_size_t len;
+
                     status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
                     if (status == APR_SUCCESS && len > 0) {
                         status = apr_brigade_write(to, NULL, NULL, data, len);
@@ -635,20 +635,6 @@ apr_status_t h2_util_copy(apr_bucket_brigade *to, apr_bucket_brigade *from,
     return status;
 }
 
-int h2_util_has_flush_or_eos(apr_bucket_brigade *bb)
-{
-    apr_bucket *b;
-    for (b = APR_BRIGADE_FIRST(bb);
-         b != APR_BRIGADE_SENTINEL(bb);
-         b = APR_BUCKET_NEXT(b))
-    {
-        if (APR_BUCKET_IS_EOS(b) || APR_BUCKET_IS_FLUSH(b)) {
-            return 1;
-        }
-    }
-    return 0;
-}
-
 int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len)
 {
     apr_bucket *b, *end;
@@ -949,6 +935,27 @@ apr_status_t h2_transfer_brigade(apr_bucket_brigade *to,
     return APR_SUCCESS;
 }
 
+apr_off_t h2_brigade_mem_size(apr_bucket_brigade *bb)
+{
+    apr_bucket *b;
+    apr_off_t total = 0;
+
+    for (b = APR_BRIGADE_FIRST(bb);
+         b != APR_BRIGADE_SENTINEL(bb);
+         b = APR_BUCKET_NEXT(b))
+    {
+        total += sizeof(*b);
+        if (b->length > 0) {
+            if (APR_BUCKET_IS_HEAP(b)
+                || APR_BUCKET_IS_POOL(b)) {
+                total += b->length;
+            }
+        }
+    }
+    return total;
+}
+
+
 /*******************************************************************************
  * h2_ngheader
  ******************************************************************************/
index 4fffabb959696fe105c4f561b1d8dfbbed452610..a83f362ffccd010af98ad4c1d61e69943d0737b5 100644 (file)
@@ -196,7 +196,6 @@ apr_status_t h2_util_copy(apr_bucket_brigade *to, apr_bucket_brigade *from,
  * @param bb the brigade to check on
  * @return != 0 iff brigade holds FLUSH or EOS bucket (or both)
  */
-int h2_util_has_flush_or_eos(apr_bucket_brigade *bb);
 int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len);
 int h2_util_bb_has_data(apr_bucket_brigade *bb);
 int h2_util_bb_has_data_or_eos(apr_bucket_brigade *bb);
@@ -257,4 +256,13 @@ apr_status_t h2_transfer_brigade(apr_bucket_brigade *to,
                                  apr_off_t *plen,
                                  int *peos);
 
+/**
+ * Get an approximnation of the memory footprint of the given
+ * brigade. This varies from apr_brigade_length as
+ * - no buckets are ever read
+ * - only buckets known to allocate memory (HEAP+POOL) are counted
+ * - the bucket struct itself is counted
+ */
+apr_off_t h2_brigade_mem_size(apr_bucket_brigade *bb);
+
 #endif /* defined(__mod_h2__h2_util__) */
index ddb5f3de5625d8aa2d7db3b93ac97d3e630cb148..70f8c790efabaf9beb4a024906a0cb4feb5dcd10 100644 (file)
@@ -26,7 +26,7 @@
  * @macro
  * Version number of the http2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.4.1"
+#define MOD_HTTP2_VERSION "1.4.2"
 
 /**
  * @macro
@@ -34,7 +34,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x010401
+#define MOD_HTTP2_VERSION_NUM 0x010402
 
 
 #endif /* mod_h2_h2_version_h */
index c3b01733a95d61b5a6d69db96b90360bdf3bc0d6..6450eb9ea01b900478aae2f408597bc5d348c8a2 100644 (file)
@@ -130,7 +130,7 @@ static int http2_is_h2(conn_rec *);
 
 static apr_status_t http2_req_engine_push(const char *ngn_type, 
                                           request_rec *r, 
-                                          h2_req_engine_init *einit)
+                                          http2_req_engine_init *einit)
 {
     return h2_mplx_req_engine_push(ngn_type, r, einit);
 }
index c5cfe704e3a279df6401b9408cb7d2b5a1aa2368..3073579282fba7c176d88eb269babc24fb1b9915 100644 (file)
@@ -36,6 +36,8 @@ struct apr_thread_cond_t;
 
 typedef struct h2_req_engine h2_req_engine;
 
+typedef void http2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed);
+
 /**
  * Initialize a h2_req_engine. The structure will be passed in but
  * only the name and master are set. The function should initialize
@@ -43,12 +45,14 @@ typedef struct h2_req_engine h2_req_engine;
  * @param engine the allocated, partially filled structure
  * @param r      the first request to process, or NULL
  */
-typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, 
-                                        const char *id, 
-                                        const char *type,
-                                        apr_pool_t *pool, 
-                                        apr_uint32_t req_buffer_size,
-                                        request_rec *r);
+typedef apr_status_t http2_req_engine_init(h2_req_engine *engine, 
+                                           const char *id, 
+                                           const char *type,
+                                           apr_pool_t *pool, 
+                                           apr_uint32_t req_buffer_size,
+                                           request_rec *r,
+                                           http2_output_consumed **pconsumed,
+                                           void **pbaton);
 
 /**
  * Push a request to an engine with the specified name for further processing.
@@ -66,7 +70,7 @@ typedef apr_status_t h2_req_engine_init(h2_req_engine *engine,
 APR_DECLARE_OPTIONAL_FN(apr_status_t, 
                         http2_req_engine_push, (const char *engine_type, 
                                                 request_rec *r,
-                                                h2_req_engine_init *einit));
+                                                http2_req_engine_init *einit));
 
 /**
  * Get a new request for processing in this engine.