mod_http2: fix for partial file buckets in master connection output, flushing of...
author    Stefan Eissing <icing@apache.org>
          Wed, 4 May 2016 12:32:05 +0000 (12:32 +0000)
committer Stefan Eissing <icing@apache.org>
          Wed, 4 May 2016 12:32:05 +0000 (12:32 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1742260 13f79535-47bb-0310-9956-ffa450edef68

13 files changed:
CHANGES
modules/http2/h2_bucket_beam.c
modules/http2/h2_conn_io.c
modules/http2/h2_conn_io.h
modules/http2/h2_h2.c
modules/http2/h2_mplx.c
modules/http2/h2_private.h
modules/http2/h2_session.c
modules/http2/h2_stream.c
modules/http2/h2_stream.h
modules/http2/h2_task.c
modules/http2/h2_util.c
modules/http2/mod_proxy_http2.c

diff --git a/CHANGES b/CHANGES
index e05d90d8904053b953b1bece083731a40e16d587..4f6d5df97d8ce856d92dc4c130abbe51ae3d94ff 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,13 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: merge of some 2.4.x adaptations regarding filters on slave
+     connections. Small fixes in bucket beams when forwarding file buckets.
+     Output handling on the master connection uses fewer FLUSH buckets and
+     passes data on automatically once more than half of H2StreamMaxMemSize
+     bytes have accumulated. Workaround for http: when forwarding partial
+     file buckets, to keep the output filter from closing these too early.
+
   *) mod_http2: elimination of fixed master connection buffer for TLS 
      connections. New scratch bucket handling optimized for TLS write sizes. 
      File bucket data read directly into scratch buffers, avoiding one
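
The pass threshold described in the CHANGES entry above replaces the old fixed WRITE_BUFFER_SIZE check (see the h2_conn_io.c hunks below): buffered output is now handed to the core output filters once more than half of the configured H2StreamMaxMemSize has accumulated, and a FLUSH bucket is only appended on explicit flushes. The following is a toy C model of that buffering policy only, independent of APR and mod_http2; conn_out, STREAM_MAX_MEM and PASS_THRESHOLD are illustrative names, not module identifiers.

#include <stdio.h>
#include <string.h>

/* Toy model of the new pass threshold, not mod_http2 code: buffered
 * output is only handed to the next layer once more than half of a
 * configured maximum (H2StreamMaxMemSize in the module) has
 * accumulated, instead of flushing after every append. */
#define STREAM_MAX_MEM  (64 * 1024)           /* hypothetical config value */
#define PASS_THRESHOLD  (STREAM_MAX_MEM / 2)  /* mirrors "cfg / 2" above   */

typedef struct {
    char   buf[STREAM_MAX_MEM];
    size_t len;
    size_t bytes_written;
} conn_out;

/* stand-in for ap_pass_brigade(): count the bytes and drop them */
static void pass_output(conn_out *io)
{
    if (io->len == 0)
        return;
    io->bytes_written += io->len;
    printf("passed %zu bytes downstream\n", io->len);
    io->len = 0;
}

/* append data; pass it on only once the threshold is exceeded
 * (assumes n <= sizeof(io->buf), enough for this toy model) */
static void conn_out_write(conn_out *io, const char *data, size_t n)
{
    if (io->len + n > sizeof(io->buf))
        pass_output(io);
    memcpy(io->buf + io->len, data, n);
    io->len += n;
    if (io->len >= PASS_THRESHOLD)
        pass_output(io);
}

int main(void)
{
    conn_out io = { .len = 0, .bytes_written = 0 };
    char chunk[8192] = { 0 };
    int i;

    for (i = 0; i < 10; ++i)
        conn_out_write(&io, chunk, sizeof(chunk));
    pass_output(&io);   /* explicit flush at the end of the "session" */
    printf("total: %zu bytes\n", io.bytes_written);
    return 0;
}

With 8 KB appends this passes data roughly every four writes instead of on every write, which is the intended effect on the master connection.
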
diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c
index f540b61ee9b3fcbe1d5dba832d8d46f8986b8c6f..65f9906a10bace775d6a073cd4ce4c2590789f3e 100644 (file)
@@ -204,8 +204,12 @@ apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
 
 static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
 {
-    if (beam->m_enter) {
-        return beam->m_enter(beam->m_ctx, pbl);
+    h2_beam_mutex_enter *enter = beam->m_enter;
+    if (enter) {
+        void *ctx = beam->m_ctx;
+        if (ctx) {
+            return enter(ctx, pbl);
+        }
     }
     pbl->mutex = NULL;
     pbl->leave = NULL;
@@ -787,6 +791,10 @@ transfer:
 #endif
                 remain -= bred->length;
                 ++transferred;
+                APR_BUCKET_REMOVE(bred);
+                H2_BLIST_INSERT_TAIL(&beam->hold, bred);
+                ++transferred;
+                continue;
             }
             else {
                 /* create a "green" standin bucket. we took care about the
diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c
index c1120740bfdf3085893a5668ebb28cfe249edd19..fb679ad3decac0c2400572840bd7727b56134330 100644 (file)
@@ -45,7 +45,6 @@
  * which seems to create less TCP packets overall
  */
 #define WRITE_SIZE_MAX        (TLS_DATA_MAX - 100) 
-#define WRITE_BUFFER_SIZE     (5*WRITE_SIZE_MAX)
 
 
 static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level, 
@@ -133,6 +132,7 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
     io->output        = apr_brigade_create(c->pool, c->bucket_alloc);
     io->is_tls        = h2_h2_is_tls(c);
     io->buffer_output = io->is_tls;
+    io->pass_threshold = h2_config_geti64(cfg, H2_CONF_STREAM_MAX_MEM) / 2;
     
     if (io->is_tls) {
         /* This is what we start with, 
@@ -247,44 +247,6 @@ static apr_status_t read_to_scratch(h2_conn_io *io, apr_bucket *b)
     return status;
 }
 
-int h2_conn_io_is_buffered(h2_conn_io *io)
-{
-    return io->buffer_output;
-}
-
-typedef struct {
-    conn_rec *c;
-    h2_conn_io *io;
-} pass_out_ctx;
-
-static apr_status_t pass_out(apr_bucket_brigade *bb, void *ctx) 
-{
-    pass_out_ctx *pctx = ctx;
-    conn_rec *c = pctx->c;
-    apr_status_t status;
-    apr_off_t bblen;
-    
-    if (APR_BRIGADE_EMPTY(bb)) {
-        return APR_SUCCESS;
-    }
-    
-    ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, NULL);
-    apr_brigade_length(bb, 0, &bblen);
-    h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", bb);
-    status = ap_pass_brigade(c->output_filters, bb);
-    if (status == APR_SUCCESS && pctx->io) {
-        pctx->io->bytes_written += (apr_size_t)bblen;
-        pctx->io->last_write = apr_time_now();
-    }
-    if (status != APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044)
-                      "h2_conn_io(%ld): pass_out brigade %ld bytes",
-                      c->id, (long)bblen);
-    }
-    apr_brigade_cleanup(bb);
-    return status;
-}
-
 static void check_write_size(h2_conn_io *io) 
 {
     if (io->write_size > WRITE_SIZE_INITIAL 
@@ -307,53 +269,58 @@ static void check_write_size(h2_conn_io *io)
     }
 }
 
-static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
+static apr_status_t pass_output(h2_conn_io *io, int flush, int eoc)
 {
-    pass_out_ctx ctx;
+    conn_rec *c = io->c;
     apr_bucket *b;
+    apr_off_t bblen;
+    apr_status_t status;
     
     append_scratch(io);
-    if (APR_BRIGADE_EMPTY(io->output)) {
-        return APR_SUCCESS;
-    }
-    
     if (flush) {
-        b = apr_bucket_flush_create(io->c->bucket_alloc);
+        b = apr_bucket_flush_create(c->bucket_alloc);
         APR_BRIGADE_INSERT_TAIL(io->output, b);
     }
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush");
-    ctx.c = io->c;
-    ctx.io = eoc? NULL : io;
+    if (APR_BRIGADE_EMPTY(io->output)) {
+        return APR_SUCCESS;
+    }
+    
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, c, "h2_conn_io: pass_output");
+    ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, NULL);
+    apr_brigade_length(io->output, 0, &bblen);
     
-    return pass_out(io->output, &ctx);
-    /* no more access after this, as we might have flushed an EOC bucket
+    h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", io->output);
+    status = ap_pass_brigade(c->output_filters, io->output);
+
+    /* careful with access after this, as we might have flushed an EOC bucket
      * that de-allocated us all. */
+    if (!eoc) {
+        apr_brigade_cleanup(io->output);
+        if (status == APR_SUCCESS) {
+            io->bytes_written += (apr_size_t)bblen;
+            io->last_write = apr_time_now();
+        }
+    }
+    
+    if (status != APR_SUCCESS) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044)
+                      "h2_conn_io(%ld): pass_out brigade %ld bytes",
+                      c->id, (long)bblen);
+    }
+    return status;
 }
 
 apr_status_t h2_conn_io_flush(h2_conn_io *io)
 {
-    return h2_conn_io_flush_int(io, 1, 0);
-}
-
-apr_status_t h2_conn_io_consider_pass(h2_conn_io *io)
-{
-    apr_off_t len = 0;
-    
-    if (!APR_BRIGADE_EMPTY(io->output)) {
-        len = h2_brigade_mem_size(io->output);
-        if (len >= WRITE_BUFFER_SIZE) {
-            return h2_conn_io_flush_int(io, 1, 0);
-        }
-    }
-    return APR_SUCCESS;
+    return pass_output(io, 1, 0);
 }
 
 apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, h2_session *session)
 {
     apr_bucket *b = h2_bucket_eoc_create(io->c->bucket_alloc, session);
     APR_BRIGADE_INSERT_TAIL(io->output, b);
-    return h2_conn_io_flush_int(io, 1, 1);
+    return pass_output(io, 1, 1);
 }
 
 apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length)
@@ -408,10 +375,6 @@ apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb)
             append_scratch(io);
             APR_BUCKET_REMOVE(b);
             APR_BRIGADE_INSERT_TAIL(io->output, b);
-            
-            if (APR_BUCKET_IS_FLUSH(b)) {
-                status = h2_conn_io_flush_int(io, 0, 0);
-            }
         }
         else if (io->buffer_output) {
             apr_size_t remain = assure_scratch_space(io);
@@ -445,8 +408,14 @@ apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb)
             APR_BRIGADE_INSERT_TAIL(io->output, b);
         }
     }
+    
     if (status == APR_SUCCESS) {
-        return h2_conn_io_consider_pass(io);
+        if (!APR_BRIGADE_EMPTY(io->output)) {
+            apr_off_t len = h2_brigade_mem_size(io->output);
+            if (len >= io->pass_threshold) {
+                return pass_output(io, 0, 0);
+            }
+        }
     }
     return status;
 }
diff --git a/modules/http2/h2_conn_io.h b/modules/http2/h2_conn_io.h
index f1d877a3f634df5f079418ff7d8cf434e1a802e2..4ccf007086e483be8f7855dc18f51c1715f3c0e6 100644 (file)
@@ -39,6 +39,8 @@ typedef struct {
     apr_int64_t bytes_written;
     
     int buffer_output;
+    apr_size_t pass_threshold;
+    
     char *scratch;
     apr_size_t ssize;
     apr_size_t slen;
@@ -47,8 +49,6 @@ typedef struct {
 apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, 
                              const struct h2_config *cfg);
 
-int h2_conn_io_is_buffered(h2_conn_io *io);
-
 /**
  * Append data to the buffered output.
  * @param buf the data to append
@@ -73,11 +73,4 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, struct h2_session *session);
  */
 apr_status_t h2_conn_io_flush(h2_conn_io *io);
 
-/**
- * Check the amount of buffered output and pass it on if enough has accumulated.
- * @param io the connection io
- * @param flush if a flush bucket should be appended to any output
- */
-apr_status_t h2_conn_io_consider_pass(h2_conn_io *io);
-
 #endif /* defined(__mod_h2__h2_conn_io__) */
diff --git a/modules/http2/h2_h2.c b/modules/http2/h2_h2.c
index bc9e261b5260f25657a46ca63e45c19ecba4485c..5329f3171a50319365307b6dcdd3b18178a46b17 100644 (file)
@@ -56,6 +56,7 @@ const char *H2_MAGIC_TOKEN = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
 /*******************************************************************************
  * The optional mod_ssl functions we need. 
  */
+static APR_OPTIONAL_FN_TYPE(ssl_engine_disable) *opt_ssl_engine_disable;
 static APR_OPTIONAL_FN_TYPE(ssl_is_https) *opt_ssl_is_https;
 static APR_OPTIONAL_FN_TYPE(ssl_var_lookup) *opt_ssl_var_lookup;
 
@@ -440,6 +441,7 @@ apr_status_t h2_h2_init(apr_pool_t *pool, server_rec *s)
 {
     (void)pool;
     ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, s, "h2_h2, child_init");
+    opt_ssl_engine_disable = APR_RETRIEVE_OPTIONAL_FN(ssl_engine_disable);
     opt_ssl_is_https = APR_RETRIEVE_OPTIONAL_FN(ssl_is_https);
     opt_ssl_var_lookup = APR_RETRIEVE_OPTIONAL_FN(ssl_var_lookup);
     
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index 9c8498e62e615a0042aab0e1a80d10fc1f5ab1b0..3ae02f4fd2ebdf56319403be47670aad4372c7cf 100644 (file)
@@ -90,6 +90,7 @@ static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
      * This allows recursive entering of the mutex from the same thread,
      * which is what we need in certain situations involving callbacks
      */
+    AP_DEBUG_ASSERT(m);
     apr_threadkey_private_get(&mutex, thread_lock);
     if (mutex == m->lock) {
         *pacquired = 0;
@@ -342,6 +343,8 @@ static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
     int reuse_slave = 0;
     apr_status_t status;
     
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
+                  "h2_task(%s): destroy", task->id);
     /* cleanup any buffered input */
     status = h2_task_shutdown(task, 0);
     if (status != APR_SUCCESS){
@@ -393,6 +396,8 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
 {
     h2_task *task;
     
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
+                  "h2_stream(%ld-%d): done", m->c->id, stream->id);
     /* Situation: we are, on the master connection, done with processing
      * the stream. Either we have handled it successfully, or the stream
      * was reset by the client or the connection is gone and we are 
@@ -890,6 +895,7 @@ static h2_task *pop_task(h2_mplx *m)
         stream = h2_ihash_get(m->streams, sid);
         if (stream) {
             conn_rec *slave, **pslave;
+            int new_conn = 0;
 
             pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
             if (pslave) {
@@ -897,16 +903,19 @@ static h2_task *pop_task(h2_mplx *m)
             }
             else {
                 slave = h2_slave_create(m->c, m->pool, NULL);
-                h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
+                new_conn = 1;
             }
             
             slave->sbh = m->c->sbh;
+            slave->aborted = 0;
             task = h2_task_create(slave, stream->request, stream->input, m);
             h2_ihash_add(m->tasks, task);
             
             m->c->keepalives++;
             apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
-            
+            if (new_conn) {
+                h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
+            }
             task->worker_started = 1;
             task->started_at = apr_time_now();
             if (sid > m->max_stream_started) {
diff --git a/modules/http2/h2_private.h b/modules/http2/h2_private.h
index 39d70512b89eee21b351b893a16543950bef27d4..b68613692da8976bac1a655e87a30c423abc10fc 100644 (file)
 
 #include <nghttp2/nghttp2.h>
 
-#ifdef IS_MOD_PROXY_HTTP2
-extern module AP_MODULE_DECLARE_DATA proxy_http2_module;
-APLOG_USE_MODULE(proxy_http2);
-#else
 extern module AP_MODULE_DECLARE_DATA http2_module;
+
 APLOG_USE_MODULE(http2);
-#endif 
 
 #endif
diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c
index aa6260749699277a48e0941ab61a45526034568e..79b3fbc563a578f63a05497224dc5c10eccdc224 100644 (file)
@@ -607,7 +607,6 @@ static int on_send_data_cb(nghttp2_session *ngh2,
     apr_brigade_cleanup(session->bbtmp);
     if (status == APR_SUCCESS) {
         stream->data_frames_sent++;
-        h2_conn_io_consider_pass(&session->io);
         return 0;
     }
     else {
@@ -1862,9 +1861,6 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
              * CPU cycles. Ideally, we'd like to do a blocking read, but that
              * is not possible if we have scheduled tasks and wait
              * for them to produce something. */
-            if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
-                dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
-            }
             if (!session->open_streams) {
                 if (!is_accepting_streams(session)) {
                     /* We are no longer accepting new streams and have
@@ -1889,6 +1885,10 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
                  * new output data from task processing, 
                  * switch to blocking reads. We are probably waiting on
                  * window updates. */
+                if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
+                    dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+                    return;
+                }
                 transit(session, "no io", H2_SESSION_ST_IDLE);
                 session->idle_until = apr_time_now() + session->s->timeout;
                 session->keep_sync_until = session->idle_until;
@@ -2221,6 +2221,7 @@ apr_status_t h2_session_process(h2_session *session, int async)
                     session->start_wait = apr_time_now();
                     if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
                         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+                        break;
                     }
                 }
                 else if ((apr_time_now() - session->start_wait) >= session->s->timeout) {
@@ -2246,11 +2247,15 @@ apr_status_t h2_session_process(h2_session *session, int async)
                     session->wait_us = 0;
                     dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                 }
-                else if (status == APR_TIMEUP) {
+                else if (APR_STATUS_IS_TIMEUP(status)) {
                     /* go back to checking all inputs again */
                     transit(session, "wait cycle", session->local.accepting? 
                             H2_SESSION_ST_BUSY : H2_SESSION_ST_LOCAL_SHUTDOWN);
                 }
+                else if (APR_STATUS_IS_ECONNRESET(status) 
+                         || APR_STATUS_IS_ECONNABORTED(status)) {
+                    dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+                }
                 else {
                     ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
                                   "h2_session(%ld): waiting on conditional",
@@ -2286,7 +2291,7 @@ out:
     ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
                   "h2_session(%ld): [%s] process returns", 
                   session->id, state_name(session->state));
-
+    
     if ((session->state != H2_SESSION_ST_DONE)
         && (APR_STATUS_IS_EOF(status)
             || APR_STATUS_IS_ECONNRESET(status) 
diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c
index b445da768b3eab148d6f5ec37f1490d77cd5326d..dcc25da42471802e75934659150c7dcfe4c61a16 100644 (file)
@@ -150,6 +150,30 @@ static int output_open(h2_stream *stream)
     }
 }
 
+static apr_status_t stream_pool_cleanup(void *ctx)
+{
+    h2_stream *stream = ctx;
+    apr_status_t status;
+    
+    if (stream->input) {
+        h2_beam_destroy(stream->input);
+        stream->input = NULL;
+    }
+    if (stream->files) {
+        apr_file_t *file;
+        int i;
+        for (i = 0; i < stream->files->nelts; ++i) {
+            file = APR_ARRAY_IDX(stream->files, i, apr_file_t*);
+            status = apr_file_close(file);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, stream->session->c, 
+                          "h2_stream(%ld-%d): destroy, closed file %d", 
+                          stream->session->id, stream->id, i);
+        }
+        stream->files = NULL;
+    }
+    return APR_SUCCESS;
+}
+
 h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
                           int initiated_on, const h2_request *creq)
 {
@@ -174,6 +198,8 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
     }
     stream->request = req; 
     
+    apr_pool_cleanup_register(pool, stream, stream_pool_cleanup, 
+                              apr_pool_cleanup_null);
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03082)
                   "h2_stream(%ld-%d): opened", session->id, stream->id);
     return stream;
@@ -203,13 +229,9 @@ void h2_stream_cleanup(h2_stream *stream)
 void h2_stream_destroy(h2_stream *stream)
 {
     AP_DEBUG_ASSERT(stream);
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c, 
                   "h2_stream(%ld-%d): destroy", 
                   stream->session->id, stream->id);
-    if (stream->input) {
-        h2_beam_destroy(stream->input);
-        stream->input = NULL;
-    }
     if (stream->pool) {
         apr_pool_destroy(stream->pool);
     }
@@ -421,11 +443,43 @@ int h2_stream_is_suspended(const h2_stream *stream)
 
 static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount)
 {
+    conn_rec *c = stream->session->c;
+    apr_bucket *b;
+    apr_status_t status;
+    
     if (!stream->output) {
         return APR_EOF;
     }
-    return h2_beam_receive(stream->output, stream->buffer, 
-                           APR_NONBLOCK_READ, amount);
+    status = h2_beam_receive(stream->output, stream->buffer, 
+                             APR_NONBLOCK_READ, amount);
+    /* The buckets we receive use the stream->buffer pool for their
+     * lifetime, which is exactly what we want since this is stream->pool.
+     *
+     * However: when we send these buckets down the core output filters, a
+     * filter might decide to set them aside into a pool of its own. And it
+     * might decide, after having sent the buckets, to clear that pool.
+     *
+     * This is problematic for file buckets because clearing the pool closes
+     * the contained file. Any split-off buckets we send afterwards will then
+     * fail with APR_EBADF.
+     */
+    for (b = APR_BRIGADE_FIRST(stream->buffer);
+         b != APR_BRIGADE_SENTINEL(stream->buffer);
+         b = APR_BUCKET_NEXT(b)) {
+        if (APR_BUCKET_IS_FILE(b)) {
+            apr_bucket_file *f = (apr_bucket_file *)b->data;
+            apr_pool_t *fpool = apr_file_pool_get(f->fd);
+            if (fpool != c->pool) {
+                apr_bucket_setaside(b, c->pool);
+                if (!stream->files) {
+                    stream->files = apr_array_make(stream->pool, 
+                                                   5, sizeof(apr_file_t*));
+                }
+                APR_ARRAY_PUSH(stream->files, apr_file_t*) = f->fd;
+            }
+        }
+    }
+    return status;
 }
 
 apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
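
The comment added to fill_buffer() above describes the failure mode: the core output filter may set file buckets aside into a pool of its own and later clear that pool, which closes the apr_file_t while split-off buckets of the same file are still pending, producing APR_EBADF. The hunks above address this by setting such buckets aside into the master connection pool and collecting the files in stream->files, which stream_pool_cleanup() closes when the stream pool is destroyed. Below is a minimal APR-only sketch of that idea; stream_sketch, rebind_file_buckets and stream_files_cleanup are illustrative names, not mod_http2 functions.

#include <apr_buckets.h>
#include <apr_file_io.h>
#include <apr_tables.h>

/* Sketch of the workaround: before buckets leave the stream, rebind any
 * file bucket whose apr_file_t does not already live in the long-lived
 * connection pool, and remember the fd so the stream can close it in its
 * own pool cleanup instead of a downstream filter doing it prematurely. */
typedef struct {
    apr_pool_t         *stream_pool;  /* short-lived, per stream            */
    apr_pool_t         *conn_pool;    /* long-lived, master connection pool */
    apr_array_header_t *files;        /* apr_file_t* collected during I/O   */
} stream_sketch;

/* mirrors the loop added to fill_buffer() above */
static apr_status_t rebind_file_buckets(stream_sketch *s, apr_bucket_brigade *bb)
{
    apr_bucket *b;

    for (b = APR_BRIGADE_FIRST(bb);
         b != APR_BRIGADE_SENTINEL(bb);
         b = APR_BUCKET_NEXT(b)) {
        if (APR_BUCKET_IS_FILE(b)) {
            apr_bucket_file *f = (apr_bucket_file *)b->data;

            if (apr_file_pool_get(f->fd) != s->conn_pool) {
                apr_status_t rv = apr_bucket_setaside(b, s->conn_pool);
                if (rv != APR_SUCCESS) {
                    return rv;
                }
                if (!s->files) {
                    s->files = apr_array_make(s->stream_pool, 5,
                                              sizeof(apr_file_t*));
                }
                APR_ARRAY_PUSH(s->files, apr_file_t*) = f->fd;
            }
        }
    }
    return APR_SUCCESS;
}

/* registered on the stream pool via apr_pool_cleanup_register(), so the
 * files are closed when the stream goes away, like stream_pool_cleanup()
 * in the hunk above */
static apr_status_t stream_files_cleanup(void *ctx)
{
    stream_sketch *s = ctx;
    int i;

    if (s->files) {
        for (i = 0; i < s->files->nelts; ++i) {
            apr_file_close(APR_ARRAY_IDX(s->files, i, apr_file_t*));
        }
        s->files = NULL;
    }
    return APR_SUCCESS;
}

The key point of the design is that the bucket's file must belong to a pool that outlives every filter that may still hold a split of that bucket.
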
diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h
index 66dca0dbb482359a4c88331d459894f1ccada716..33f28f6eab710840156c8ea5a9228d41fb09c93b 100644 (file)
@@ -54,6 +54,7 @@ struct h2_stream {
     struct h2_bucket_beam *output;
     apr_bucket_brigade *buffer;
     apr_bucket_brigade *tmp;
+    apr_array_header_t *files;  /* apr_file_t* we collected during I/O */
 
     int rst_error;              /* stream error for RST_STREAM */
     unsigned int aborted   : 1; /* was aborted */
@@ -62,7 +63,6 @@ struct h2_stream {
     unsigned int submitted : 1; /* response HEADER has been sent */
     
     apr_off_t input_remaining;  /* remaining bytes on input as advertised via content-length */
-
     apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */
 };
 
diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c
index 26f1bf5ba476563812d86d648f6a0b83056b9fb1..92029d894d6d15515664f11fa3bf66c3eb987994 100644 (file)
@@ -416,7 +416,7 @@ static apr_status_t h2_filter_stream_input(ap_filter_t* filter,
                                            apr_read_type_e block,
                                            apr_off_t readbytes)
 {
-    h2_task *task = filter->ctx;
+    h2_task *task = h2_ctx_cget_task(filter->c);
     AP_DEBUG_ASSERT(task);
     return input_read(task, filter, brigade, mode, block, readbytes);
 }
@@ -424,20 +424,20 @@ static apr_status_t h2_filter_stream_input(ap_filter_t* filter,
 static apr_status_t h2_filter_stream_output(ap_filter_t* filter,
                                             apr_bucket_brigade* brigade)
 {
-    h2_task *task = filter->ctx;
+    h2_task *task = h2_ctx_cget_task(filter->c);
     AP_DEBUG_ASSERT(task);
     return output_write(task, filter, brigade);
 }
 
-static apr_status_t h2_filter_read_response(ap_filter_t* f,
+static apr_status_t h2_filter_read_response(ap_filter_t* filter,
                                             apr_bucket_brigade* bb)
 {
-    h2_task *task = f->ctx;
+    h2_task *task = h2_ctx_cget_task(filter->c);
     AP_DEBUG_ASSERT(task);
     if (!task->output.from_h1) {
         return APR_ECONNABORTED;
     }
-    return h2_from_h1_read_response(task->output.from_h1, f, bb);
+    return h2_from_h1_read_response(task->output.from_h1, filter, bb);
 }
 
 /*******************************************************************************
@@ -485,6 +485,9 @@ void h2_task_rst(h2_task *task, int error)
     if (task->output.beam) {
         h2_beam_abort(task->output.beam);
     }
+    if (task->c) {
+        task->c->aborted = 1;
+    }
 }
 
 apr_status_t h2_task_shutdown(h2_task *task, int block)
@@ -507,6 +510,8 @@ apr_status_t h2_task_shutdown(h2_task *task, int block)
 /*******************************************************************************
  * Register various hooks
  */
+static const char *const mod_ssl[]        = { "mod_ssl.c", NULL};
+static int h2_task_pre_conn(conn_rec* c, void *arg);
 static int h2_task_process_conn(conn_rec* c);
 
 APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_in) *h2_task_logio_add_bytes_in;
@@ -514,6 +519,12 @@ APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out;
 
 void h2_task_register_hooks(void)
 {
+    /* This hook runs on new connections before mod_ssl has a say.
+     * Its purpose is to prevent mod_ssl from touching our pseudo-connections
+     * for streams.
+     */
+    ap_hook_pre_connection(h2_task_pre_conn,
+                           NULL, mod_ssl, APR_HOOK_FIRST);
     /* When the connection processing actually starts, we might 
      * take over, if the connection is for a task.
      */
@@ -541,6 +552,28 @@ apr_status_t h2_task_init(apr_pool_t *pool, server_rec *s)
     return APR_SUCCESS;
 }
 
+static int h2_task_pre_conn(conn_rec* c, void *arg)
+{
+    h2_ctx *ctx;
+    
+    if (!c->master) {
+        return OK;
+    }
+    
+    ctx = h2_ctx_get(c, 0);
+    (void)arg;
+    if (h2_ctx_is_task(ctx)) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
+                      "h2_h2, pre_connection, found stream task");
+        
+        /* Add our own, network level in- and output filters.
+         */
+        ap_add_input_filter("H2_TO_H1", NULL, NULL, c);
+        ap_add_output_filter("H1_TO_H2", NULL, NULL, c);
+    }
+    return OK;
+}
+
 h2_task *h2_task_create(conn_rec *c, const h2_request *req, 
                         h2_bucket_beam *input, h2_mplx *mplx)
 {
@@ -570,17 +603,11 @@ h2_task *h2_task_create(conn_rec *c, const h2_request *req,
     apr_thread_cond_create(&task->cond, pool);
 
     h2_ctx_create_for(c, task);
-    /* Add our own, network level in- and output filters. */
-    ap_add_input_filter("H2_TO_H1", task, NULL, c);
-    ap_add_output_filter("H1_TO_H2", task, NULL, c);
-
     return task;
 }
 
 void h2_task_destroy(h2_task *task)
 {
-    ap_remove_input_filter_byhandle(task->c->input_filters, "H2_TO_H1");
-    ap_remove_output_filter_byhandle(task->c->output_filters, "H1_TO_H2");
     if (task->output.beam) {
         h2_beam_destroy(task->output.beam);
         task->output.beam = NULL;
diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c
index 206bf4bd2b3c37d551ae532be0195b5442438352..e6fe45965f54faed457e097731b8c554cac09e3f 100644 (file)
@@ -1052,6 +1052,8 @@ apr_status_t h2_append_brigade(apr_bucket_brigade *to,
         if (APR_BUCKET_IS_METADATA(e)) {
             if (APR_BUCKET_IS_EOS(e)) {
                 *peos = 1;
+                apr_bucket_delete(e);
+                continue;
             }
         }
         else {        
diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c
index 13bb85724bd75ed49f05a9f54dfa60f7c30c10b3..381b31a881961190ddaaadbee27ff43215a2722a 100644 (file)
@@ -125,12 +125,12 @@ static int proxy_http2_canon(request_rec *r, char *url)
     apr_port_t port, def_port;
 
     /* ap_port_of_scheme() */
-    if (ap_casecmpstrn(url, "h2c:", 4) == 0) {
+    if (h2_casecmpstrn(url, "h2c:", 4) == 0) {
         url += 4;
         scheme = "h2c";
         http_scheme = "http";
     }
-    else if (ap_casecmpstrn(url, "h2:", 3) == 0) {
+    else if (h2_casecmpstrn(url, "h2:", 3) == 0) {
         url += 3;
         scheme = "h2";
         http_scheme = "https";