Merge of r1766857,1767128,1767180,1767181,1767553 from trunk
author Stefan Eissing <icing@apache.org>
Tue, 1 Nov 2016 20:24:52 +0000 (20:24 +0000)
committer Stefan Eissing <icing@apache.org>
Tue, 1 Nov 2016 20:24:52 +0000 (20:24 +0000)
mod_http2/mod_proxy_http2 improvements as in CHANGES

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1767563 13f79535-47bb-0310-9956-ffa450edef68

22 files changed:
CHANGES
modules/http2/NWGNUmod_http2
modules/http2/h2.h
modules/http2/h2_conn.c
modules/http2/h2_conn.h
modules/http2/h2_from_h1.c
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_ngn_shed.c
modules/http2/h2_proxy_session.c
modules/http2/h2_proxy_session.h
modules/http2/h2_request.c
modules/http2/h2_session.c
modules/http2/h2_session.h
modules/http2/h2_task.c
modules/http2/h2_task.h
modules/http2/h2_util.c
modules/http2/h2_util.h
modules/http2/h2_version.h
modules/http2/mod_http2.c
modules/http2/mod_http2.h
modules/http2/mod_proxy_http2.c

diff --git a/CHANGES b/CHANGES
index 84151b4d03980a6fb80a77804ac45b87130c464f..9ee96ba26b43b0564724cfccd22ddda6efe87b8e 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,18 @@
 
 Changes with Apache 2.4.24
 
+  *) mod_http2: unannounced and multiple interim responses (status code < 200)
+     are parsed and forwarded to client until a final response arrives.
+     [Stefan Eissing]
+  
+  *) mod_proxy_http2: improved robustness when main connection is closed early
+     by resetting all ongoing streams against the backend.
+     [Stefan Eissing]
+  
+  *) mod_http2: allocators from slave connections are released earlier, resulting
+     in less overall memory use on busy, long lived connections.
+     [Stefan Eissing]
+     
   *) mod_remoteip: Pick up where we left off during a subrequest rather
      than running with the modified XFF but original TCP address.
      PR 49839/PR 60251
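
For illustration only (not part of this commit): the first CHANGES entry above concerns interim (1xx) responses. A minimal, hypothetical handler sketch, assuming the stock ap_send_interim_response() API, shows the kind of exchange mod_http2 now parses and forwards to the client until a final response (status >= 200) arrives:

    /* Hypothetical sketch, not from this patch. */
    #include <string.h>
    #include "httpd.h"
    #include "http_protocol.h"

    static int interim_demo_handler(request_rec *r)
    {
        if (!r->handler || strcmp(r->handler, "interim-demo") != 0) {
            return DECLINED;
        }
        r->status = HTTP_CONTINUE;        /* any 1xx status */
        ap_send_interim_response(r, 1);   /* emit the interim response */

        r->status = HTTP_OK;              /* final response follows */
        ap_set_content_type(r, "text/plain");
        ap_rputs("done\n", r);
        return OK;
    }
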
diff --git a/modules/http2/NWGNUmod_http2 b/modules/http2/NWGNUmod_http2
index b6880d7a708523f7492f575169415559dfcd69be..10974a7ebc763cc9b8dca04a2e31f9647dc5f1ae 100644 (file)
--- a/modules/http2/NWGNUmod_http2
+++ b/modules/http2/NWGNUmod_http2
@@ -355,18 +355,6 @@ $(OBJDIR)/mod_http2.imp : NWGNUmod_http2
        @echo $(DL)GEN  $@$(DL)
        @echo $(DL) (HTTP2)$(DL) > $@
        @echo $(DL) http2_module,$(DL) >> $@
-       @echo $(DL) h2_ihash_add,$(DL) >> $@
-       @echo $(DL) h2_ihash_clear,$(DL) >> $@
-       @echo $(DL) h2_ihash_count,$(DL) >> $@
-       @echo $(DL) h2_ihash_create,$(DL) >> $@
-       @echo $(DL) h2_ihash_empty,$(DL) >> $@
-       @echo $(DL) h2_ihash_iter,$(DL) >> $@
-       @echo $(DL) h2_ihash_remove,$(DL) >> $@
-       @echo $(DL) h2_iq_add,$(DL) >> $@
-       @echo $(DL) h2_iq_create,$(DL) >> $@
-       @echo $(DL) h2_iq_remove,$(DL) >> $@
-       @echo $(DL) h2_log2,$(DL) >> $@
-       @echo $(DL) h2_headers_add_h1,$(DL) >> $@
        @echo $(DL) nghttp2_is_fatal,$(DL) >> $@
        @echo $(DL) nghttp2_option_del,$(DL) >> $@
        @echo $(DL) nghttp2_option_new,$(DL) >> $@
diff --git a/modules/http2/h2.h b/modules/http2/h2.h
index 62fec3362697e0b99ffd8fe11ef302c98340d1e6..59719ad8c7f02f1c6e094af89e93d436fc5a76df 100644 (file)
--- a/modules/http2/h2.h
+++ b/modules/http2/h2.h
@@ -122,15 +122,11 @@ struct h2_request {
     const char *scheme;
     const char *authority;
     const char *path;
-    
     apr_table_t *headers;
 
     apr_time_t request_time;
-    
     unsigned int chunked : 1;   /* iff requst body needs to be forwarded as chunked */
     unsigned int serialize : 1; /* iff this request is written in HTTP/1.1 serialization */
-    unsigned int expect_100 : 1; /* iff we need a 100-continue response */
-    unsigned int expect_failed : 1; /* iff we are unable to fullfill expects */
 };
 
 typedef struct h2_headers h2_headers;
diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c
index a0915c3eb44ded54d3d6dcf4035543c25daa91e3..6f3a8cfe8ec63215abdf2edf71a98f418afca0bc 100644 (file)
--- a/modules/http2/h2_conn.c
+++ b/modules/http2/h2_conn.c
@@ -241,9 +241,9 @@ apr_status_t h2_conn_pre_close(struct h2_ctx *ctx, conn_rec *c)
     return status;
 }
 
-conn_rec *h2_slave_create(conn_rec *master, int slave_id, 
-                          apr_pool_t *parent, apr_allocator_t *allocator)
+conn_rec *h2_slave_create(conn_rec *master, int slave_id, apr_pool_t *parent)
 {
+    apr_allocator_t *allocator;
     apr_pool_t *pool;
     conn_rec *c;
     void *cfg;
@@ -257,9 +257,7 @@ conn_rec *h2_slave_create(conn_rec *master, int slave_id,
      * independant of its parent pool in the sense that it can work in
      * another thread.
      */
-    if (!allocator) {
-        apr_allocator_create(&allocator);
-    }
+    apr_allocator_create(&allocator);
     apr_pool_create_ex(&pool, parent, NULL, allocator);
     apr_pool_tag(pool, "h2_slave_conn");
     apr_allocator_owner_set(allocator, pool);
@@ -311,21 +309,11 @@ conn_rec *h2_slave_create(conn_rec *master, int slave_id,
     return c;
 }
 
-void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator)
+void h2_slave_destroy(conn_rec *slave)
 {
-    apr_pool_t *parent;
-    apr_allocator_t *allocator = apr_pool_allocator_get(slave->pool);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave,
                   "h2_slave_conn(%ld): destroy (task=%s)", slave->id,
                   apr_table_get(slave->notes, H2_TASK_ID_NOTE));
-    /* Attache the allocator to the parent pool and return it for
-     * reuse, otherwise the own is still the slave pool and it will
-     * get destroyed with it. */
-    parent = apr_pool_parent_get(slave->pool);
-    if (pallocator && parent) {
-        apr_allocator_owner_set(allocator, parent);
-        *pallocator = allocator;
-    }
     apr_pool_destroy(slave->pool);
 }
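
The change above is what the third CHANGES entry describes: a slave connection now always creates its own allocator, owned by the slave pool, so destroying the pool releases the allocator (and its retained memory) immediately instead of handing it back for reuse. A small stand-alone sketch of that APR ownership idiom, for illustration only:

    #include "apr_pools.h"

    /* Sketch of the pool/allocator pattern used in h2_slave_create:
     * the pool owns its allocator, so apr_pool_destroy() frees the
     * allocator together with the pool. */
    static apr_pool_t *create_owned_pool(apr_pool_t *parent)
    {
        apr_allocator_t *allocator;
        apr_pool_t *pool;

        apr_allocator_create(&allocator);
        apr_pool_create_ex(&pool, parent, NULL, allocator);
        apr_pool_tag(pool, "owned_pool_demo");
        apr_allocator_owner_set(allocator, pool);
        return pool;
    }

    /* later: apr_pool_destroy(pool) releases pool and allocator in one step */
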
 
diff --git a/modules/http2/h2_conn.h b/modules/http2/h2_conn.h
index 13b20539b1a9f5de2390e9360dc412584f7b2462..79948644ae8c1738d93622cf4d40017fb6335590 100644 (file)
--- a/modules/http2/h2_conn.h
+++ b/modules/http2/h2_conn.h
@@ -66,9 +66,8 @@ typedef enum {
 h2_mpm_type_t h2_conn_mpm_type(void);
 
 
-conn_rec *h2_slave_create(conn_rec *master, int slave_id, 
-                          apr_pool_t *parent, apr_allocator_t *allocator);
-void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator);
+conn_rec *h2_slave_create(conn_rec *master, int slave_id, apr_pool_t *parent);
+void h2_slave_destroy(conn_rec *slave);
 
 apr_status_t h2_slave_run_pre_connection(conn_rec *slave, apr_socket_t *csd);
 void h2_slave_run_connection(conn_rec *slave);
diff --git a/modules/http2/h2_from_h1.c b/modules/http2/h2_from_h1.c
index 2b4f79ac1404b1fb61deb72844f06c62764b8e3b..cdb444650b718ef8444b5829778a5eeba4b3f45a 100644 (file)
--- a/modules/http2/h2_from_h1.c
+++ b/modules/http2/h2_from_h1.c
@@ -424,8 +424,13 @@ static apr_status_t pass_response(h2_task *task, ap_filter_t *f,
     status = ap_pass_brigade(f->next, parser->tmp);
     apr_brigade_cleanup(parser->tmp);
     
-    parser->state = H2_RP_DONE;
-    task->output.parse_response = 0;
+    /* reset parser for possible next response */
+    parser->state = H2_RP_STATUS_LINE;
+    apr_array_clear(parser->hlines);
+
+    if (response->status >= 200) {
+        task->output.sent_response = 1;
+    }
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, 
                   APLOGNO(03197) "h2_task(%s): passed response %d", 
                   task->id, response->status);
@@ -486,6 +491,7 @@ apr_status_t h2_from_h1_parse_response(h2_task *task, ap_filter_t *f,
                 }
                 else if (line[0] == '\0') {
                     /* end of headers, pass response onward */
+                    
                     return pass_response(task, f, parser);
                 }
                 else {
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index 0cff7b60fa1613153be833273d20c92bd82271de..d5635dd11233022fdca222b143ce5264444c1f6b 100644 (file)
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -226,11 +226,17 @@ static void purge_streams(h2_mplx *m)
 
 static void h2_mplx_destroy(h2_mplx *m)
 {
+    conn_rec **pslave;
     ap_assert(m);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                   "h2_mplx(%ld): destroy, tasks=%d", 
                   m->id, (int)h2_ihash_count(m->tasks));
     check_tx_free(m);
+    
+    while (m->spare_slaves->nelts > 0) {
+        pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
+        h2_slave_destroy(*pslave);
+    }
     if (m->pool) {
         apr_pool_destroy(m->pool);
     }
@@ -295,14 +301,14 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
         m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->q = h2_iq_create(m->pool, m->max_streams);
-        m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+        m->readyq = h2_iq_create(m->pool, m->max_streams);
         m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
+        m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
 
         m->stream_timeout = stream_timeout;
         m->workers = workers;
         m->workers_max = workers->max_workers;
-        m->workers_def_limit = 4;
-        m->workers_limit = m->workers_def_limit;
+        m->workers_limit = 6; /* the original h1 max parallel connections */
         m->last_limit_change = m->last_idle_block = apr_time_now();
         m->limit_change_interval = apr_time_from_msec(200);
         
@@ -363,24 +369,20 @@ static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
         }
     }
     
-    if (task->output.beam) {
-        h2_beam_on_produced(task->output.beam, NULL, NULL);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, 
-                      APLOGNO(03385) "h2_task(%s): destroy "
-                      "output beam empty=%d, holds proxies=%d", 
-                      task->id,
-                      h2_beam_empty(task->output.beam),
-                      h2_beam_holds_proxies(task->output.beam));
-    }
+    h2_beam_on_produced(task->output.beam, NULL, NULL);
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, 
+                  APLOGNO(03385) "h2_task(%s): destroy "
+                  "output beam empty=%d, holds proxies=%d", 
+                  task->id,
+                  h2_beam_empty(task->output.beam),
+                  h2_beam_holds_proxies(task->output.beam));
     
     slave = task->c;
     reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
                    && !task->rst_error);
     
     h2_ihash_remove(m->tasks, task->stream_id);
-    if (m->redo_tasks) {
-        h2_ihash_remove(m->redo_tasks, task->stream_id);
-    }
+    h2_ihash_remove(m->redo_tasks, task->stream_id);
     h2_task_destroy(task);
 
     if (slave) {
@@ -389,7 +391,7 @@ static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
         }
         else {
             slave->sbh = NULL;
-            h2_slave_destroy(slave, NULL);
+            h2_slave_destroy(slave);
         }
     }
     
@@ -432,20 +434,16 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
      * stream destruction until the task is done. 
      */
     h2_iq_remove(m->q, stream->id);
-    h2_ihash_remove(m->sready, stream->id);
     h2_ihash_remove(m->streams, stream->id);
-    if (stream->input) {
-        m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
-        h2_beam_on_consumed(stream->input, NULL, NULL);
-        /* Let anyone blocked reading know that there is no more to come */
-        h2_beam_abort(stream->input);
-        /* Remove mutex after, so that abort still finds cond to signal */
-        h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
-    }
-    if (stream->output) {
-        m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
-    }
+    
     h2_stream_cleanup(stream);
+    m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
+    h2_beam_on_consumed(stream->input, NULL, NULL);
+    /* Let anyone blocked reading know that there is no more to come */
+    h2_beam_abort(stream->input);
+    /* Remove mutex after, so that abort still finds cond to signal */
+    h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
+    m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
 
     task = h2_ihash_get(m->tasks, stream->id);
     if (task) {
@@ -459,7 +457,7 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
         }
         else {
             /* already finished */
-            task_destroy(m, task, 0);
+            task_destroy(m, task, 1);
         }
     }
     h2_stream_destroy(stream);
@@ -532,12 +530,8 @@ static int task_abort_connection(void *ctx, void *val)
         if (task->c) {
             task->c->aborted = 1;
         }
-        if (task->input.beam) {
-            h2_beam_abort(task->input.beam);
-        }
-        if (task->output.beam) {
-            h2_beam_abort(task->output.beam);
-        }
+        h2_beam_abort(task->input.beam);
+        h2_beam_abort(task->output.beam);
     }
     return 1;
 }
@@ -573,9 +567,8 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
         h2_iq_clear(m->q);
         purge_streams(m);
 
-        /* 3. mark all slave connections as aborted and wakeup all sleeping 
-         *    tasks. Mark all still active streams as 'done'. m->streams has to
-         *    be empty afterwards with streams either in
+        /* 3. wakeup all sleeping tasks. Mark all still active streams as 'done'. 
+         *    m->streams has to be empty afterwards with streams either in
          *    a) m->shold because a task is still active
          *    b) m->spurge because task is done, or was not started */
         h2_ihash_iter(m->tasks, task_abort_connection, m);
@@ -617,8 +610,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
         if (!h2_ihash_empty(m->tasks)) {
             /* when we are here, we lost track of the tasks still present.
              * this currently happens with mod_proxy_http2 when we shut
-             * down a h2_req_engine with tasks assigned... */ 
-            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,  APLOGNO(03056)
+             * down a h2_req_engine with tasks assigned. Since no parallel
+             * processing is going on any more, we just clean them up. */ 
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,  APLOGNO(03056)
                           "h2_mplx(%ld): 3. release_join with %d tasks",
                           m->id, (int)h2_ihash_count(m->tasks));
             h2_ihash_iter(m->tasks, task_print, m);
@@ -772,11 +766,9 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task)
 
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
                   "h2_mplx(%s): close", task->id);
-    if (task->output.beam) {
-        status = h2_beam_close(task->output.beam);
-        h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c, 
-                    APLOG_TRACE2);
-    }
+    status = h2_beam_close(task->output.beam);
+    h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c, 
+                APLOG_TRACE2);
     output_consumed_signal(m, task);
     have_out_data_for(m, stream, 0);
     return status;
@@ -792,7 +784,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
         if (m->aborted) {
             status = APR_ECONNABORTED;
         }
-        else if (!h2_ihash_empty(m->sready)) {
+        else if (!h2_iq_empty(m->readyq)) {
             status = APR_SUCCESS;
         }
         else {
@@ -815,11 +807,9 @@ static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response)
 {
     ap_assert(m);
     ap_assert(stream);
-    if (!h2_ihash_get(m->sready, stream->id)) {
-        h2_ihash_add(m->sready, stream);
-        if (m->added_output) {
-            apr_thread_cond_signal(m->added_output);
-        }
+    h2_iq_append(m->readyq, stream->id);
+    if (m->added_output) {
+        apr_thread_cond_signal(m->added_output);
     }
 }
 
@@ -856,7 +846,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
         else {
             h2_ihash_add(m->streams, stream);
             if (h2_stream_is_ready(stream)) {
-                h2_ihash_add(m->sready, stream);
+                h2_iq_append(m->readyq, stream->id);
             }
             else {
                 if (!m->need_registration) {
@@ -897,7 +887,7 @@ static h2_task *next_stream_task(h2_mplx *m)
                 slave = *pslave;
             }
             else {
-                slave = h2_slave_create(m->c, stream->id, m->pool, NULL);
+                slave = h2_slave_create(m->c, stream->id, m->pool);
                 new_conn = 1;
             }
             
@@ -919,16 +909,13 @@ static h2_task *next_stream_task(h2_mplx *m)
                 m->max_stream_started = sid;
             }
 
-            if (stream->input) {
-                h2_beam_timeout_set(stream->input, m->stream_timeout);
-                h2_beam_on_consumed(stream->input, stream_input_consumed, m);
-                h2_beam_on_file_beam(stream->input, can_beam_file, m);
-                h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
-            }
-            if (stream->output) {
-                h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
-                h2_beam_timeout_set(stream->output, m->stream_timeout);
-            }
+            h2_beam_timeout_set(stream->input, m->stream_timeout);
+            h2_beam_on_consumed(stream->input, stream_input_consumed, m);
+            h2_beam_on_file_beam(stream->input, can_beam_file, m);
+            h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
+            
+            h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
+            h2_beam_timeout_set(stream->output, m->stream_timeout);
             ++m->workers_busy;
         }
     }
@@ -977,10 +964,8 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
         
         if (ngn) {
             apr_off_t bytes = 0;
-            if (task->output.beam) {
-                h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
-                bytes += h2_beam_get_buffered(task->output.beam);
-            }
+            h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+            bytes += h2_beam_get_buffered(task->output.beam);
             if (bytes > 0) {
                 /* we need to report consumed and current buffered output
                  * to the engine. The request will be streamed out or cancelled,
@@ -991,7 +976,8 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
         }
         
         if (task->engine) {
-            if (!h2_req_engine_is_shutdown(task->engine)) {
+            if (!m->aborted && !task->c->aborted 
+                && !h2_req_engine_is_shutdown(task->engine)) {
                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
                               "h2_mplx(%ld): task(%s) has not-shutdown "
                               "engine(%s)", m->id, task->id, 
@@ -1001,7 +987,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
         }
         
         stream = h2_ihash_get(m->streams, task->stream_id);
-        if (!m->aborted && stream && m->redo_tasks
+        if (!m->aborted && stream 
             && h2_ihash_get(m->redo_tasks, task->stream_id)) {
             /* reset and schedule again */
             h2_task_redo(task);
@@ -1152,9 +1138,6 @@ static apr_status_t unschedule_slow_tasks(h2_mplx *m)
     h2_task *task;
     int n;
     
-    if (!m->redo_tasks) {
-        m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
-    }
     /* Try to get rid of streams that occupy workers. Look for safe requests
      * that are repeatable. If none found, fail the connection.
      */
@@ -1331,7 +1314,8 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
     return status;
 }
  
-void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
+void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
+                             apr_status_t status)
 {
     h2_task *task = h2_ctx_cget_task(r_conn);
     
@@ -1342,6 +1326,10 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
         if (enter_mutex(m, &acquired) == APR_SUCCESS) {
             ngn_out_update_windows(m, ngn);
             h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
+            if (status != APR_SUCCESS && h2_task_can_redo(task) 
+                && !h2_ihash_get(m->redo_tasks, task->stream_id)) {
+                h2_ihash_add(m->redo_tasks, task);
+            }
             if (task->engine) { 
                 /* cannot report that as done until engine returns */
             }
@@ -1371,7 +1359,7 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
 {
     apr_status_t status;
     int acquired;
-    int streams[32];
+    int ids[100];
     h2_stream *stream;
     size_t i, n;
     
@@ -1381,17 +1369,16 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
                       
         /* update input windows for streams */
         h2_ihash_iter(m->streams, update_window, m);
-        if (on_resume && !h2_ihash_empty(m->sready)) {
-            n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams));
+        if (on_resume && !h2_iq_empty(m->readyq)) {
+            n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
             for (i = 0; i < n; ++i) {
-                stream = h2_ihash_get(m->streams, streams[i]);
-                if (!stream) {
-                    continue;
+                stream = h2_ihash_get(m->streams, ids[i]);
+                if (stream) {
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
+                                  "h2_mplx(%ld-%d): on_resume", 
+                                  m->id, stream->id);
+                    on_resume(on_ctx, stream);
                 }
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
-                              "h2_mplx(%ld-%d): on_resume", 
-                              m->id, stream->id);
-                on_resume(on_ctx, stream);
             }
         }
         
@@ -1408,7 +1395,7 @@ apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id)
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_stream *s = h2_ihash_get(m->streams, stream_id);
         if (s) {
-            h2_ihash_add(m->sready, s);
+            h2_iq_append(m->readyq, stream_id);
         }
         leave_mutex(m, acquired);
     }
diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h
index f7e3501783addfac0d266b6565c5eccca5588f04..25e07005e706c69f71ed9d924802f4876de850a8 100644 (file)
--- a/modules/http2/h2_mplx.h
+++ b/modules/http2/h2_mplx.h
@@ -75,7 +75,7 @@ struct h2_mplx {
     struct h2_ihash_t *spurge;      /* all streams done, ready for destroy */
 
     struct h2_iqueue *q;            /* all stream ids that need to be started */
-    struct h2_ihash_t *sready;      /* all streams ready for output */
+    struct h2_iqueue *readyq;       /* all stream ids ready for output */
         
     struct h2_ihash_t *tasks;       /* all tasks started and not destroyed */
     struct h2_ihash_t *redo_tasks;  /* all tasks that need to be redone */
@@ -84,7 +84,6 @@ struct h2_mplx {
     int max_stream_started; /* highest stream id that started processing */
     int workers_busy;       /* # of workers processing on this mplx */
     int workers_limit;      /* current # of workers limit, dynamic */
-    int workers_def_limit;  /* default # of workers limit */
     int workers_max;        /* max, hard limit # of workers in a process */
     apr_time_t last_idle_block;      /* last time, this mplx entered IDLE while
                                       * streams were ready */
@@ -351,6 +350,7 @@ apr_status_t h2_mplx_req_engine_pull(struct h2_req_engine *ngn,
                                      apr_read_type_e block, 
                                      int capacity, 
                                      request_rec **pr);
-void h2_mplx_req_engine_done(struct h2_req_engine *ngn, conn_rec *r_conn);
+void h2_mplx_req_engine_done(struct h2_req_engine *ngn, conn_rec *r_conn,
+                             apr_status_t status);
 
 #endif /* defined(__mod_h2__h2_mplx__) */
diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c
index 2f5b7296177214f3129ec7441a457b6331144e34..e0c40cfb233cbfa32f4e44be9d2733411c892ac0 100644 (file)
--- a/modules/http2/h2_ngn_shed.c
+++ b/modules/http2/h2_ngn_shed.c
@@ -72,7 +72,7 @@ struct h2_req_engine {
     const char *type;      /* name of the engine type */
     apr_pool_t *pool;      /* pool for engine specific allocations */
     conn_rec *c;           /* connection this engine is assigned to */
-    h2_task *task;         /* the task this engine is base on, running in */
+    h2_task *task;         /* the task this engine is based on, running in */
     h2_ngn_shed *shed;
 
     unsigned int shutdown : 1; /* engine is being shut down */
@@ -335,7 +335,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
     
     if (!shed->aborted && !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
         h2_ngn_entry *entry;
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
                       "h2_ngn_shed(%ld): exit engine %s (%s), "
                       "has still requests queued, shutdown=%d,"
                       "assigned=%ld, live=%ld, finished=%ld", 
@@ -347,7 +347,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
              entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
              entry = H2_NGN_ENTRY_NEXT(entry)) {
             h2_task *task = entry->task;
-            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
                           "h2_ngn_shed(%ld): engine %s has queued task %s, "
                           "frozen=%d, aborting",
                           shed->c->id, ngn->id, task->id, task->frozen);
@@ -356,7 +356,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
         }
     }
     if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
                       "h2_ngn_shed(%ld): exit engine %s (%s), "
                       "assigned=%ld, live=%ld, finished=%ld", 
                       shed->c->id, ngn->id, ngn->type,
diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c
index a79c5da47954fb0b58ae7567eba6c290218931f6..59ae9d48e2941295e3144668265138cd76a388a0 100644 (file)
--- a/modules/http2/h2_proxy_session.c
+++ b/modules/http2/h2_proxy_session.c
@@ -1137,13 +1137,13 @@ static void ev_stream_done(h2_proxy_session *session, int stream_id,
     if (stream) {
         int touched = (stream->data_sent || 
                        stream_id <= session->last_stream_id);
-        int complete = (stream->error_code == 0);
+        apr_status_t status = (stream->error_code == 0)? APR_SUCCESS : APR_EINVAL;
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03364)
                       "h2_proxy_sesssion(%s): stream(%d) closed "
                       "(touched=%d, error=%d)", 
                       session->id, stream_id, touched, stream->error_code);
         
-        if (!complete) {
+        if (status != APR_SUCCESS) {
             stream->r->status = 500;
         }
         else if (!stream->data_received) {
@@ -1164,7 +1164,7 @@ static void ev_stream_done(h2_proxy_session *session, int stream_id,
         h2_proxy_ihash_remove(session->streams, stream_id);
         h2_proxy_iq_remove(session->suspended, stream_id);
         if (session->done) {
-            session->done(session, stream->r, complete, touched);
+            session->done(session, stream->r, status, touched);
         }
     }
     
@@ -1276,6 +1276,21 @@ static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
     }
 }
 
+static int send_loop(h2_proxy_session *session)
+{
+    while (nghttp2_session_want_write(session->ngh2)) {
+        int rv = nghttp2_session_send(session->ngh2);
+        if (rv < 0 && nghttp2_is_fatal(rv)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                          "h2_proxy_session(%s): write, rv=%d", session->id, rv);
+            dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL);
+            break;
+        }
+        return 1;
+    }
+    return 0;
+}
+
 apr_status_t h2_proxy_session_process(h2_proxy_session *session)
 {
     apr_status_t status;
@@ -1300,16 +1315,7 @@ run_loop:
         case H2_PROXYS_ST_BUSY:
         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
         case H2_PROXYS_ST_REMOTE_SHUTDOWN:
-            while (nghttp2_session_want_write(session->ngh2)) {
-                int rv = nghttp2_session_send(session->ngh2);
-                if (rv < 0 && nghttp2_is_fatal(rv)) {
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
-                                  "h2_proxy_session(%s): write, rv=%d", session->id, rv);
-                    dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL);
-                    break;
-                }
-                have_written = 1;
-            }
+            have_written = send_loop(session);
             
             if (nghttp2_session_want_read(session->ngh2)) {
                 status = h2_proxy_session_read(session, 0, 0);
@@ -1386,13 +1392,36 @@ typedef struct {
     h2_proxy_request_done *done;
 } cleanup_iter_ctx;
 
+static int cancel_iter(void *udata, void *val)
+{
+    cleanup_iter_ctx *ctx = udata;
+    h2_proxy_stream *stream = val;
+    nghttp2_submit_rst_stream(ctx->session->ngh2, NGHTTP2_FLAG_NONE,
+                              stream->id, 0);
+    return 1;
+}
+
+void h2_proxy_session_cancel_all(h2_proxy_session *session)
+{
+    if (!h2_proxy_ihash_empty(session->streams)) {
+        cleanup_iter_ctx ctx;
+        ctx.session = session;
+        ctx.done = session->done;
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03366)
+                      "h2_proxy_session(%s): cancel  %d streams",
+                      session->id, (int)h2_proxy_ihash_count(session->streams));
+        h2_proxy_ihash_iter(session->streams, cancel_iter, &ctx);
+        session_shutdown(session, 0, NULL);
+    }
+}
+
 static int done_iter(void *udata, void *val)
 {
     cleanup_iter_ctx *ctx = udata;
     h2_proxy_stream *stream = val;
     int touched = (stream->data_sent || 
                    stream->id <= ctx->session->last_stream_id);
-    ctx->done(ctx->session, stream->r, 0, touched);
+    ctx->done(ctx->session, stream->r, APR_ECONNABORTED, touched);
     return 1;
 }
 
diff --git a/modules/http2/h2_proxy_session.h b/modules/http2/h2_proxy_session.h
index 4f8205027f1d41e9222665aff4082662b1946c19..709fe4b0b7caa577b3f9f74816b2bfbcbd6f8bc8 100644 (file)
--- a/modules/http2/h2_proxy_session.h
+++ b/modules/http2/h2_proxy_session.h
@@ -52,7 +52,7 @@ typedef enum {
 
 typedef struct h2_proxy_session h2_proxy_session;
 typedef void h2_proxy_request_done(h2_proxy_session *s, request_rec *r,
-                                   int complete, int touched);
+                                   apr_status_t status, int touched);
 
 struct h2_proxy_session {
     const char *id;
@@ -103,6 +103,8 @@ apr_status_t h2_proxy_session_submit(h2_proxy_session *s, const char *url,
  */
 apr_status_t h2_proxy_session_process(h2_proxy_session *s);
 
+void h2_proxy_session_cancel_all(h2_proxy_session *s);
+
 void h2_proxy_session_cleanup(h2_proxy_session *s, h2_proxy_request_done *done);
 
 void h2_proxy_session_update_window(h2_proxy_session *s, 
diff --git a/modules/http2/h2_request.c b/modules/http2/h2_request.c
index 74a9b1a6e90ca41530a128037223fb93e29ad676..b6f13c4734c2007033be3ae12d4a5169614f7435 100644 (file)
--- a/modules/http2/h2_request.c
+++ b/modules/http2/h2_request.c
@@ -187,16 +187,6 @@ apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, int eos)
         }
     }
 
-    s = apr_table_get(req->headers, "Expect");
-    if (s && s[0]) {
-        if (ap_cstr_casecmp(s, "100-continue") == 0) {
-            req->expect_100 = 1;
-        }
-        else {
-            req->expect_failed = 1;
-        }
-    }
-
     return APR_SUCCESS;
 }
 
@@ -217,6 +207,7 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *c)
     const char *rpath;
     apr_pool_t *p;
     request_rec *r;
+    const char *s;
 
     apr_pool_create(&p, c->pool);
     apr_pool_tag(p, "request");
@@ -296,12 +287,15 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *c)
     /* we may have switched to another server */
     r->per_dir_config = r->server->lookup_defaults;
     
-    if (req->expect_100) {
-        r->expecting_100 = 1;
-    }
-    else if (req->expect_failed) {
-        r->status = HTTP_EXPECTATION_FAILED;
-        ap_send_error_response(r, 0);
+    s = apr_table_get(r->headers_in, "Expect");
+    if (s && s[0]) {
+        if (ap_cstr_casecmp(s, "100-continue") == 0) {
+            r->expecting_100 = 1;
+        }
+        else {
+            r->status = HTTP_EXPECTATION_FAILED;
+            ap_send_error_response(r, 0);
+        }
     }
 
     /*
diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c
index 27ed9197b51c30378092e816ea865bd2919fd78e..f743f4814ccae4c11ccf3cfbebfad802ce49d6c6 100644 (file)
--- a/modules/http2/h2_session.c
+++ b/modules/http2/h2_session.c
@@ -82,7 +82,6 @@ apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                   "h2_stream(%ld-%d): EOS bucket cleanup -> done", 
                   session->id, stream->id);
-    h2_ihash_remove(session->streams, stream->id);
     h2_mplx_stream_done(session->mplx, stream);
     
     dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
@@ -94,10 +93,9 @@ typedef struct stream_sel_ctx {
     h2_stream *candidate;
 } stream_sel_ctx;
 
-static int find_cleanup_stream(void *udata, void *sdata)
+static int find_cleanup_stream(h2_stream *stream, void *ictx)
 {
-    stream_sel_ctx *ctx = udata;
-    h2_stream *stream = sdata;
+    stream_sel_ctx *ctx = ictx;
     if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
         if (!ctx->session->local.accepting
             && stream->id > ctx->session->local.accepted_max) {
@@ -121,7 +119,7 @@ static void cleanup_streams(h2_session *session)
     ctx.session = session;
     ctx.candidate = NULL;
     while (1) {
-        h2_ihash_iter(session->streams, find_cleanup_stream, &ctx);
+        h2_mplx_stream_do(session->mplx, find_cleanup_stream, &ctx);
         if (ctx.candidate) {
             h2_session_stream_done(session, ctx.candidate);
             ctx.candidate = NULL;
@@ -144,7 +142,6 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
     stream = h2_stream_open(stream_id, stream_pool, session, 
                             initiated_on);
     nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
-    h2_ihash_add(session->streams, stream);
     
     if (req) {
         h2_stream_set_request(stream, req);
@@ -713,7 +710,6 @@ static void h2_session_destroy(h2_session *session)
 {
     ap_assert(session);    
 
-    h2_ihash_clear(session->streams);
     if (session->mplx) {
         h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
         h2_mplx_release_and_join(session->mplx, session->iowait);
@@ -927,8 +923,6 @@ static h2_session *h2_session_create_int(conn_rec *c,
             return NULL;
         }
         
-        session->streams = h2_ihash_create(session->pool,
-                                           offsetof(h2_stream, id));
         session->mplx = h2_mplx_create(c, session->pool, session->config, 
                                        session->s->timeout, workers);
         
diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h
index 4d21cb86f5d22844b1e14e0086fc3a8e4213d76e..69b7a59c44a9583325be09cdcb08d2986ca9db7c 100644 (file)
--- a/modules/http2/h2_session.h
+++ b/modules/http2/h2_session.h
@@ -86,7 +86,6 @@ typedef struct h2_session {
     struct h2_workers *workers;     /* for executing stream tasks */
     struct h2_filter_cin *cin;      /* connection input filter context */
     h2_conn_io io;                  /* io on httpd conn filters */
-    struct h2_ihash_t *streams;     /* streams handled by this session */
     struct nghttp2_session *ngh2;   /* the nghttp2 session (internal use) */
 
     h2_session_state state;         /* state session is in */
diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c
index 3f70b3aa21d5665d932bbb12fd6a9e0fe02761de..1419aab5516c141542b9a56870d9c6641db3a398 100644 (file)
--- a/modules/http2/h2_task.c
+++ b/modules/http2/h2_task.c
@@ -90,7 +90,7 @@ static apr_status_t open_output(h2_task *task)
     return h2_mplx_out_open(task->mplx, task->stream_id, task->output.beam);
 }
 
-static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb)
+static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb, int block)
 {
     apr_off_t written, left;
     apr_status_t status;
@@ -99,8 +99,7 @@ static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb)
     H2_TASK_OUT_LOG(APLOG_TRACE2, task, bb, "h2_task send_out");
     /* engines send unblocking */
     status = h2_beam_send(task->output.beam, bb, 
-                          task->assigned? APR_NONBLOCK_READ
-                          : APR_BLOCK_READ);
+                          block? APR_BLOCK_READ : APR_NONBLOCK_READ);
     if (APR_STATUS_IS_EAGAIN(status)) {
         apr_brigade_length(bb, 0, &left);
         written -= left;
@@ -130,13 +129,7 @@ static apr_status_t slave_out(h2_task *task, ap_filter_t* f,
 {
     apr_bucket *b;
     apr_status_t status = APR_SUCCESS;
-    int flush = 0;
-    
-    if (APR_BRIGADE_EMPTY(bb)) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
-                      "h2_slave_out(%s): empty write", task->id);
-        return APR_SUCCESS;
-    }
+    int flush = 0, blocking;
     
     if (task->frozen) {
         h2_util_bb_log(task->c, task->stream_id, APLOG_TRACE2,
@@ -153,57 +146,46 @@ static apr_status_t slave_out(h2_task *task, ap_filter_t* f,
         }
         return APR_SUCCESS;
     }
-    
-    /* Attempt to write saved brigade first */
-    if (task->output.bb && !APR_BRIGADE_EMPTY(task->output.bb)) {
-        status = send_out(task, task->output.bb); 
-        if (status != APR_SUCCESS) {
-            return status;
-        }
-    }
-    
-    /* If there is nothing saved (anymore), try to write the brigade passed */
-    if ((!task->output.bb || APR_BRIGADE_EMPTY(task->output.bb)) 
-        && !APR_BRIGADE_EMPTY(bb)) {
-        /* check if we have a flush before the end-of-request */
-        if (!task->output.opened) {
-            for (b = APR_BRIGADE_FIRST(bb);
-                 b != APR_BRIGADE_SENTINEL(bb);
-                 b = APR_BUCKET_NEXT(b)) {
-                if (AP_BUCKET_IS_EOR(b)) {
-                    break;
-                }
-                else if (APR_BUCKET_IS_FLUSH(b)) {
-                    flush = 1;
-                }
-            }
-        }
 
-        status = send_out(task, bb); 
-        if (status != APR_SUCCESS) {
-            return status;
+    /* we send block once we opened the output, so someone is there
+     * reading it *and* the task is not assigned to a h2_req_engine */
+    blocking = (!task->assigned && task->output.opened);
+    if (!task->output.opened) {
+        for (b = APR_BRIGADE_FIRST(bb);
+             b != APR_BRIGADE_SENTINEL(bb);
+             b = APR_BUCKET_NEXT(b)) {
+            if (APR_BUCKET_IS_FLUSH(b)) {
+                flush = 1;
+                break;
+            }
         }
     }
     
-    /* If the passed brigade is not empty, save it before return */
-    if (!APR_BRIGADE_EMPTY(bb)) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03405)
-                      "h2_slave_out(%s): could not write all, saving brigade", 
-                      task->id);
-        if (!task->output.bb) {
-            task->output.bb = apr_brigade_create(task->pool, 
-                                          task->c->bucket_alloc);
+    if (task->output.bb && !APR_BRIGADE_EMPTY(task->output.bb)) {
+        /* still have data buffered from previous attempt.
+         * setaside and append new data and try to pass the complete data */
+        if (!APR_BRIGADE_EMPTY(bb)) {
+            status = ap_save_brigade(f, &task->output.bb, &bb, task->pool);
         }
-        status = ap_save_brigade(f, &task->output.bb, &bb, task->pool);
-        if (status != APR_SUCCESS) {
-            return status;
+        if (status == APR_SUCCESS) {
+            status = send_out(task, task->output.bb, blocking);
+        } 
+    }
+    else {
+        /* no data buffered here, try to pass the brigade directly */
+        status = send_out(task, bb, blocking); 
+        if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
+            /* could not write all, buffer the rest */
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03405)
+                          "h2_slave_out(%s): saving brigade", 
+                          task->id);
+            status = ap_save_brigade(f, &task->output.bb, &bb, task->pool);
+            flush = 1;
         }
     }
     
-    if (!task->output.opened 
-        && (flush || h2_beam_get_mem_used(task->output.beam) > (32*1024))) {
-        /* if we have enough buffered or we got a flush bucket, open
-        * the response now. */
+    if (status == APR_SUCCESS && !task->output.opened && flush) {
+        /* got a flush or could not write all, time to tell someone to read */
         status = open_output(task);
     }
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, task->c, 
@@ -349,7 +331,7 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f,
         /* Hmm, well. There is mode AP_MODE_EATCRLF, but we chose not
          * to support it. Seems to work. */
         ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOTIMPL, f->c,
-                      APLOGNO(02942) 
+                      APLOGNO(03472) 
                       "h2_slave_in(%s), unsupported READ mode %d", 
                       task->id, mode);
         status = APR_ENOTIMPL;
@@ -386,7 +368,7 @@ static apr_status_t h2_filter_parse_h1(ap_filter_t* f, apr_bucket_brigade* bb)
     /* There are cases where we need to parse a serialized http/1.1 
      * response. One example is a 100-continue answer in serialized mode
      * or via a mod_proxy setup */
-    while (task->output.parse_response) {
+    while (!task->output.sent_response) {
         status = h2_from_h1_parse_response(task, f, bb);
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c,
                       "h2_task(%s): parsed response", task->id);
@@ -403,7 +385,7 @@ static apr_status_t h2_filter_parse_h1(ap_filter_t* f, apr_bucket_brigade* bb)
  ******************************************************************************/
  
 int h2_task_can_redo(h2_task *task) {
-    if (task->input.beam && h2_beam_was_received(task->input.beam)) {
+    if (h2_beam_was_received(task->input.beam)) {
         /* cannot repeat that. */
         return 0;
     }
@@ -420,10 +402,8 @@ void h2_task_redo(h2_task *task)
 void h2_task_rst(h2_task *task, int error)
 {
     task->rst_error = error;
-    if (task->input.beam) {
-        h2_beam_abort(task->input.beam);
-    }
-    if (!task->worker_done && task->output.beam) {
+    h2_beam_abort(task->input.beam);
+    if (!task->worker_done) {
         h2_beam_abort(task->output.beam);
     }
     if (task->c) {
@@ -583,23 +563,13 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id)
     task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc);
     if (task->request->serialize) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
-                      "h2_task(%s): serialize request %s %s, expect-100=%d", 
-                      task->id, task->request->method, task->request->path,
-                      task->request->expect_100);
+                      "h2_task(%s): serialize request %s %s", 
+                      task->id, task->request->method, task->request->path);
         apr_brigade_printf(task->input.bb, NULL, 
                            NULL, "%s %s HTTP/1.1\r\n", 
                            task->request->method, task->request->path);
         apr_table_do(input_ser_header, task, task->request->headers, NULL);
         apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
-        if (task->request->expect_100) {
-            /* we are unable to suppress the serialization of the 
-             * intermediate response and need to parse it */
-            task->output.parse_response = 1;
-        }
-    }
-    
-    if (task->request->expect_100) {
-        task->output.parse_response = 1;
     }
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h
index ad8f0565961bd2fca320481f733240f82844f3d5..a8f0f2c315bc2aa1e886e56bce9d3cd5b1744da9 100644 (file)
--- a/modules/http2/h2_task.h
+++ b/modules/http2/h2_task.h
@@ -70,7 +70,6 @@ struct h2_task {
         unsigned int opened : 1;
         unsigned int sent_response : 1;
         unsigned int copy_files : 1;
-        unsigned int parse_response : 1;
         struct h2_response_parser *rparser;
         apr_bucket_brigade *bb;
     } output;
diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c
index 81b94566c58dff5f366572bb5cbd2b484baf6bb0..d800b405692e5a70bf1abf5e6bff125ebd134cdd 100644 (file)
--- a/modules/http2/h2_util.c
+++ b/modules/http2/h2_util.c
@@ -440,10 +440,12 @@ void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
 {
     int i;
     
+    if (h2_iq_contains(q, sid)) {
+        return;
+    }
     if (q->nelts >= q->nalloc) {
         iq_grow(q, q->nalloc * 2);
     }
-    
     i = (q->head + q->nelts) % q->nalloc;
     q->elts[i] = sid;
     ++q->nelts;
@@ -454,6 +456,11 @@ void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
     }
 }
 
+void h2_iq_append(h2_iqueue *q, int sid)
+{
+    h2_iq_add(q, sid, NULL, NULL);
+}
+
 int h2_iq_remove(h2_iqueue *q, int sid)
 {
     int i;
@@ -522,6 +529,18 @@ int h2_iq_shift(h2_iqueue *q)
     return sid;
 }
 
+size_t h2_iq_mshift(h2_iqueue *q, int *pint, size_t max)
+{
+    int i;
+    for (i = 0; i < max; ++i) {
+        pint[i] = h2_iq_shift(q);
+        if (pint[i] == 0) {
+            break;
+        }
+    }
+    return i;
+}
+
 static void iq_grow(h2_iqueue *q, int nlen)
 {
     if (nlen > q->nalloc) {
@@ -573,6 +592,17 @@ static int iq_bubble_down(h2_iqueue *q, int i, int bottom,
     return i;
 }
 
+int h2_iq_contains(h2_iqueue *q, int sid)
+{
+    int i;
+    for (i = 0; i < q->nelts; ++i) {
+        if (sid == q->elts[(q->head + i) % q->nalloc]) {
+            return 1;
+        }
+    }
+    return 0;
+}
+
 /*******************************************************************************
  * h2_util for apt_table_t
  ******************************************************************************/
diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h
index 7c9453a6cf3dfff37b756e3e5cd3c7cca79fdc69..7b92553445761c9e74a2a60779f474d3bf9ebce0 100644 (file)
--- a/modules/http2/h2_util.h
+++ b/modules/http2/h2_util.h
@@ -116,13 +116,21 @@ int h2_iq_count(h2_iqueue *q);
 /**
  * Add a stream id to the queue. 
  *
- * @param q the queue to append the task to
+ * @param q the queue to append the id to
  * @param sid the stream id to add
  * @param cmp the comparator for sorting
  * @param ctx user data for comparator 
  */
 void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx);
 
+/**
+ * Append the id to the queue if not already present. 
+ *
+ * @param q the queue to append the id to
+ * @param sid the id to append
+ */
+void h2_iq_append(h2_iqueue *q, int sid);
+
 /**
  * Remove the stream id from the queue. Return != 0 iff task
  * was found in queue.
@@ -148,14 +156,33 @@ void h2_iq_clear(h2_iqueue *q);
 void h2_iq_sort(h2_iqueue *q, h2_iq_cmp *cmp, void *ctx);
 
 /**
- * Get the first stream id from the queue or NULL if the queue is empty. 
- * The task will be removed.
+ * Get the first id from the queue or 0 if the queue is empty. 
+ * The id is being removed.
  *
- * @param q the queue to get the first task from
- * @return the first stream id of the queue, 0 if empty
+ * @param q the queue to get the first id from
+ * @return the first id of the queue, 0 if empty
  */
 int h2_iq_shift(h2_iqueue *q);
 
+/**
+ * Get the first max ids from the queue. All these ids will be removed.
+ *
+ * @param q the queue to get the first task from
+ * @param pint the int array to receive the values
+ * @param max the maximum number of ids to shift
+ * @return the actual number of ids shifted
+ */
+size_t h2_iq_mshift(h2_iqueue *q, int *pint, size_t max);
+
+/**
+ * Determine if int is in the queue already
+ *
+ * @parm q the queue
+ * @param sid the integer id to check for
+ * @return != 0 iff sid is already in the queue
+ */
+int h2_iq_contains(h2_iqueue *q, int sid);
+
 /*******************************************************************************
  * common helpers
  ******************************************************************************/
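
A short usage sketch of the new queue helpers documented above (illustration only, not part of the patch): h2_iq_append() only adds ids that are not already queued, and h2_iq_mshift() drains up to a given number of ids in one call, which is the pattern h2_mplx now uses for its readyq.

    #include "h2_util.h"

    /* Sketch: exercising h2_iq_append/h2_iq_mshift. */
    static void readyq_demo(apr_pool_t *pool)
    {
        h2_iqueue *q = h2_iq_create(pool, 16);
        int ids[8];
        size_t i, n;

        h2_iq_append(q, 3);
        h2_iq_append(q, 5);
        h2_iq_append(q, 3);    /* already queued, ignored */

        n = h2_iq_mshift(q, ids, sizeof(ids)/sizeof(ids[0]));
        /* n == 2, ids[0] == 3, ids[1] == 5, queue is now empty */
        for (i = 0; i < n; ++i) {
            /* dispatch stream ids[i] ... */
        }
    }
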
diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h
index 58fcf8d97cc3931e4e00e526e06c23294bf1243a..e9986e98cd24e8302cc6cd472cb758ae76c2f317 100644 (file)
--- a/modules/http2/h2_version.h
+++ b/modules/http2/h2_version.h
@@ -26,7 +26,7 @@
  * @macro
  * Version number of the http2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.7.7"
+#define MOD_HTTP2_VERSION "1.7.8"
 
 /**
  * @macro
@@ -34,7 +34,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x010707
+#define MOD_HTTP2_VERSION_NUM 0x010708
 
 
 #endif /* mod_h2_h2_version_h */
diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c
index 854e677a34ad25040f78d5b20219133137cdf7ef..7452cd7c2bd073a104fdf54720fcc3ee25bc3f6c 100644 (file)
--- a/modules/http2/mod_http2.c
+++ b/modules/http2/mod_http2.c
@@ -166,9 +166,10 @@ static apr_status_t http2_req_engine_pull(h2_req_engine *ngn,
     return h2_mplx_req_engine_pull(ngn, block, capacity, pr);
 }
 
-static void http2_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
+static void http2_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
+                                  apr_status_t status)
 {
-    h2_mplx_req_engine_done(ngn, r_conn);
+    h2_mplx_req_engine_done(ngn, r_conn, status);
 }
 
 /* Runs once per created child process. Perform any process 
diff --git a/modules/http2/mod_http2.h b/modules/http2/mod_http2.h
index 15cf9d0677b0125f950822781d299df77d3ddb63..f0cc9567cadb6e4526270a565d126555f504510b 100644 (file)
--- a/modules/http2/mod_http2.h
+++ b/modules/http2/mod_http2.h
@@ -90,5 +90,6 @@ APR_DECLARE_OPTIONAL_FN(apr_status_t,
                                                 request_rec **pr));
 APR_DECLARE_OPTIONAL_FN(void, 
                         http2_req_engine_done, (h2_req_engine *engine, 
-                                                conn_rec *rconn));
+                                                conn_rec *rconn,
+                                                apr_status_t status));
 #endif
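
As a hedged consumer-side sketch (not part of this patch): a module using the optional function declared above would retrieve it through APR's optional-function mechanism and now pass the extra apr_status_t, roughly as follows. A non-success status lets mod_http2 consider the task for a redo, as seen in h2_mplx_req_engine_done() above.

    #include "apr_optional.h"
    #include "mod_http2.h"

    /* Hypothetical names; only the optional function itself is real. */
    static APR_OPTIONAL_FN_TYPE(http2_req_engine_done) *engine_done_fn;

    static void lookup_optional_fns(void)
    {
        engine_done_fn = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_done);
    }

    static void finish_request(h2_req_engine *engine, conn_rec *r_conn,
                               apr_status_t rv)
    {
        if (engine_done_fn) {
            engine_done_fn(engine, r_conn, rv);
        }
    }
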
diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c
index 6a247401b30303418b0de4eff5a55f445d9df0a0..bbce1fc7ae96e73971aa7dfaac6a7c4455057bf2 100644 (file)
--- a/modules/http2/mod_proxy_http2.c
+++ b/modules/http2/mod_proxy_http2.c
@@ -46,7 +46,8 @@ static apr_status_t (*req_engine_pull)(h2_req_engine *engine,
                                        apr_read_type_e block, 
                                        int capacity, 
                                        request_rec **pr);
-static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn);
+static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn,
+                               apr_status_t status);
                                        
 typedef struct h2_proxy_ctx {
     conn_rec *owner;
@@ -270,12 +271,12 @@ static apr_status_t add_request(h2_proxy_session *session, request_rec *r)
 }
 
 static void request_done(h2_proxy_session *session, request_rec *r,
-                         int complete, int touched)
+                         apr_status_t status, int touched)
 {   
     h2_proxy_ctx *ctx = session->user_data;
     const char *task_id = apr_table_get(r->connection->notes, H2_TASK_ID_NOTE);
 
-    if (!complete) {
+    if (status != APR_SUCCESS) {
         if (!touched) {
             /* untouched request, need rescheduling */
             if (req_engine_push && is_h2 && is_h2(ctx->owner)) {
@@ -292,7 +293,7 @@ static void request_done(h2_proxy_session *session, request_rec *r,
         else {
             const char *uri;
             uri = apr_uri_unparse(r->pool, &r->parsed_uri, 0);
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, 
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection, 
                           APLOGNO(03471) "h2_proxy_session(%s): request %s -> %s "
                           "not complete, was touched",
                           ctx->engine_id, task_id, uri);
@@ -300,23 +301,15 @@ static void request_done(h2_proxy_session *session, request_rec *r,
     }
     
     if (r == ctx->rbase) {
-        ctx->r_status = complete? APR_SUCCESS : HTTP_GATEWAY_TIME_OUT;
+        ctx->r_status = (status == APR_SUCCESS)? APR_SUCCESS : HTTP_SERVICE_UNAVAILABLE;
     }
     
     if (req_engine_done && ctx->engine) {
-        if (complete) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, 
-                          APLOGNO(03370)
-                          "h2_proxy_session(%s): finished request %s",
-                          ctx->engine_id, task_id);
-        }
-        else {
-            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, r->connection, 
-                          APLOGNO(03371)
-                          "h2_proxy_session(%s): failed request %s",
-                          ctx->engine_id, task_id);
-        }
-        req_engine_done(ctx->engine, r->connection);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection, 
+                      APLOGNO(03370)
+                      "h2_proxy_session(%s): finished request %s",
+                      ctx->engine_id, task_id);
+        req_engine_done(ctx->engine, r->connection, status);
     }
 }    
 
@@ -382,7 +375,12 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
                 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, s2, ctx->owner, 
                               APLOGNO(03374) "eng(%s): pull request", 
                               ctx->engine_id);
-                status = s2;
+                /* give notice that we're leaving and cancel all ongoing
+                 * streams. */
+                next_request(ctx, 1); 
+                h2_proxy_session_cancel_all(ctx->session);
+                h2_proxy_session_process(ctx->session);
+                status = ctx->r_status = APR_SUCCESS;
                 break;
             }
             if (!ctx->next && h2_proxy_ihash_empty(ctx->session->streams)) {