]> granicus.if.org Git - apache/commitdiff
mod_http2: backport of v1.5.3
authorStefan Eissing <icing@apache.org>
Wed, 4 May 2016 13:58:02 +0000 (13:58 +0000)
committerStefan Eissing <icing@apache.org>
Wed, 4 May 2016 13:58:02 +0000 (13:58 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1742288 13f79535-47bb-0310-9956-ffa450edef68

20 files changed:
CHANGES
modules/http2/NWGNUmod_http2
modules/http2/h2_bucket_beam.c
modules/http2/h2_conn_io.c
modules/http2/h2_conn_io.h
modules/http2/h2_ctx.c
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_push.c
modules/http2/h2_request.c
modules/http2/h2_request.h
modules/http2/h2_session.c
modules/http2/h2_session.h
modules/http2/h2_stream.c
modules/http2/h2_stream.h
modules/http2/h2_task.c
modules/http2/h2_task.h
modules/http2/h2_util.c
modules/http2/h2_util.h
modules/http2/h2_version.h

diff --git a/CHANGES b/CHANGES
index 97f1ae47c814f92e40910c28c806d826a8e6b765..d1c103f19bd49265d6b839c99758f1250c724ac0 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,26 @@
 
 Changes with Apache 2.4.21
 
+  *) mod_http2: slave connections have conn_rec->aborted flag set when a stream
+     has been reset by the client. [Stefan Eissing]
+     
+  *) mod_http2: merge of some 2.4.x adaptions re filters on slave connections.
+     Small fixes in bucket beams when forwarding file buckets. Output handling
+     on master connection uses less FLUSH and passes automatically when more
+     than half of H2StreamMaxMemSize bytes have accumulated.
+     Workaround for http: when forwarding partial file buckets to keep the
+     output filter from closing these too early. [Stefan Eissing]
+     
+  *) mod_http2: elimination of fixed master connectin buffer for TLS 
+     connections. New scratch bucket handling optimized for TLS write sizes. 
+     File bucket data read directly into scratch buffers, avoiding one
+     copy. Non-TLS connections continue to pass buckets unchanged to the core
+     filters to allow sendfile() usage. [Stefan Eissing]
+  
+  *) mod_http2/mod_proxy_http2: h2_request.c is no longer shared between these
+     modules. This simplifies building on platforms such as Windows, as module
+     reference used in logging is now clear. [Stefan Eissing]
+     
   *) Scoreboard: Fix a regression in 2.4.20 that causes wrong request data
      to be displayed on the status page. PR 59333. [Yann Ylavic, William Rowe]
 
index dbab0359f64dcf199f1ea950047b51164cbe3e12..5a6a60b36fa3ba34760188296746bfa4a63b5d42 100644 (file)
@@ -367,8 +367,10 @@ $(OBJDIR)/mod_http2.imp : NWGNUmod_http2
        @echo $(DL) h2_iq_remove,$(DL) >> $@
        @echo $(DL) h2_log2,$(DL) >> $@
        @echo $(DL) h2_proxy_res_ignore_header,$(DL) >> $@
-       @echo $(DL) h2_request_create,$(DL) >> $@
-       @echo $(DL) h2_request_make,$(DL) >> $@
+       @echo $(DL) h2_headers_add_h1,$(DL) >> $@
+       @echo $(DL) h2_req_create,$(DL) >> $@
+       @echo $(DL) h2_req_createn,$(DL) >> $@
+       @echo $(DL) h2_req_make,$(DL) >> $@
        @echo $(DL) h2_util_camel_case_header,$(DL) >> $@
        @echo $(DL) h2_util_frame_print,$(DL) >> $@
        @echo $(DL) h2_util_ngheader_make_req,$(DL) >> $@
index 2e9f4b1f39098162dc214102af0ecd78e0af8ad0..65f9906a10bace775d6a073cd4ce4c2590789f3e 100644 (file)
@@ -204,8 +204,12 @@ apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
 
 static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
 {
-    if (beam->m_enter) {
-        return beam->m_enter(beam->m_ctx, pbl);
+    h2_beam_mutex_enter *enter = beam->m_enter;
+    if (enter) {
+        void *ctx = beam->m_ctx;
+        if (ctx) {
+            return enter(ctx, pbl);
+        }
     }
     pbl->mutex = NULL;
     pbl->leave = NULL;
@@ -535,6 +539,9 @@ apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block)
                 status = APR_EAGAIN;
                 break;
             }
+            if (beam->m_cond) {
+                apr_thread_cond_broadcast(beam->m_cond);
+            }
             status = wait_cond(beam, bl.mutex);
         }
         leave_yellow(beam, &bl);
@@ -716,6 +723,9 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
 transfer:
         if (beam->aborted) {
+            if (!APR_BRIGADE_EMPTY(beam->green)) {
+                apr_brigade_cleanup(beam->green);
+            }
             status = APR_ECONNABORTED;
             goto leave;
         }
@@ -781,6 +791,10 @@ transfer:
 #endif
                 remain -= bred->length;
                 ++transferred;
+                APR_BUCKET_REMOVE(bred);
+                H2_BLIST_INSERT_TAIL(&beam->hold, bred);
+                ++transferred;
+                continue;
             }
             else {
                 /* create a "green" standin bucket. we took care about the
index 2e0a5f2d690241a56e39d9453c6cd96a75077d6a..fb679ad3decac0c2400572840bd7727b56134330 100644 (file)
@@ -45,7 +45,6 @@
  * which seems to create less TCP packets overall
  */
 #define WRITE_SIZE_MAX        (TLS_DATA_MAX - 100) 
-#define WRITE_BUFFER_SIZE     (5*WRITE_SIZE_MAX)
 
 
 static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level, 
@@ -127,22 +126,13 @@ static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level,
 }
 
 apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, 
-                             const h2_config *cfg, 
-                             apr_pool_t *pool)
+                             const h2_config *cfg)
 {
     io->c             = c;
-    io->output        = apr_brigade_create(pool, c->bucket_alloc);
-    io->buflen        = 0;
+    io->output        = apr_brigade_create(c->pool, c->bucket_alloc);
     io->is_tls        = h2_h2_is_tls(c);
     io->buffer_output = io->is_tls;
-    
-    if (io->buffer_output) {
-        io->bufsize = WRITE_BUFFER_SIZE;
-        io->buffer = apr_pcalloc(pool, io->bufsize);
-    }
-    else {
-        io->bufsize = 0;
-    }
+    io->pass_threshold = h2_config_geti64(cfg, H2_CONF_STREAM_MAX_MEM) / 2;
     
     if (io->is_tls) {
         /* This is what we start with, 
@@ -151,12 +141,13 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
         io->warmup_size    = h2_config_geti64(cfg, H2_CONF_TLS_WARMUP_SIZE);
         io->cooldown_usecs = (h2_config_geti(cfg, H2_CONF_TLS_COOLDOWN_SECS) 
                               * APR_USEC_PER_SEC);
-        io->write_size     = WRITE_SIZE_INITIAL; 
+        io->write_size     = (io->cooldown_usecs > 0? 
+                              WRITE_SIZE_INITIAL : WRITE_SIZE_MAX); 
     }
     else {
         io->warmup_size    = 0;
         io->cooldown_usecs = 0;
-        io->write_size     = io->bufsize;
+        io->write_size     = 0;
     }
 
     if (APLOGctrace1(c)) {
@@ -170,54 +161,94 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
     return APR_SUCCESS;
 }
 
-int h2_conn_io_is_buffered(h2_conn_io *io)
+#define LOG_SCRATCH 0
+
+static void append_scratch(h2_conn_io *io) 
 {
-    return io->bufsize > 0;
+    if (io->scratch && io->slen > 0) {
+        apr_bucket *b = apr_bucket_heap_create(io->scratch, io->slen,
+                                               apr_bucket_free,
+                                               io->c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(io->output, b);
+#if LOG_SCRATCH
+        ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+                      "h2_conn_io(%ld): append_scratch(%ld)", 
+                      io->c->id, (long)io->slen);
+#endif
+        io->scratch = NULL;
+        io->slen = io->ssize = 0;
+    }
 }
 
-typedef struct {
-    conn_rec *c;
-    h2_conn_io *io;
-} pass_out_ctx;
-
-static apr_status_t pass_out(apr_bucket_brigade *bb, void *ctx) 
+static apr_size_t assure_scratch_space(h2_conn_io *io) {
+    apr_size_t remain = io->ssize - io->slen; 
+    if (io->scratch && remain == 0) {
+        append_scratch(io);
+    }
+    if (!io->scratch) {
+        /* we control the size and it is larger than what buckets usually
+         * allocate. */
+        io->scratch = apr_bucket_alloc(io->write_size, io->c->bucket_alloc);
+        io->ssize = io->write_size;
+        io->slen = 0;
+        remain = io->ssize;
+    }
+    return remain;
+}
+    
+static apr_status_t read_to_scratch(h2_conn_io *io, apr_bucket *b)
 {
-    pass_out_ctx *pctx = ctx;
-    conn_rec *c = pctx->c;
     apr_status_t status;
-    apr_off_t bblen;
+    const char *data;
+    apr_size_t len;
     
-    if (APR_BRIGADE_EMPTY(bb)) {
+    if (!b->length) {
         return APR_SUCCESS;
     }
     
-    ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, NULL);
-    apr_brigade_length(bb, 0, &bblen);
-    h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", bb);
-    status = ap_pass_brigade(c->output_filters, bb);
-    if (status == APR_SUCCESS && pctx->io) {
-        pctx->io->bytes_written += (apr_size_t)bblen;
-        pctx->io->last_write = apr_time_now();
+    AP_DEBUG_ASSERT(b->length <= (io->ssize - io->slen));
+    if (APR_BUCKET_IS_FILE(b)) {
+        apr_bucket_file *f = (apr_bucket_file *)b->data;
+        apr_file_t *fd = f->fd;
+        apr_off_t offset = b->start;
+        apr_size_t len = b->length;
+        
+        /* file buckets will either mmap (which we do not want) or
+         * read 8000 byte chunks and split themself. However, we do
+         * know *exactly* how many bytes we need where.
+         */
+        status = apr_file_seek(fd, APR_SET, &offset);
+        if (status != APR_SUCCESS) {
+            return status;
+        }
+        status = apr_file_read(fd, io->scratch + io->slen, &len);
+#if LOG_SCRATCH
+        ap_log_cerror(APLOG_MARK, APLOG_INFO, status, io->c,
+                      "h2_conn_io(%ld): FILE_to_scratch(%ld)", 
+                      io->c->id, (long)len); 
+#endif
+        if (status != APR_SUCCESS && status != APR_EOF) {
+            return status;
+        }
+        io->slen += len;
     }
-    if (status != APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044)
-                      "h2_conn_io(%ld): pass_out brigade %ld bytes",
-                      c->id, (long)bblen);
+    else {
+        status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
+        if (status == APR_SUCCESS) {
+#if LOG_SCRATCH
+            ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+                          "h2_conn_io(%ld): read_to_scratch(%ld)", 
+                          io->c->id, (long)b->length); 
+#endif
+            memcpy(io->scratch+io->slen, data, len);
+            io->slen += len;
+        }
     }
-    apr_brigade_cleanup(bb);
     return status;
 }
 
-/* Bring the current buffer content into the output brigade, appropriately
- * chunked.
- */
-static apr_status_t bucketeer_buffer(h2_conn_io *io)
+static void check_write_size(h2_conn_io *io) 
 {
-    const char *data = io->buffer;
-    apr_size_t remaining = io->buflen;
-    apr_bucket *b;
-    int bcount, i;
-
     if (io->write_size > WRITE_SIZE_INITIAL 
         && (io->cooldown_usecs > 0)
         && (apr_time_now() - io->last_write) >= io->cooldown_usecs) {
@@ -236,134 +267,156 @@ static apr_status_t bucketeer_buffer(h2_conn_io *io)
                       "h2_conn_io(%ld): threshold reached, write size now %ld", 
                       (long)io->c->id, (long)io->write_size);
     }
-    
-    bcount = (int)(remaining / io->write_size);
-    for (i = 0; i < bcount; ++i) {
-        b = apr_bucket_transient_create(data, io->write_size, 
-                                        io->output->bucket_alloc);
-        APR_BRIGADE_INSERT_TAIL(io->output, b);
-        data += io->write_size;
-        remaining -= io->write_size;
-    }
-    
-    if (remaining > 0) {
-        b = apr_bucket_transient_create(data, remaining, 
-                                        io->output->bucket_alloc);
-        APR_BRIGADE_INSERT_TAIL(io->output, b);
-    }
-    return APR_SUCCESS;
 }
 
-apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush)
+static apr_status_t pass_output(h2_conn_io *io, int flush, int eoc)
 {
-    APR_BRIGADE_INSERT_TAIL(io->output, b);
+    conn_rec *c = io->c;
+    apr_bucket *b;
+    apr_off_t bblen;
+    apr_status_t status;
+    
+    append_scratch(io);
     if (flush) {
-        b = apr_bucket_flush_create(io->c->bucket_alloc);
+        b = apr_bucket_flush_create(c->bucket_alloc);
         APR_BRIGADE_INSERT_TAIL(io->output, b);
     }
-    return APR_SUCCESS;
-}
-
-static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
-{
-    pass_out_ctx ctx;
-    apr_bucket *b;
     
-    if (io->buflen == 0 && APR_BRIGADE_EMPTY(io->output)) {
+    if (APR_BRIGADE_EMPTY(io->output)) {
         return APR_SUCCESS;
     }
-        
-    if (io->buflen > 0) {
-        /* something in the buffer, put it in the output brigade */
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
-                      "h2_conn_io: flush, flushing %ld bytes", 
-                      (long)io->buflen);
-        bucketeer_buffer(io);
-    }
-    
-    if (flush) {
-        b = apr_bucket_flush_create(io->c->bucket_alloc);
-        APR_BRIGADE_INSERT_TAIL(io->output, b);
-    }
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush");
-    io->buflen = 0;
-    ctx.c = io->c;
-    ctx.io = eoc? NULL : io;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, c, "h2_conn_io: pass_output");
+    ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, NULL);
+    apr_brigade_length(io->output, 0, &bblen);
     
-    return pass_out(io->output, &ctx);
-    /* no more access after this, as we might have flushed an EOC bucket
+    h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", io->output);
+    status = ap_pass_brigade(c->output_filters, io->output);
+
+    /* careful with access after this, as we might have flushed an EOC bucket
      * that de-allocated us all. */
+    if (!eoc) {
+        apr_brigade_cleanup(io->output);
+        if (status == APR_SUCCESS) {
+            io->bytes_written += (apr_size_t)bblen;
+            io->last_write = apr_time_now();
+        }
+    }
+    
+    if (status != APR_SUCCESS) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044)
+                      "h2_conn_io(%ld): pass_out brigade %ld bytes",
+                      c->id, (long)bblen);
+    }
+    return status;
 }
 
 apr_status_t h2_conn_io_flush(h2_conn_io *io)
 {
-    return h2_conn_io_flush_int(io, 1, 0);
-}
-
-apr_status_t h2_conn_io_consider_pass(h2_conn_io *io)
-{
-    apr_off_t len = 0;
-    
-    if (!APR_BRIGADE_EMPTY(io->output)) {
-        len = h2_brigade_mem_size(io->output);
-    }
-    len += io->buflen;
-    if (len >= WRITE_BUFFER_SIZE) {
-        return h2_conn_io_flush_int(io, 1, 0);
-    }
-    return APR_SUCCESS;
+    return pass_output(io, 1, 0);
 }
 
 apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, h2_session *session)
 {
     apr_bucket *b = h2_bucket_eoc_create(io->c->bucket_alloc, session);
     APR_BRIGADE_INSERT_TAIL(io->output, b);
-    return h2_conn_io_flush_int(io, 1, 1);
+    return pass_output(io, 1, 1);
 }
 
-apr_status_t h2_conn_io_write(h2_conn_io *io, 
-                              const char *buf, size_t length)
+apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length)
 {
     apr_status_t status = APR_SUCCESS;
-    pass_out_ctx ctx;
+    apr_size_t remain;
     
-    ctx.c = io->c;
-    ctx.io = io;
-    if (io->bufsize > 0) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
-                      "h2_conn_io: buffering %ld bytes", (long)length);
-                      
-        if (!APR_BRIGADE_EMPTY(io->output)) {
-            status = h2_conn_io_flush_int(io, 0, 0);
-        }
-        
-        while (length > 0 && (status == APR_SUCCESS)) {
-            apr_size_t avail = io->bufsize - io->buflen;
-            if (avail <= 0) {
-                status = h2_conn_io_flush_int(io, 0, 0);
-            }
-            else if (length > avail) {
-                memcpy(io->buffer + io->buflen, buf, avail);
-                io->buflen += avail;
-                length -= avail;
-                buf += avail;
+    if (io->buffer_output) {
+        while (length > 0) {
+            remain = assure_scratch_space(io);
+            if (remain >= length) {
+#if LOG_SCRATCH
+                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+                              "h2_conn_io(%ld): write_to_scratch(%ld)", 
+                              io->c->id, (long)length); 
+#endif
+                memcpy(io->scratch + io->slen, data, length);
+                io->slen += length;
+                length = 0;
             }
             else {
-                memcpy(io->buffer + io->buflen, buf, length);
-                io->buflen += length;
-                length = 0;
-                break;
+#if LOG_SCRATCH
+                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+                              "h2_conn_io(%ld): write_to_scratch(%ld)", 
+                              io->c->id, (long)remain); 
+#endif
+                memcpy(io->scratch + io->slen, data, remain);
+                io->slen += remain;
+                data += remain;
+                length -= remain;
             }
         }
-        
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, status, io->c,
-                      "h2_conn_io: writing %ld bytes to brigade", (long)length);
-        status = apr_brigade_write(io->output, pass_out, &ctx, buf, length);
+        status = apr_brigade_write(io->output, NULL, NULL, data, length);
+    }
+    return status;
+}
+
+apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb)
+{
+    apr_bucket *b;
+    apr_status_t status = APR_SUCCESS;
+    
+    check_write_size(io);
+    while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
+        b = APR_BRIGADE_FIRST(bb);
+        
+        if (APR_BUCKET_IS_METADATA(b)) {
+            /* need to finish any open scratch bucket, as meta data 
+             * needs to be forward "in order". */
+            append_scratch(io);
+            APR_BUCKET_REMOVE(b);
+            APR_BRIGADE_INSERT_TAIL(io->output, b);
+        }
+        else if (io->buffer_output) {
+            apr_size_t remain = assure_scratch_space(io);
+            if (b->length > remain) {
+                apr_bucket_split(b, remain);
+                if (io->slen == 0) {
+                    /* complete write_size bucket, append unchanged */
+                    APR_BUCKET_REMOVE(b);
+                    APR_BRIGADE_INSERT_TAIL(io->output, b);
+#if LOG_SCRATCH
+                    ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+                                  "h2_conn_io(%ld): pass bucket(%ld)", 
+                                  io->c->id, (long)b->length);
+#endif
+                    continue;
+                }
+            }
+            else {
+                /* bucket fits in remain, copy to scratch */
+                read_to_scratch(io, b);
+                apr_bucket_delete(b);
+                continue;
+            }
+        }
+        else {
+            /* no buffering, forward buckets setaside on flush */
+            if (APR_BUCKET_IS_TRANSIENT(b)) {
+                apr_bucket_setaside(b, io->c->pool);
+            }
+            APR_BUCKET_REMOVE(b);
+            APR_BRIGADE_INSERT_TAIL(io->output, b);
+        }
     }
     
+    if (status == APR_SUCCESS) {
+        if (!APR_BRIGADE_EMPTY(io->output)) {
+            apr_off_t len = h2_brigade_mem_size(io->output);
+            if (len >= io->pass_threshold) {
+                return pass_output(io, 0, 0);
+            }
+        }
+    }
     return status;
 }
 
index c397e9f608ea87277f33965a8867bb6a472a3f70..4ccf007086e483be8f7855dc18f51c1715f3c0e6 100644 (file)
@@ -39,16 +39,15 @@ typedef struct {
     apr_int64_t bytes_written;
     
     int buffer_output;
-    char *buffer;
-    apr_size_t buflen;
-    apr_size_t bufsize;
+    apr_size_t pass_threshold;
+    
+    char *scratch;
+    apr_size_t ssize;
+    apr_size_t slen;
 } h2_conn_io;
 
 apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, 
-                             const struct h2_config *cfg, 
-                             apr_pool_t *pool);
-
-int h2_conn_io_is_buffered(h2_conn_io *io);
+                             const struct h2_config *cfg);
 
 /**
  * Append data to the buffered output.
@@ -59,12 +58,7 @@ apr_status_t h2_conn_io_write(h2_conn_io *io,
                          const char *buf,
                          size_t length);
 
-/**
- * Append a bucket to the buffered output.
- * @param io the connection io
- * @param b the bucket to append
- */
-apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush);
+apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb);
 
 /**
  * Append an End-Of-Connection bucket to the output that, once destroyed,
@@ -79,11 +73,4 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, struct h2_session *session);
  */
 apr_status_t h2_conn_io_flush(h2_conn_io *io);
 
-/**
- * Check the amount of buffered output and pass it on if enough has accumulated.
- * @param io the connection io
- * @param flush if a flush bucket should be appended to any output
- */
-apr_status_t h2_conn_io_consider_pass(h2_conn_io *io);
-
 #endif /* defined(__mod_h2__h2_conn_io__) */
index 8b786b94d943e6c96a654dd87dabccf25a3a07d4..4b596a3d78f25ae2978aa197e2f8b387f7081dc4 100644 (file)
@@ -23,7 +23,6 @@
 #include "h2_session.h"
 #include "h2_task.h"
 #include "h2_ctx.h"
-#include "h2_private.h"
 
 static h2_ctx *h2_ctx_create(const conn_rec *c)
 {
index 5fd23ae0f8efb4e489d015b1e512dcaecc35ff5f..3ae02f4fd2ebdf56319403be47670aad4372c7cf 100644 (file)
@@ -90,6 +90,7 @@ static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
      * This allow recursive entering of the mutex from the saem thread,
      * which is what we need in certain situations involving callbacks
      */
+    AP_DEBUG_ASSERT(m);
     apr_threadkey_private_get(&mutex, thread_lock);
     if (mutex == m->lock) {
         *pacquired = 0;
@@ -167,6 +168,7 @@ static int can_beam_file(void *ctx, h2_bucket_beam *beam,  apr_file_t *file)
 }
 
 static void have_out_data_for(h2_mplx *m, int stream_id);
+static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
 
 static void check_tx_reservation(h2_mplx *m) 
 {
@@ -189,6 +191,29 @@ static void check_tx_free(h2_mplx *m)
     }
 }
 
+static int purge_stream(void *ctx, void *val) 
+{
+    h2_mplx *m = ctx;
+    h2_stream *stream = val;
+    h2_task *task = h2_ihash_get(m->tasks, stream->id);
+    h2_ihash_remove(m->spurge, stream->id);
+    h2_stream_destroy(stream);
+    if (task) {
+        task_destroy(m, task, 1);
+    }
+    return 0;
+}
+
+static void purge_streams(h2_mplx *m)
+{
+    if (!h2_ihash_empty(m->spurge)) {
+        while(!h2_ihash_iter(m->spurge, purge_stream, m)) {
+            /* repeat until empty */
+        }
+        h2_ihash_clear(m->spurge);
+    }
+}
+
 static void h2_mplx_destroy(h2_mplx *m)
 {
     AP_DEBUG_ASSERT(m);
@@ -257,6 +282,8 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
 
         m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+        m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+        m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->q = h2_iq_create(m->pool, m->max_streams);
         m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
         m->ready_tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
@@ -294,10 +321,10 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m)
     return max_stream_started;
 }
 
-static void input_consumed_signal(h2_mplx *m, h2_task *task)
+static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
 {
-    if (task->input.beam && task->worker_started) {
-        h2_beam_send(task->input.beam, NULL, 0); /* trigger updates */
+    if (stream->input) {
+        h2_beam_send(stream->input, NULL, 0); /* trigger updates */
     }
 }
 
@@ -310,12 +337,14 @@ static int output_consumed_signal(h2_mplx *m, h2_task *task)
 }
 
 
-static void task_destroy(h2_mplx *m, h2_task *task, int events)
+static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
 {
     conn_rec *slave = NULL;
     int reuse_slave = 0;
     apr_status_t status;
     
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
+                  "h2_task(%s): destroy", task->id);
     /* cleanup any buffered input */
     status = h2_task_shutdown(task, 0);
     if (status != APR_SUCCESS){
@@ -323,18 +352,17 @@ static void task_destroy(h2_mplx *m, h2_task *task, int events)
                       "h2_task(%s): shutdown", task->id);
     }
     
-    if (events) {
+    if (called_from_master) {
         /* Process outstanding events before destruction */
-        input_consumed_signal(m, task);
+        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+        if (stream) {
+            input_consumed_signal(m, stream);
+        }
     }
     
     /* The pool is cleared/destroyed which also closes all
      * allocated file handles. Give this count back to our
      * file handle pool. */
-    if (task->input.beam) {
-        m->tx_handles_reserved += 
-        h2_beam_get_files_beamed(task->input.beam);
-    }
     if (task->output.beam) {
         m->tx_handles_reserved += 
         h2_beam_get_files_beamed(task->output.beam);
@@ -368,47 +396,68 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
 {
     h2_task *task;
     
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
+                  "h2_stream(%ld-%d): done", m->c->id, stream->id);
+    /* Situation: we are, on the master connection, done with processing
+     * the stream. Either we have handled it successfully, or the stream
+     * was reset by the client or the connection is gone and we are 
+     * shutting down the whole session.
+     *
+     * We possibly have created a task for this stream to be processed
+     * on a slave connection. The processing might actually be ongoing
+     * right now or has already finished. A finished task waits for its
+     * stream to be done. This is the common case.
+     * 
+     * If the stream had input (e.g. the request had a body), a task
+     * may have read, or is still reading buckets from the input beam.
+     * This means that the task is referencing memory from the stream's
+     * pool (or the master connection bucket alloc). Before we can free
+     * the stream pool, we need to make sure that those references are
+     * gone. This is what h2_beam_shutdown() on the input waits for.
+     *
+     * With the input handled, we can tear down that beam and care
+     * about the output beam. The stream might still have buffered some
+     * buckets read from the output, so we need to get rid of those. That
+     * is done by h2_stream_cleanup().
+     *
+     * Now it is save to destroy the task (if it exists and is finished).
+     * 
+     * FIXME: we currently destroy the stream, even if the task is still
+     * ongoing. This is not ok, since task->request is coming from stream
+     * memory. We should either copy it on task creation or wait with the
+     * stream destruction until the task is done. 
+     */
+    h2_iq_remove(m->q, stream->id);
+    h2_ihash_remove(m->ready_tasks, stream->id);
     h2_ihash_remove(m->streams, stream->id);
     if (stream->input) {
-        apr_status_t status;
-        status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ);
-        if (status == APR_EAGAIN) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                          "h2_stream(%ld-%d): wait on input shutdown", 
-                          m->id, stream->id);
-            status = h2_beam_shutdown(stream->input, APR_BLOCK_READ);
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, 
-                          "h2_stream(%ld-%d): input shutdown returned", 
-                          m->id, stream->id);
-        }
+        m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
     }
+    h2_stream_cleanup(stream);
 
     task = h2_ihash_get(m->tasks, stream->id);
     if (task) {
-        /* Remove task from ready set, we will never submit it */
-        h2_ihash_remove(m->ready_tasks, stream->id);
-        
-        if (task->worker_done) {
-            /* already finished or not even started yet */
-            h2_iq_remove(m->q, task->stream_id);
-            task_destroy(m, task, 0);
-        }
-        else {
+        if (!task->worker_done) {
             /* task still running, cleanup once it is done */
-            task->orphaned = 1;
-            task->input.beam = NULL; 
             if (rst_error) {
                 h2_task_rst(task, rst_error);
             }
+            /* FIXME: this should work, but does not 
+            h2_ihash_add(m->shold, stream);
+            return;*/
+            task->input.beam = NULL;
+        }
+        else {
+            /* already finished */
+            task_destroy(m, task, 0);
         }
     }
+    h2_stream_destroy(stream);
 }
 
 static int stream_done_iter(void *ctx, void *val)
 {
-    h2_stream *stream = val;
     stream_done((h2_mplx*)ctx, val, 0);
-    h2_stream_destroy(stream);
     return 0;
 }
 
@@ -416,6 +465,7 @@ static int task_print(void *ctx, void *val)
 {
     h2_mplx *m = ctx;
     h2_task *task = val;
+    h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
     if (task->request) {
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
                       "->03198: h2_stream(%s): %s %s %s -> %s %d"
@@ -424,7 +474,7 @@ static int task_print(void *ctx, void *val)
                       task->request->authority, task->request->path,
                       task->response? "http" : (task->rst_error? "reset" : "?"),
                       task->response? task->response->http_status : task->rst_error,
-                      task->orphaned, task->worker_started, 
+                      (stream? 0 : 1), task->worker_started, 
                       task->worker_done);
     }
     else if (task) {
@@ -451,6 +501,17 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
         /* disable WINDOW_UPDATE callbacks */
         h2_mplx_set_consumed_cb(m, NULL, NULL);
         
+        if (!h2_ihash_empty(m->shold)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): start release_join with %d streams in hold", 
+                          m->id, (int)h2_ihash_count(m->shold));
+        }
+        if (!h2_ihash_empty(m->spurge)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): start release_join with %d streams to purge", 
+                          m->id, (int)h2_ihash_count(m->spurge));
+        }
+        
         h2_iq_clear(m->q);
         apr_thread_cond_broadcast(m->task_thawed);
         while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
@@ -458,19 +519,25 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
         }
         AP_DEBUG_ASSERT(h2_ihash_empty(m->streams));
     
+        if (!h2_ihash_empty(m->shold)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): 2. release_join with %d streams in hold", 
+                          m->id, (int)h2_ihash_count(m->shold));
+        }
+        if (!h2_ihash_empty(m->spurge)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): 2. release_join with %d streams to purge", 
+                          m->id, (int)h2_ihash_count(m->spurge));
+        }
+        
         /* If we still have busy workers, we cannot release our memory
-         * pool yet, as slave connections have child pools of their respective
-         * h2_io's.
-         * Any remaining ios are processed in these workers. Any operation 
-         * they do on their input/outputs will be errored ECONNRESET/ABORTED, 
-         * so processing them should fail and workers *should* return.
+         * pool yet, as tasks have references to us.
+         * Any operation on the task slave connection will from now on
+         * be errored ECONNRESET/ABORTED, so processing them should fail 
+         * and workers *should* return in a timely fashion.
          */
         for (i = 0; m->workers_busy > 0; ++i) {
             m->join_wait = wait;
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): release_join, waiting on %d tasks to report back", 
-                          m->id, (int)h2_ihash_count(m->tasks));
-                          
             status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
             
             if (APR_STATUS_IS_TIMEUP(status)) {
@@ -494,9 +561,22 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
             }
         }
         
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
-                      "h2_mplx(%ld): release_join (%d tasks left) -> destroy", 
-                      m->id, (int)h2_ihash_count(m->tasks));
+        AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
+        if (!h2_ihash_empty(m->spurge)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): release_join %d streams to purge", 
+                          m->id, (int)h2_ihash_count(m->spurge));
+            purge_streams(m);
+        }
+        AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
+        AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
+        
+        if (!h2_ihash_empty(m->tasks)) {
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056)
+                          "h2_mplx(%ld): release_join -> destroy, "
+                          "%d tasks still present", 
+                          m->id, (int)h2_ihash_count(m->tasks));
+        }
         leave_mutex(m, acquired);
         h2_mplx_destroy(m);
         /* all gone */
@@ -516,24 +596,17 @@ void h2_mplx_abort(h2_mplx *m)
     }
 }
 
-apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
+apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream)
 {
     apr_status_t status = APR_SUCCESS;
     int acquired;
     
-    /* This maybe called from inside callbacks that already hold the lock.
-     * E.g. when we are streaming out DATA and the EOF triggers the stream
-     * release.
-     */
     AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_stream *stream = h2_ihash_get(m->streams, stream_id);
-        if (stream) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                          "h2_mplx(%ld-%d): marking stream as done.", 
-                          m->id, stream_id);
-            stream_done(m, stream, rst_error);
-        }
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+                      "h2_mplx(%ld-%d): marking stream as done.", 
+                      m->id, stream->id);
+        stream_done(m, stream, stream->rst_error);
         leave_mutex(m, acquired);
     }
     return status;
@@ -547,8 +620,7 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
 
 static int update_window(void *ctx, void *val)
 {
-    h2_mplx *m = ctx;
-    input_consumed_signal(m, val);
+    input_consumed_signal(ctx, val);
     return 1;
 }
 
@@ -562,7 +634,7 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
         return APR_ECONNABORTED;
     }
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_ihash_iter(m->tasks, update_window, m);
+        h2_ihash_iter(m->streams, update_window, m);
         
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                       "h2_session(%ld): windows updated", m->id);
@@ -580,7 +652,7 @@ static int task_iter_first(void *ctx, void *val)
     return 0;
 }
 
-h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
+h2_stream *h2_mplx_next_submit(h2_mplx *m)
 {
     apr_status_t status;
     h2_stream *stream = NULL;
@@ -597,7 +669,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
             h2_task *task = ctx.task;
             
             h2_ihash_remove(m->ready_tasks, task->stream_id);
-            stream = h2_ihash_get(streams, task->stream_id);
+            stream = h2_ihash_get(m->streams, task->stream_id);
             if (stream && task) {
                 task->submitted = 1;
                 if (task->rst_error) {
@@ -618,16 +690,14 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
                               "h2_mplx(%s): stream for response closed, "
                               "resetting io to close request processing",
                               task->id);
-                task->orphaned = 1;
                 h2_task_rst(task, H2_ERR_STREAM_CLOSED);
                 if (!task->worker_started || task->worker_done) {
                     task_destroy(m, task, 1);
                 }
                 else {
                     /* hang around until the h2_task is done, but
-                     * shutdown input/output and send out any events asap. */
+                     * shutdown output */
                     h2_task_shutdown(task, 0);
-                    input_consumed_signal(m, task);
                 }
             }
         }
@@ -640,8 +710,9 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
 {
     apr_status_t status = APR_SUCCESS;
     h2_task *task = h2_ihash_get(m->tasks, stream_id);
+    h2_stream *stream = h2_ihash_get(m->streams, stream_id);
     
-    if (!task || task->orphaned) {
+    if (!task || !stream) {
         return APR_ECONNABORTED;
     }
     
@@ -691,8 +762,9 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
 static apr_status_t out_close(h2_mplx *m, h2_task *task)
 {
     apr_status_t status = APR_SUCCESS;
+    h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
     
-    if (!task || task->orphaned) {
+    if (!task || !stream) {
         return APR_ECONNABORTED;
     }
     
@@ -835,6 +907,7 @@ static h2_task *pop_task(h2_mplx *m)
             }
             
             slave->sbh = m->c->sbh;
+            slave->aborted = 0;
             task = h2_task_create(slave, stream->request, stream->input, m);
             h2_ihash_add(m->tasks, task);
             
@@ -888,93 +961,115 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
 
 static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
 {
-    if (task) {
-        if (task->frozen) {
-            /* this task was handed over to an engine for processing 
-             * and the original worker has finished. That means the 
-             * engine may start processing now. */
-            h2_task_thaw(task);
-            /* we do not want the task to block on writing response
-             * bodies into the mplx. */
-            /* FIXME: this implementation is incomplete. */
-            h2_task_set_io_blocking(task, 0);
-            apr_thread_cond_broadcast(m->task_thawed);
-            return;
-        }
-        else {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): task(%s) done", m->id, task->id);
-            out_close(m, task);
-            
-            if (ngn) {
-                apr_off_t bytes = 0;
-                if (task->output.beam) {
-                    h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
-                    bytes += h2_beam_get_buffered(task->output.beam);
-                }
-                if (bytes > 0) {
-                    /* we need to report consumed and current buffered output
-                     * to the engine. The request will be streamed out or cancelled,
-                     * no more data is coming from it and the engine should update
-                     * its calculations before we destroy this information. */
-                    h2_req_engine_out_consumed(ngn, task->c, bytes);
-                }
+    if (task->frozen) {
+        /* this task was handed over to an engine for processing 
+         * and the original worker has finished. That means the 
+         * engine may start processing now. */
+        h2_task_thaw(task);
+        /* we do not want the task to block on writing response
+         * bodies into the mplx. */
+        h2_task_set_io_blocking(task, 0);
+        apr_thread_cond_broadcast(m->task_thawed);
+        return;
+    }
+    else {
+        h2_stream *stream;
+        
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                      "h2_mplx(%ld): task(%s) done", m->id, task->id);
+        out_close(m, task);
+        stream = h2_ihash_get(m->streams, task->stream_id);
+        
+        if (ngn) {
+            apr_off_t bytes = 0;
+            if (task->output.beam) {
+                h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+                bytes += h2_beam_get_buffered(task->output.beam);
             }
-            
-            if (task->engine) {
-                if (!h2_req_engine_is_shutdown(task->engine)) {
-                    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
-                                  "h2_mplx(%ld): task(%s) has not-shutdown "
-                                  "engine(%s)", m->id, task->id, 
-                                  h2_req_engine_get_id(task->engine));
-                }
-                h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+            if (bytes > 0) {
+                /* we need to report consumed and current buffered output
+                 * to the engine. The request will be streamed out or cancelled,
+                 * no more data is coming from it and the engine should update
+                 * its calculations before we destroy this information. */
+                h2_req_engine_out_consumed(ngn, task->c, bytes);
             }
-            
-            if (!m->aborted && !task->orphaned && m->redo_tasks
-                && h2_ihash_get(m->redo_tasks, task->stream_id)) {
-                /* reset and schedule again */
-                h2_task_redo(task);
-                h2_ihash_remove(m->redo_tasks, task->stream_id);
-                h2_iq_add(m->q, task->stream_id, NULL, NULL);
-                return;
+        }
+        
+        if (task->engine) {
+            if (!h2_req_engine_is_shutdown(task->engine)) {
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                              "h2_mplx(%ld): task(%s) has not-shutdown "
+                              "engine(%s)", m->id, task->id, 
+                              h2_req_engine_get_id(task->engine));
             }
-            
-            task->worker_done = 1;
-            task->done_at = apr_time_now();
-            if (task->output.beam) {
-                h2_beam_on_consumed(task->output.beam, NULL, NULL);
-                h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
+            h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+        }
+        
+        if (!m->aborted && stream && m->redo_tasks
+            && h2_ihash_get(m->redo_tasks, task->stream_id)) {
+            /* reset and schedule again */
+            h2_task_redo(task);
+            h2_ihash_remove(m->redo_tasks, task->stream_id);
+            h2_iq_add(m->q, task->stream_id, NULL, NULL);
+            return;
+        }
+        
+        task->worker_done = 1;
+        task->done_at = apr_time_now();
+        if (task->output.beam) {
+            h2_beam_on_consumed(task->output.beam, NULL, NULL);
+            h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
+        }
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                      "h2_mplx(%s): request done, %f ms elapsed", task->id, 
+                      (task->done_at - task->started_at) / 1000.0);
+        if (task->started_at > m->last_idle_block) {
+            /* this task finished without causing an 'idle block', e.g.
+             * a block by flow control.
+             */
+            if (task->done_at- m->last_limit_change >= m->limit_change_interval
+                && m->workers_limit < m->workers_max) {
+                /* Well behaving stream, allow it more workers */
+                m->workers_limit = H2MIN(m->workers_limit * 2, 
+                                         m->workers_max);
+                m->last_limit_change = task->done_at;
+                m->need_registration = 1;
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                              "h2_mplx(%ld): increase worker limit to %d",
+                              m->id, m->workers_limit);
             }
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%s): request done, %f ms"
-                          " elapsed", task->id, 
-                          (task->done_at - task->started_at) / 1000.0);
-            if (task->started_at > m->last_idle_block) {
-                /* this task finished without causing an 'idle block', e.g.
-                 * a block by flow control.
-                 */
-                if (task->done_at- m->last_limit_change >= m->limit_change_interval
-                    && m->workers_limit < m->workers_max) {
-                    /* Well behaving stream, allow it more workers */
-                    m->workers_limit = H2MIN(m->workers_limit * 2, 
-                                             m->workers_max);
-                    m->last_limit_change = task->done_at;
-                    m->need_registration = 1;
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                                  "h2_mplx(%ld): increase worker limit to %d",
-                                  m->id, m->workers_limit);
-                }
+        }
+        
+        if (stream) {
+            /* hang around until the stream deregisters */
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%s): task_done, stream still open", 
+                          task->id);
+        }
+        else {
+            /* stream done, was it placed in hold? */
+            stream = h2_ihash_get(m->shold, task->stream_id);
+            if (stream) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                              "h2_mplx(%s): task_done, stream in hold", 
+                              task->id);
+                stream->response = NULL; /* ref from task memory */
+                /* We cannot destroy the stream here since this is 
+                 * called from a worker thread and freeing memory pools
+                 * is only safe in the only thread using it (and its
+                 * parent pool / allocator) */
+                h2_ihash_remove(m->shold, stream->id);
+                h2_ihash_add(m->spurge, stream);
             }
-            
-            if (task->orphaned) {
+            else {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                              "h2_mplx(%s): task_done, stream not found", 
+                              task->id);
                 task_destroy(m, task, 0);
-                if (m->join_wait) {
-                    apr_thread_cond_signal(m->join_wait);
-                }
             }
-            else {
-                /* hang around until the stream deregisters */
+            
+            if (m->join_wait) {
+                apr_thread_cond_signal(m->join_wait);
             }
         }
     }
@@ -1180,11 +1275,13 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
     task->r = r;
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (task->orphaned) {
-            status = APR_ECONNABORTED;
+        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+        
+        if (stream) {
+            status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
         }
         else {
-            status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
+            status = APR_ECONNABORTED;
         }
         leave_mutex(m, acquired);
     }
index a6fe12a3efce6fd4b5d0731a77dbc709a1d763bc..9b316b0b3f84516f9ecb25cb5b86d130ce2cf091 100644 (file)
@@ -73,6 +73,8 @@ struct h2_mplx {
     unsigned int need_registration : 1;
 
     struct h2_ihash_t *streams;     /* all streams currently processing */
+    struct h2_ihash_t *shold;       /* all streams done with task ongoing */
+    struct h2_ihash_t *spurge;      /* all streams done, ready for destroy */
     struct h2_iqueue *q;            /* all stream ids that need to be started */
     
     struct h2_ihash_t *tasks;       /* all tasks started and not destroyed */
@@ -167,7 +169,7 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m);
  * @param rst_error if != 0, the stream was reset with the error given
  *
  */
-apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
+apr_status_t h2_mplx_stream_done(h2_mplx *m, struct h2_stream *stream);
 
 /**
  * Waits on output data from any stream in this session to become available. 
@@ -235,8 +237,7 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m);
  * @param m the mplxer to get a response from
  * @param bb the brigade to place any existing repsonse body data into
  */
-struct h2_stream *h2_mplx_next_submit(h2_mplx *m, 
-                                      struct h2_ihash_t *streams);
+struct h2_stream *h2_mplx_next_submit(h2_mplx *m);
 
 /**
  * Opens the output for the given stream with the specified response.
index 748e32abbfd74803508942aa7459cf04a781d89e..977fab58a3d986373a6adde52aecf31d0c0636c6 100644 (file)
@@ -346,9 +346,9 @@ static int add_push(link_ctx *ctx)
                 }
                 headers = apr_table_make(ctx->pool, 5);
                 apr_table_do(set_push_header, headers, ctx->req->headers, NULL);
-                req = h2_request_createn(0, ctx->pool, method, ctx->req->scheme,
-                                         ctx->req->authority, path, headers,
-                                         ctx->req->serialize);
+                req = h2_req_createn(0, ctx->pool, method, ctx->req->scheme,
+                                     ctx->req->authority, path, headers,
+                                     ctx->req->serialize);
                 /* atm, we do not push on pushes */
                 h2_request_end_headers(req, ctx->pool, 1, 0);
                 push->req = req;
index f8c0041334eddb54cbfd15d8bb6a02f912bc7344..d213e16790f29e664e366ae1fe9fe2035041c166 100644 (file)
 #include "h2_util.h"
 
 
-h2_request *h2_request_create(int id, apr_pool_t *pool, int serialize)
-{
-    return h2_request_createn(id, pool, NULL, NULL, NULL, NULL, NULL,
-                              serialize);
-}
-
-h2_request *h2_request_createn(int id, apr_pool_t *pool,
-                               const char *method, const char *scheme,
-                               const char *authority, const char *path,
-                               apr_table_t *header, int serialize)
-{
-    h2_request *req = apr_pcalloc(pool, sizeof(h2_request));
-    
-    req->id             = id;
-    req->method         = method;
-    req->scheme         = scheme;
-    req->authority      = authority;
-    req->path           = path;
-    req->headers        = header? header : apr_table_make(pool, 10);
-    req->request_time   = apr_time_now();
-    req->serialize      = serialize;
-    
-    return req;
-}
-
 static apr_status_t inspect_clen(h2_request *req, const char *s)
 {
     char *end;
@@ -67,111 +42,28 @@ static apr_status_t inspect_clen(h2_request *req, const char *s)
     return (s == end)? APR_EINVAL : APR_SUCCESS;
 }
 
-static apr_status_t add_h1_header(h2_request *req, apr_pool_t *pool, 
-                                  const char *name, size_t nlen,
-                                  const char *value, size_t vlen)
-{
-    char *hname, *hvalue;
-    
-    if (h2_req_ignore_header(name, nlen)) {
-        return APR_SUCCESS;
-    }
-    else if (H2_HD_MATCH_LIT("cookie", name, nlen)) {
-        const char *existing = apr_table_get(req->headers, "cookie");
-        if (existing) {
-            char *nval;
-            
-            /* Cookie header come separately in HTTP/2, but need
-             * to be merged by "; " (instead of default ", ")
-             */
-            hvalue = apr_pstrndup(pool, value, vlen);
-            nval = apr_psprintf(pool, "%s; %s", existing, hvalue);
-            apr_table_setn(req->headers, "Cookie", nval);
-            return APR_SUCCESS;
-        }
-    }
-    else if (H2_HD_MATCH_LIT("host", name, nlen)) {
-        if (apr_table_get(req->headers, "Host")) {
-            return APR_SUCCESS; /* ignore duplicate */
-        }
-    }
-    
-    hname = apr_pstrndup(pool, name, nlen);
-    hvalue = apr_pstrndup(pool, value, vlen);
-    h2_util_camel_case_header(hname, nlen);
-    apr_table_mergen(req->headers, hname, hvalue);
-    
-    return APR_SUCCESS;
-}
-
-typedef struct {
-    h2_request *req;
-    apr_pool_t *pool;
-} h1_ctx;
-
-static int set_h1_header(void *ctx, const char *key, const char *value)
-{
-    h1_ctx *x = ctx;
-    size_t klen = strlen(key);
-    if (!h2_req_ignore_header(key, klen)) {
-        add_h1_header(x->req, x->pool, key, klen, value, strlen(value));
-    }
-    return 1;
-}
-
-static apr_status_t add_all_h1_header(h2_request *req, apr_pool_t *pool, 
-                                      apr_table_t *header)
-{
-    h1_ctx x;
-    x.req = req;
-    x.pool = pool;
-    apr_table_do(set_h1_header, &x, header, NULL);
-    return APR_SUCCESS;
-}
-
-
-apr_status_t h2_request_make(h2_request *req, apr_pool_t *pool,
-                             const char *method, const char *scheme, 
-                             const char *authority, const char *path, 
-                             apr_table_t *headers)
-{
-    req->method    = method;
-    req->scheme    = scheme;
-    req->authority = authority;
-    req->path      = path;
-
-    AP_DEBUG_ASSERT(req->scheme);
-    AP_DEBUG_ASSERT(req->authority);
-    AP_DEBUG_ASSERT(req->path);
-    AP_DEBUG_ASSERT(req->method);
-
-    return add_all_h1_header(req, pool, headers);
-}
-
-apr_status_t h2_request_rwrite(h2_request *req, request_rec *r)
+apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool, 
+                               request_rec *r)
 {
     apr_status_t status;
     const char *scheme, *authority;
     
-    scheme = (r->parsed_uri.scheme? r->parsed_uri.scheme
+    scheme = apr_pstrdup(pool, r->parsed_uri.scheme? r->parsed_uri.scheme
               : ap_http_scheme(r));
-    authority = r->hostname;
+    authority = apr_pstrdup(pool, r->hostname);
     if (!ap_strchr_c(authority, ':') && r->server && r->server->port) {
         apr_port_t defport = apr_uri_port_of_scheme(scheme);
         if (defport != r->server->port) {
             /* port info missing and port is not default for scheme: append */
-            authority = apr_psprintf(r->pool, "%s:%d", authority,
+            authority = apr_psprintf(pool, "%s:%d", authority,
                                      (int)r->server->port);
         }
     }
     
-    status = h2_request_make(req, r->pool,  r->method, scheme, authority,
-                             apr_uri_unparse(r->pool, &r->parsed_uri, 
-                                             APR_URI_UNP_OMITSITEPART),
-                             r->headers_in);
-    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
-                  "h2_request(%d): rwrite %s host=%s://%s%s",
-                  req->id, req->method, req->scheme, req->authority, req->path);
+    status = h2_req_make(req, pool, apr_pstrdup(pool, r->method), scheme, 
+                         authority, apr_uri_unparse(pool, &r->parsed_uri, 
+                                                    APR_URI_UNP_OMITSITEPART),
+                         r->headers_in);
     return status;
 }
 
@@ -223,7 +115,7 @@ apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool,
     }
     else {
         /* non-pseudo header, append to work bucket of stream */
-        status = add_h1_header(req, pool, name, nlen, value, vlen);
+        status = h2_headers_add_h1(req->headers, pool, name, nlen, value, vlen);
     }
     
     return status;
index 4288dfec2c38f1c5bcd9084ea07017ff1406ee6e..ba48f4a15276474ddeac3b0f0301a83b6c7e3d96 100644 (file)
 
 #include "h2.h"
 
-h2_request *h2_request_create(int id, apr_pool_t *pool, int serialize);
-
-h2_request *h2_request_createn(int id, apr_pool_t *pool,
-                               const char *method, const char *scheme,
-                               const char *authority, const char *path,
-                               apr_table_t *headers, int serialize);
-
-apr_status_t h2_request_make(h2_request *req, apr_pool_t *pool,
-                             const char *method, const char *scheme, 
-                             const char *authority, const char *path, 
-                             apr_table_t *headers);
-
-apr_status_t h2_request_rwrite(h2_request *req, request_rec *r);
+apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool, 
+                               request_rec *r);
 
 apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool,
                                    const char *name, size_t nlen,
index f96c50960425dc6de8a806834a25085983e6eec1..5ee45acf9e052d2afd465d443f50dc818bc74631 100644 (file)
@@ -128,19 +128,16 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
     h2_stream * stream;
     apr_pool_t *stream_pool;
     
-    if (session->spare) {
-        stream_pool = session->spare;
-        session->spare = NULL;
-    }
-    else {
-        apr_pool_create(&stream_pool, session->pool);
-        apr_pool_tag(stream_pool, "h2_stream");
-    }
+    apr_pool_create(&stream_pool, session->pool);
+    apr_pool_tag(stream_pool, "h2_stream");
     
     stream = h2_stream_open(stream_id, stream_pool, session, 
                             initiated_on, req);
-    
+    ++session->open_streams;
+    ++session->unanswered_streams;
+    nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
     h2_ihash_add(session->streams, stream);
+    
     if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
         if (stream_id > session->remote.emitted_max) {
             ++session->remote.emitted_count;
@@ -262,6 +259,11 @@ static int on_invalid_frame_recv_cb(nghttp2_session *ngh2,
     return 0;
 }
 
+static h2_stream *get_stream(h2_session *session, int stream_id)
+{
+    return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+}
+
 static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
                                  int32_t stream_id,
                                  const uint8_t *data, size_t len, void *userp)
@@ -277,7 +279,7 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
         return 0;
     }
     
-    stream = h2_session_get_stream(session, stream_id);
+    stream = get_stream(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064)
                       "h2_stream(%ld-%d): on_data_chunk for unknown stream",
@@ -313,6 +315,9 @@ static apr_status_t stream_release(h2_session *session,
                                    uint32_t error_code) 
 {
     conn_rec *c = session->c;
+    apr_bucket *b;
+    apr_status_t status;
+    
     if (!error_code) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       "h2_stream(%ld-%d): handled, closing", 
@@ -331,8 +336,11 @@ static apr_status_t stream_release(h2_session *session,
         h2_stream_rst(stream, error_code);
     }
     
-    return h2_conn_io_writeb(&session->io,
-                             h2_bucket_eos_create(c->bucket_alloc, stream), 0);
+    b = h2_bucket_eos_create(c->bucket_alloc, stream);
+    APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+    status = h2_conn_io_pass(&session->io, session->bbtmp);
+    apr_brigade_cleanup(session->bbtmp);
+    return status;
 }
 
 static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -342,7 +350,7 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
     h2_stream *stream;
     
     (void)ngh2;
-    stream = h2_session_get_stream(session, stream_id);
+    stream = get_stream(session, stream_id);
     if (stream) {
         stream_release(session, stream, error_code);
     }
@@ -358,7 +366,7 @@ static int on_begin_headers_cb(nghttp2_session *ngh2,
     /* We may see HEADERs at the start of a stream or after all DATA
      * streams to carry trailers. */
     (void)ngh2;
-    s = h2_session_get_stream(session, frame->hd.stream_id);
+    s = get_stream(session, frame->hd.stream_id);
     if (s) {
         /* nop */
     }
@@ -385,7 +393,7 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
         return 0;
     }
     
-    stream = h2_session_get_stream(session, frame->hd.stream_id);
+    stream = get_stream(session, frame->hd.stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
                       APLOGNO(02920) 
@@ -432,7 +440,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
             /* This can be HEADERS for a new stream, defining the request,
              * or HEADER may come after DATA at the end of a stream as in
              * trailers */
-            stream = h2_session_get_stream(session, frame->hd.stream_id);
+            stream = get_stream(session, frame->hd.stream_id);
             if (stream) {
                 int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
                 
@@ -456,7 +464,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
             }
             break;
         case NGHTTP2_DATA:
-            stream = h2_session_get_stream(session, frame->hd.stream_id);
+            stream = get_stream(session, frame->hd.stream_id);
             if (stream) {
                 int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
@@ -493,7 +501,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
                           "h2_session(%ld-%d): RST_STREAM by client, errror=%d",
                           session->id, (int)frame->hd.stream_id,
                           (int)frame->rst_stream.error_code);
-            stream = h2_session_get_stream(session, frame->hd.stream_id);
+            stream = get_stream(session, frame->hd.stream_id);
             if (stream && stream->request && stream->request->initiated_on) {
                 ++session->pushes_reset;
             }
@@ -536,13 +544,6 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
     return 0;
 }
 
-static apr_status_t pass_data(void *ctx, 
-                              const char *data, apr_off_t length)
-{
-    return h2_conn_io_write(&((h2_session*)ctx)->io, data, length);
-}
-
-
 static char immortal_zeros[H2_MAX_PADLEN];
 
 static int on_send_data_cb(nghttp2_session *ngh2, 
@@ -567,7 +568,7 @@ static int on_send_data_cb(nghttp2_session *ngh2,
     }
     padlen = (unsigned char)frame->data.padlen;
     
-    stream = h2_session_get_stream(session, stream_id);
+    stream = get_stream(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
                       APLOGNO(02924) 
@@ -580,52 +581,32 @@ static int on_send_data_cb(nghttp2_session *ngh2,
                   "h2_stream(%ld-%d): send_data_cb for %ld bytes",
                   session->id, (int)stream_id, (long)length);
                   
-    if (h2_conn_io_is_buffered(&session->io)) {
-        status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
-        if (status == APR_SUCCESS) {
-            if (padlen) {
-                status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
-            }
-            
-            if (status == APR_SUCCESS) {
-                apr_off_t len = length;
-                status = h2_stream_readx(stream, pass_data, session, &len, &eos);
-                if (status == APR_SUCCESS && len != length) {
-                    status = APR_EINVAL;
-                }
-            }
-            
-            if (status == APR_SUCCESS && padlen) {
-                if (padlen) {
-                    status = h2_conn_io_write(&session->io, immortal_zeros, padlen);
-                }
-            }
-        }
+    status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
+    if (padlen && status == APR_SUCCESS) {
+        status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
     }
-    else {
-        status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
-        if (padlen && status == APR_SUCCESS) {
-            status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
-        }
-        if (status == APR_SUCCESS) {
-            apr_off_t len = length;
-            status = h2_stream_read_to(stream, session->io.output, &len, &eos);
-            if (status == APR_SUCCESS && len != length) {
-                status = APR_EINVAL;
-            }
-        }
-            
-        if (status == APR_SUCCESS && padlen) {
-            b = apr_bucket_immortal_create(immortal_zeros, padlen, 
-                                           session->c->bucket_alloc);
-            status = h2_conn_io_writeb(&session->io, b, 0);
+    
+    if (status == APR_SUCCESS) {
+        apr_off_t len = length;
+        status = h2_stream_read_to(stream, session->bbtmp, &len, &eos);
+        if (status == APR_SUCCESS && len != length) {
+            status = APR_EINVAL;
         }
     }
     
+    if (status == APR_SUCCESS && padlen) {
+        b = apr_bucket_immortal_create(immortal_zeros, padlen, 
+                                       session->c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+    }
     
+    if (status == APR_SUCCESS) {
+        status = h2_conn_io_pass(&session->io, session->bbtmp);
+    }
+        
+    apr_brigade_cleanup(session->bbtmp);
     if (status == APR_SUCCESS) {
         stream->data_frames_sent++;
-        h2_conn_io_consider_pass(&session->io);
         return 0;
     }
     else {
@@ -682,45 +663,31 @@ static apr_status_t init_callbacks(conn_rec *c, nghttp2_session_callbacks **pcb)
     return APR_SUCCESS;
 }
 
-static void h2_session_cleanup(h2_session *session)
+static void h2_session_destroy(h2_session *session)
 {
-    AP_DEBUG_ASSERT(session);
-    /* This is an early cleanup of the session that may
-     * discard what is no longer necessary for *new* streams
-     * and general HTTP/2 processing.
-     * At this point, all frames are in transit or somehwere in
-     * our buffers or passed down output filters.
-     * h2 streams might still being written out.
-     */
-    if (session->c) {
-        h2_ctx_clear(session->c);
+    AP_DEBUG_ASSERT(session);    
+
+    h2_ihash_clear(session->streams);
+    if (session->mplx) {
+        h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
+        h2_mplx_release_and_join(session->mplx, session->iowait);
+        session->mplx = NULL;
     }
+
+    ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
+                                     session->c->input_filters), "H2_IN");
     if (session->ngh2) {
         nghttp2_session_del(session->ngh2);
         session->ngh2 = NULL;
     }
-    if (session->spare) {
-        apr_pool_destroy(session->spare);
-        session->spare = NULL;
+    if (session->c) {
+        h2_ctx_clear(session->c);
     }
-}
 
-static void h2_session_destroy(h2_session *session)
-{
-    AP_DEBUG_ASSERT(session);
-    
-    h2_session_cleanup(session);
-    h2_ihash_clear(session->streams);
-    
     if (APLOGctrace1(session->c)) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                       "h2_session(%ld): destroy", session->id);
     }
-    if (session->mplx) {
-        h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
-        h2_mplx_release_and_join(session->mplx, session->iowait);
-        session->mplx = NULL;
-    }
     if (session->pool) {
         apr_pool_destroy(session->pool);
     }
@@ -901,7 +868,7 @@ static h2_session *h2_session_create_int(conn_rec *c,
                                             h2_session_receive, session);
         ap_add_input_filter("H2_IN", session->cin, r, c);
 
-        h2_conn_io_init(&session->io, c, session->config, session->pool);
+        h2_conn_io_init(&session->io, c, session->config);
         session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc);
         
         status = init_callbacks(c, &callbacks);
@@ -1138,10 +1105,8 @@ static int resume_on_data(void *ctx, void *val)
 static int h2_session_resume_streams_with_data(h2_session *session)
 {
     AP_DEBUG_ASSERT(session);
-    if (!h2_ihash_empty(session->streams)
-        && session->mplx && !session->mplx->aborted) {
+    if (session->open_streams && !session->mplx->aborted) {
         resume_ctx ctx;
-        
         ctx.session      = session;
         ctx.resume_count = 0;
 
@@ -1153,11 +1118,6 @@ static int h2_session_resume_streams_with_data(h2_session *session)
     return 0;
 }
 
-h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
-{
-    return h2_ihash_get(session->streams, stream_id);
-}
-
 static ssize_t stream_data_cb(nghttp2_session *ng2s,
                               int32_t stream_id,
                               uint8_t *buf,
@@ -1183,7 +1143,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
     (void)ng2s;
     (void)buf;
     (void)source;
-    stream = h2_session_get_stream(session, stream_id);
+    stream = get_stream(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
                       APLOGNO(02937) 
@@ -1334,7 +1294,7 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream)
                                        stream->id, err);
     }
     
-    stream->submitted = 1;
+    --session->unanswered_streams;
     if (stream->request && stream->request->initiated_on) {
         ++session->pushes_submitted;
     }
@@ -1384,7 +1344,6 @@ struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
                           "h2_stream(%ld-%d): scheduling push stream",
                           session->id, stream->id);
-            h2_stream_cleanup(stream);
             stream = NULL;
         }
         ++session->unsent_promises;
@@ -1509,29 +1468,14 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
 
 apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
 {
-    apr_pool_t *pool = h2_stream_detach_pool(stream);
-    int stream_id = stream->id;
-    int rst_error = stream->rst_error;
-    
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                  "h2_stream(%ld-%d): cleanup by EOS bucket destroy", 
-                  session->id, stream_id);
-    if (session->streams) {
-        h2_ihash_remove(session->streams, stream_id);
-    }
+                  "h2_stream(%ld-%d): EOS bucket cleanup -> done", 
+                  session->id, stream->id);
+    h2_ihash_remove(session->streams, stream->id);
+    --session->open_streams;
+    --session->unanswered_streams;
+    h2_mplx_stream_done(session->mplx, stream);
     
-    h2_stream_cleanup(stream);
-    h2_mplx_stream_done(session->mplx, stream_id, rst_error);
-    h2_stream_destroy(stream);
-    
-    if (pool) {
-        apr_pool_clear(pool);
-        if (session->spare) {
-            apr_pool_destroy(session->spare);
-        }
-        session->spare = pool;
-    }
-
     return APR_SUCCESS;
 }
 
@@ -1708,7 +1652,7 @@ static apr_status_t h2_session_submit(h2_session *session)
     
     if (has_unsubmitted_streams(session)) {
         /* If we have responses ready, submit them now. */
-        while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) {
+        while ((stream = h2_mplx_next_submit(session->mplx))) {
             status = submit_response(session, stream);
             ++session->unsent_submits;
             
@@ -1770,7 +1714,7 @@ static void update_child_status(h2_session *session, int status, const char *msg
         apr_snprintf(session->status, sizeof(session->status),
                      "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)", 
                      msg? msg : "-",
-                     (int)h2_ihash_count(session->streams)
+                     (int)session->open_streams
                      (int)session->remote.emitted_count,
                      (int)session->responses_submitted,
                      (int)session->pushes_submitted,
@@ -1788,7 +1732,7 @@ static void transit(h2_session *session, const char *action, h2_session_state ns
         session->state = nstate;
         switch (session->state) {
             case H2_SESSION_ST_IDLE:
-                update_child_status(session, (h2_ihash_empty(session->streams)
+                update_child_status(session, (session->open_streams == 0
                                               SERVER_BUSY_KEEPALIVE
                                               : SERVER_BUSY_READ), "idle");
                 break;
@@ -1917,10 +1861,7 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
              * CPU cycles. Ideally, we'd like to do a blocking read, but that
              * is not possible if we have scheduled tasks and wait
              * for them to produce something. */
-            if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
-                dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
-            }
-            if (h2_ihash_empty(session->streams)) {
+            if (!session->open_streams) {
                 if (!is_accepting_streams(session)) {
                     /* We are no longer accepting new streams and have
                      * finished processing existing ones. Time to leave. */
@@ -1944,6 +1885,10 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
                  * new output data from task processing, 
                  * switch to blocking reads. We are probably waiting on
                  * window updates. */
+                if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
+                    dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+                    return;
+                }
                 transit(session, "no io", H2_SESSION_ST_IDLE);
                 session->idle_until = apr_time_now() + session->s->timeout;
                 session->keep_sync_until = session->idle_until;
@@ -2126,9 +2071,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 break;
                 
             case H2_SESSION_ST_IDLE:
-                /* make certain, the client receives everything before we idle */
-                if (!session->keep_sync_until 
-                    && async && h2_ihash_empty(session->streams)
+                /* make certain, we send everything before we idle */
+                if (!session->keep_sync_until && async && !session->open_streams
                     && !session->r && session->remote.emitted_count) {
                     ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
                                   "h2_session(%ld): async idle, nonblock read", session->id);
@@ -2226,8 +2170,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
                     }
                 }
                 
-                if (!h2_ihash_empty(session->streams)) {
-                    /* resume any streams for which data is available again */
+                if (session->open_streams) {
+                    /* resume any streams with output data */
                     h2_session_resume_streams_with_data(session);
                     /* Submit any responses/push_promises that are ready */
                     status = h2_session_submit(session);
@@ -2278,6 +2222,7 @@ apr_status_t h2_session_process(h2_session *session, int async)
                     session->start_wait = apr_time_now();
                     if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
                         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+                        break;
                     }
                 }
                 else if ((apr_time_now() - session->start_wait) >= session->s->timeout) {
@@ -2303,11 +2248,15 @@ apr_status_t h2_session_process(h2_session *session, int async)
                     session->wait_us = 0;
                     dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                 }
-                else if (status == APR_TIMEUP) {
+                else if (APR_STATUS_IS_TIMEUP(status)) {
                     /* go back to checking all inputs again */
                     transit(session, "wait cycle", session->local.accepting? 
                             H2_SESSION_ST_BUSY : H2_SESSION_ST_LOCAL_SHUTDOWN);
                 }
+                else if (APR_STATUS_IS_ECONNRESET(status) 
+                         || APR_STATUS_IS_ECONNABORTED(status)) {
+                    dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+                }
                 else {
                     ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
                                   "h2_session(%ld): waiting on conditional",
@@ -2343,7 +2292,7 @@ out:
     ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
                   "h2_session(%ld): [%s] process returns", 
                   session->id, state_name(session->state));
-
+    
     if ((session->state != H2_SESSION_ST_DONE)
         && (APR_STATUS_IS_EOF(status)
             || APR_STATUS_IS_ECONNRESET(status) 
index bf4ded338a807c619b26d91b4531a7abba5b4e94..32202dc3030a76584dcd90f8ec6c068a41a3022d 100644 (file)
@@ -100,6 +100,8 @@ typedef struct h2_session {
     
     struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
     
+    int open_streams;               /* number of streams open */
+    int unanswered_streams;         /* number of streams waiting for response */
     int unsent_submits;             /* number of submitted, but not yet written responses. */
     int unsent_promises;            /* number of submitted, but not yet written push promised */
                                          
@@ -122,8 +124,6 @@ typedef struct h2_session {
     apr_bucket_brigade *bbtmp;      /* brigade for keeping temporary data */
     struct apr_thread_cond_t *iowait; /* our cond when trywaiting for data */
     
-    apr_pool_t *spare;              /* spare stream pool */
-    
     char status[64];                /* status message for scoreboard */
     int last_status_code;           /* the one already reported */
     const char *last_status_msg;    /* the one already reported */
@@ -190,9 +190,6 @@ void h2_session_close(h2_session *session);
 apr_status_t h2_session_handle_response(h2_session *session,
                                         struct h2_stream *stream);
 
-/* Get the h2_stream for the given stream idenrtifier. */
-struct h2_stream *h2_session_get_stream(h2_session *session, int stream_id);
-
 /**
  * Create and register a new stream under the given id.
  * 
index c8635aeec3f7d5e689f8799d1fe230ec4c1643d8..dcc25da42471802e75934659150c7dcfe4c61a16 100644 (file)
@@ -53,12 +53,19 @@ static int state_transition[][7] = {
 /*CL*/{  1, 1, 0, 0, 1, 1, 1 },
 };
 
-#define H2_STREAM_OUT_LOG(lvl,s,msg) \
-    do { \
-        if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
-        h2_util_bb_log((s)->session->c,(s)->session->id,lvl,msg,(s)->buffer); \
-    } while(0)
-    
+static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, char *tag)
+{
+    if (APLOG_C_IS_LEVEL(s->session->c, lvl)) {
+        conn_rec *c = s->session->c;
+        char buffer[4 * 1024];
+        const char *line = "(null)";
+        apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
+        
+        len = h2_util_bb_print(buffer, bmax, tag, "", s->buffer);
+        ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%ld-%d): %s", 
+                      c->id, s->id, len? buffer : line);
+    }
+}
 
 static int set_state(h2_stream *stream, h2_stream_state_t state)
 {
@@ -143,6 +150,30 @@ static int output_open(h2_stream *stream)
     }
 }
 
+static apr_status_t stream_pool_cleanup(void *ctx)
+{
+    h2_stream *stream = ctx;
+    apr_status_t status;
+    
+    if (stream->input) {
+        h2_beam_destroy(stream->input);
+        stream->input = NULL;
+    }
+    if (stream->files) {
+        apr_file_t *file;
+        int i;
+        for (i = 0; i < stream->files->nelts; ++i) {
+            file = APR_ARRAY_IDX(stream->files, i, apr_file_t*);
+            status = apr_file_close(file);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, stream->session->c, 
+                          "h2_stream(%ld-%d): destroy, closed file %d", 
+                          stream->session->id, stream->id, i);
+        }
+        stream->files = NULL;
+    }
+    return APR_SUCCESS;
+}
+
 h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
                           int initiated_on, const h2_request *creq)
 {
@@ -162,11 +193,13 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
         req->initiated_on = initiated_on;
     }
     else {
-        req = h2_request_create(id, pool, 
+        req = h2_req_create(id, pool, 
                 h2_config_geti(session->config, H2_CONF_SER_HEADERS));
     }
     stream->request = req; 
     
+    apr_pool_cleanup_register(pool, stream, stream_pool_cleanup, 
+                              apr_pool_cleanup_null);
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03082)
                   "h2_stream(%ld-%d): opened", session->id, stream->id);
     return stream;
@@ -175,19 +208,30 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
 void h2_stream_cleanup(h2_stream *stream)
 {
     AP_DEBUG_ASSERT(stream);
-    if (stream->input) {
-        h2_beam_destroy(stream->input);
-        stream->input = NULL;
-    }
     if (stream->buffer) {
         apr_brigade_cleanup(stream->buffer);
     }
+    if (stream->input) {
+        apr_status_t status;
+        status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ);
+        if (status == APR_EAGAIN) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
+                          "h2_stream(%ld-%d): wait on input shutdown", 
+                          stream->session->id, stream->id);
+            status = h2_beam_shutdown(stream->input, APR_BLOCK_READ);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, 
+                          "h2_stream(%ld-%d): input shutdown returned", 
+                          stream->session->id, stream->id);
+        }
+    }
 }
 
 void h2_stream_destroy(h2_stream *stream)
 {
     AP_DEBUG_ASSERT(stream);
-    h2_stream_cleanup(stream);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c, 
+                  "h2_stream(%ld-%d): destroy", 
+                  stream->session->id, stream->id);
     if (stream->pool) {
         apr_pool_destroy(stream->pool);
     }
@@ -229,9 +273,14 @@ apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
         return APR_ECONNRESET;
     }
     set_state(stream, H2_STREAM_ST_OPEN);
-    status = h2_request_rwrite(stream->request, r);
+    status = h2_request_rwrite(stream->request, stream->pool, r);
     stream->request->serialize = h2_config_geti(h2_config_rget(r), 
                                                 H2_CONF_SER_HEADERS);
+    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
+                  "h2_request(%d): rwrite %s host=%s://%s%s",
+                  stream->request->id, stream->request->method, 
+                  stream->request->scheme, stream->request->authority, 
+                  stream->request->path);
 
     return status;
 }
@@ -394,11 +443,43 @@ int h2_stream_is_suspended(const h2_stream *stream)
 
 static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount)
 {
+    conn_rec *c = stream->session->c;
+    apr_bucket *b;
+    apr_status_t status;
+    
     if (!stream->output) {
         return APR_EOF;
     }
-    return h2_beam_receive(stream->output, stream->buffer, 
-                           APR_NONBLOCK_READ, amount);
+    status = h2_beam_receive(stream->output, stream->buffer, 
+                             APR_NONBLOCK_READ, amount);
+    /* The buckets we reveive are using the stream->buffer pool as
+     * lifetime which is exactly what we want since this is stream->pool.
+     *
+     * However: when we send these buckets down the core output filters, the
+     * filter might decide to setaside them into a pool of its own. And it
+     * might decide, after having sent the buckets, to clear its pool.
+     *
+     * This is problematic for file buckets because it then closed the contained
+     * file. Any split off buckets we sent afterwards will result in a 
+     * APR_EBADF.
+     */
+    for (b = APR_BRIGADE_FIRST(stream->buffer);
+         b != APR_BRIGADE_SENTINEL(stream->buffer);
+         b = APR_BUCKET_NEXT(b)) {
+        if (APR_BUCKET_IS_FILE(b)) {
+            apr_bucket_file *f = (apr_bucket_file *)b->data;
+            apr_pool_t *fpool = apr_file_pool_get(f->fd);
+            if (fpool != c->pool) {
+                apr_bucket_setaside(b, c->pool);
+                if (!stream->files) {
+                    stream->files = apr_array_make(stream->pool, 
+                                                   5, sizeof(apr_file_t*));
+                }
+                APR_ARRAY_PUSH(stream->files, apr_file_t*) = f->fd;
+            }
+        }
+    }
+    return status;
 }
 
 apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
@@ -429,12 +510,14 @@ apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
     return status;
 }
 
+static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9); 
+
 apr_status_t h2_stream_out_prepare(h2_stream *stream,
                                    apr_off_t *plen, int *peos)
 {
     conn_rec *c = stream->session->c;
     apr_status_t status = APR_SUCCESS;
-    apr_off_t requested = (*plen > 0)? *plen : 32*1024;
+    apr_off_t requested;
 
     if (stream->rst_error) {
         *plen = 0;
@@ -442,11 +525,19 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream,
         return APR_ECONNRESET;
     }
 
+    if (*plen > 0) {
+        requested = H2MIN(*plen, DATA_CHUNK_SIZE);
+    }
+    else {
+        requested = DATA_CHUNK_SIZE;
+    }
+    *plen = requested;
+    
     H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre");
     h2_util_bb_avail(stream->buffer, plen, peos);
-    if (!*peos && !*plen) {
+    if (!*peos && *plen < requested) {
         /* try to get more data */
-        status = fill_buffer(stream, H2MIN(requested, 32*1024));
+        status = fill_buffer(stream, (requested - *plen) + DATA_CHUNK_SIZE);
         if (APR_STATUS_IS_EOF(status)) {
             apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
             APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
@@ -467,27 +558,6 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream,
 }
 
 
-apr_status_t h2_stream_readx(h2_stream *stream, 
-                             h2_io_data_cb *cb, void *ctx,
-                             apr_off_t *plen, int *peos)
-{
-    conn_rec *c = stream->session->c;
-    apr_status_t status = APR_SUCCESS;
-
-    if (stream->rst_error) {
-        return APR_ECONNRESET;
-    }
-    status = h2_util_bb_readx(stream->buffer, cb, ctx, plen, peos);
-    if (status == APR_SUCCESS && !*peos && !*plen) {
-        status = APR_EAGAIN;
-    }
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
-                  "h2_stream(%ld-%d): readx, len=%ld eos=%d",
-                  c->id, stream->id, (long)*plen, *peos);
-    return status;
-}
-
-
 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, 
                                apr_off_t *plen, int *peos)
 {
index 8ae600c78a49a3f5cd2a2605bda14f89523854d9..33f28f6eab710840156c8ea5a9228d41fb09c93b 100644 (file)
@@ -54,6 +54,7 @@ struct h2_stream {
     struct h2_bucket_beam *output;
     apr_bucket_brigade *buffer;
     apr_bucket_brigade *tmp;
+    apr_array_header_t *files;  /* apr_file_t* we collected during I/O */
 
     int rst_error;              /* stream error for RST_STREAM */
     unsigned int aborted   : 1; /* was aborted */
@@ -62,7 +63,6 @@ struct h2_stream {
     unsigned int submitted : 1; /* response HEADER has been sent */
     
     apr_off_t input_remaining;  /* remaining bytes on input as advertised via content-length */
-
     apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */
 };
 
@@ -204,23 +204,6 @@ apr_status_t h2_stream_set_response(h2_stream *stream,
 apr_status_t h2_stream_out_prepare(h2_stream *stream, 
                                    apr_off_t *plen, int *peos);
 
-/**
- * Read data from the stream output.
- * 
- * @param stream the stream to read from
- * @param cb callback to invoke for byte chunks read. Might be invoked
- *        multiple times (with different values) for one read operation.
- * @param ctx context data for callback
- * @param plen (in-/out) max. number of bytes to read and on return actual
- *        number of bytes read
- * @param peos (out) != 0 iff end of stream has been reached while reading
- * @return APR_SUCCESS if out information was computed successfully.
- *         APR_EAGAIN if not data is available and end of stream has not been
- *         reached yet.
- */
-apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb, 
-                             void *ctx, apr_off_t *plen, int *peos);
-
 /**
  * Read a maximum number of bytes into the bucket brigade.
  * 
index 93bb479ea6afed7f29de155b5bd31ad7980a8b65..92029d894d6d15515664f11fa3bf66c3eb987994 100644 (file)
@@ -485,6 +485,9 @@ void h2_task_rst(h2_task *task, int error)
     if (task->output.beam) {
         h2_beam_abort(task->output.beam);
     }
+    if (task->c) {
+        task->c->aborted = 1;
+    }
 }
 
 apr_status_t h2_task_shutdown(h2_task *task, int block)
index 58b64b0a1c6f2cfddc77e29f1bcfc4c4aed0ba33..454bc376fe1bea7ccd9cf81dcdcecd5f384eb5a4 100644 (file)
@@ -83,7 +83,6 @@ struct h2_task {
     unsigned int frozen         : 1;
     unsigned int blocking       : 1;
     unsigned int detached       : 1;
-    unsigned int orphaned       : 1; /* h2_stream is gone for this task */    
     unsigned int submitted      : 1; /* response has been submitted to client */
     unsigned int worker_started : 1; /* h2_worker started processing for this io */
     unsigned int worker_done    : 1; /* h2_worker finished for this io */
index 648305247a4dbc6cde09eeb72c59f078473ea5dc..e6fe45965f54faed457e097731b8c554cac09e3f 100644 (file)
@@ -23,8 +23,7 @@
 
 #include <nghttp2/nghttp2.h>
 
-#include "h2_private.h"
-#include "h2_request.h"
+#include "h2.h"
 #include "h2_util.h"
 
 /* h2_log2(n) iff n is a power of 2 */
@@ -1036,19 +1035,6 @@ apr_size_t h2_util_bb_print(char *buffer, apr_size_t bmax,
     return off;
 }
 
-void h2_util_bb_log(conn_rec *c, int stream_id, int level, 
-                    const char *tag, apr_bucket_brigade *bb)
-{
-    char buffer[4 * 1024];
-    const char *line = "(null)";
-    apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
-    
-    len = h2_util_bb_print(buffer, bmax, tag, "", bb);
-    /* Intentional no APLOGNO */
-    ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d): %s", 
-                  c->id, stream_id, len? buffer : line);
-}
-
 apr_status_t h2_append_brigade(apr_bucket_brigade *to,
                                apr_bucket_brigade *from, 
                                apr_off_t *plen,
@@ -1066,6 +1052,8 @@ apr_status_t h2_append_brigade(apr_bucket_brigade *to,
         if (APR_BUCKET_IS_METADATA(e)) {
             if (APR_BUCKET_IS_EOS(e)) {
                 *peos = 1;
+                apr_bucket_delete(e);
+                continue;
             }
         }
         else {        
@@ -1313,6 +1301,107 @@ int h2_proxy_res_ignore_header(const char *name, size_t len)
             || ignore_header(H2_LIT_ARGS(IgnoredProxyRespHds), name, len));
 }
 
+apr_status_t h2_headers_add_h1(apr_table_t *headers, apr_pool_t *pool, 
+                               const char *name, size_t nlen,
+                               const char *value, size_t vlen)
+{
+    char *hname, *hvalue;
+    
+    if (h2_req_ignore_header(name, nlen)) {
+        return APR_SUCCESS;
+    }
+    else if (H2_HD_MATCH_LIT("cookie", name, nlen)) {
+        const char *existing = apr_table_get(headers, "cookie");
+        if (existing) {
+            char *nval;
+            
+            /* Cookie header come separately in HTTP/2, but need
+             * to be merged by "; " (instead of default ", ")
+             */
+            hvalue = apr_pstrndup(pool, value, vlen);
+            nval = apr_psprintf(pool, "%s; %s", existing, hvalue);
+            apr_table_setn(headers, "Cookie", nval);
+            return APR_SUCCESS;
+        }
+    }
+    else if (H2_HD_MATCH_LIT("host", name, nlen)) {
+        if (apr_table_get(headers, "Host")) {
+            return APR_SUCCESS; /* ignore duplicate */
+        }
+    }
+    
+    hname = apr_pstrndup(pool, name, nlen);
+    hvalue = apr_pstrndup(pool, value, vlen);
+    h2_util_camel_case_header(hname, nlen);
+    apr_table_mergen(headers, hname, hvalue);
+    
+    return APR_SUCCESS;
+}
+
+/*******************************************************************************
+ * h2 request handling
+ ******************************************************************************/
+
+h2_request *h2_req_createn(int id, apr_pool_t *pool, const char *method, 
+                           const char *scheme, const char *authority, 
+                           const char *path, apr_table_t *header, int serialize)
+{
+    h2_request *req = apr_pcalloc(pool, sizeof(h2_request));
+    
+    req->id             = id;
+    req->method         = method;
+    req->scheme         = scheme;
+    req->authority      = authority;
+    req->path           = path;
+    req->headers        = header? header : apr_table_make(pool, 10);
+    req->request_time   = apr_time_now();
+    req->serialize      = serialize;
+    
+    return req;
+}
+
+h2_request *h2_req_create(int id, apr_pool_t *pool, int serialize)
+{
+    return h2_req_createn(id, pool, NULL, NULL, NULL, NULL, NULL, serialize);
+}
+
+typedef struct {
+    apr_table_t *headers;
+    apr_pool_t *pool;
+} h1_ctx;
+
+static int set_h1_header(void *ctx, const char *key, const char *value)
+{
+    h1_ctx *x = ctx;
+    size_t klen = strlen(key);
+    if (!h2_req_ignore_header(key, klen)) {
+        h2_headers_add_h1(x->headers, x->pool, key, klen, value, strlen(value));
+    }
+    return 1;
+}
+
+apr_status_t h2_req_make(h2_request *req, apr_pool_t *pool,
+                         const char *method, const char *scheme, 
+                         const char *authority, const char *path, 
+                         apr_table_t *headers)
+{
+    h1_ctx x;
+
+    req->method    = method;
+    req->scheme    = scheme;
+    req->authority = authority;
+    req->path      = path;
+
+    AP_DEBUG_ASSERT(req->scheme);
+    AP_DEBUG_ASSERT(req->authority);
+    AP_DEBUG_ASSERT(req->path);
+    AP_DEBUG_ASSERT(req->method);
+
+    x.pool = pool;
+    x.headers = req->headers;
+    apr_table_do(set_h1_header, &x, headers, NULL);
+    return APR_SUCCESS;
+}
 
 /*******************************************************************************
  * frame logging
index 8e7e2795f1f9aca305152f80955b7c5cc72b27aa..56614766c387f839a8b062a386c0e3b70fdf9d0d 100644 (file)
@@ -276,6 +276,25 @@ h2_ngheader *h2_util_ngheader_make_res(apr_pool_t *p,
 h2_ngheader *h2_util_ngheader_make_req(apr_pool_t *p, 
                                        const struct h2_request *req);
 
+apr_status_t h2_headers_add_h1(apr_table_t *headers, apr_pool_t *pool, 
+                               const char *name, size_t nlen,
+                               const char *value, size_t vlen);
+
+/*******************************************************************************
+ * h2_request helpers
+ ******************************************************************************/
+
+struct h2_request *h2_req_createn(int id, apr_pool_t *pool, const char *method, 
+                                  const char *scheme, const char *authority, 
+                                  const char *path, apr_table_t *header,
+                                  int serialize);
+struct h2_request *h2_req_create(int id, apr_pool_t *pool, int serialize);
+
+apr_status_t h2_req_make(struct h2_request *req, apr_pool_t *pool,
+                         const char *method, const char *scheme, 
+                         const char *authority, const char *path, 
+                         apr_table_t *headers);
+
 /*******************************************************************************
  * apr brigade helpers
  ******************************************************************************/
@@ -357,8 +376,16 @@ apr_size_t h2_util_bb_print(char *buffer, apr_size_t bmax,
  * @param tag a short message text about the context
  * @param bb the brigade to log
  */
-void h2_util_bb_log(conn_rec *c, int stream_id, int level, 
-                    const char *tag, apr_bucket_brigade *bb);
+#define h2_util_bb_log(c, i, level, tag, bb) \
+do { \
+    char buffer[4 * 1024]; \
+    const char *line = "(null)"; \
+    apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \
+    len = h2_util_bb_print(buffer, bmax, (tag), "", (bb)); \
+    ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%ld-%d): %s", \
+        (c)->id, (int)(i), (len? buffer : line)); \
+} while(0)
+
 
 /**
  * Transfer buckets from one brigade to another with a limit on the 
index 30c0acf4e91fd0454712200c0f56da3560a62ba5..13cd3df244a7ea9879c57869813f7c1bb6278eec 100644 (file)
@@ -26,7 +26,7 @@
  * @macro
  * Version number of the http2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.5.2"
+#define MOD_HTTP2_VERSION "1.5.3"
 
 /**
  * @macro
@@ -34,7 +34,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x010502
+#define MOD_HTTP2_VERSION_NUM 0x010503
 
 
 #endif /* mod_h2_h2_version_h */