]> granicus.if.org Git - apache/commitdiff
mod_http2: some more cleanup on stream/task/session takedowns
authorStefan Eissing <icing@apache.org>
Fri, 29 Apr 2016 15:21:21 +0000 (15:21 +0000)
committerStefan Eissing <icing@apache.org>
Fri, 29 Apr 2016 15:21:21 +0000 (15:21 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1741648 13f79535-47bb-0310-9956-ffa450edef68

modules/http2/h2_bucket_beam.c
modules/http2/h2_conn_io.c
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_session.c
modules/http2/h2_session.h
modules/http2/h2_stream.c
modules/http2/h2_task.h

index 2e9f4b1f39098162dc214102af0ecd78e0af8ad0..2e1bfec5537a825ff62cf9d763924a6e28e1b9ce 100644 (file)
@@ -535,6 +535,9 @@ apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block)
                 status = APR_EAGAIN;
                 break;
             }
+            if (beam->m_cond) {
+                apr_thread_cond_broadcast(beam->m_cond);
+            }
             status = wait_cond(beam, bl.mutex);
         }
         leave_yellow(beam, &bl);
@@ -716,6 +719,9 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
 transfer:
         if (beam->aborted) {
+            if (!!APR_BRIGADE_EMPTY(beam->green)) {
+                apr_brigade_cleanup(beam->green);
+            }
             status = APR_ECONNABORTED;
             goto leave;
         }
index 2e0a5f2d690241a56e39d9453c6cd96a75077d6a..d21ae8b95943ef9e80566be1228adb112b601706 100644 (file)
@@ -131,7 +131,7 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
                              apr_pool_t *pool)
 {
     io->c             = c;
-    io->output        = apr_brigade_create(pool, c->bucket_alloc);
+    io->output        = apr_brigade_create(c->pool, c->bucket_alloc);
     io->buflen        = 0;
     io->is_tls        = h2_h2_is_tls(c);
     io->buffer_output = io->is_tls;
index 5494551bdf41e751be68eccfeb2fdd48da5d7d45..9298592b567dcf55e3c212b04b7aa1597c6558a5 100644 (file)
@@ -189,6 +189,25 @@ static void check_tx_free(h2_mplx *m)
     }
 }
 
+static int purge_stream(void *ctx, void *val) 
+{
+    h2_mplx *m = ctx;
+    h2_stream *stream = val;
+    h2_ihash_remove(m->spurge, stream->id);
+    h2_stream_destroy(stream);
+    return 0;
+}
+
+static void purge_streams(h2_mplx *m)
+{
+    if (!h2_ihash_empty(m->spurge)) {
+        while(!h2_ihash_iter(m->spurge, purge_stream, m)) {
+            /* repeat until empty */
+        }
+        h2_ihash_clear(m->spurge);
+    }
+}
+
 static void h2_mplx_destroy(h2_mplx *m)
 {
     AP_DEBUG_ASSERT(m);
@@ -257,6 +276,8 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
 
         m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+        m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+        m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->q = h2_iq_create(m->pool, m->max_streams);
         m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
         m->ready_tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
@@ -294,10 +315,10 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m)
     return max_stream_started;
 }
 
-static void input_consumed_signal(h2_mplx *m, h2_task *task)
+static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
 {
-    if (task->input.beam && task->worker_started) {
-        h2_beam_send(task->input.beam, NULL, 0); /* trigger updates */
+    if (stream->input) {
+        h2_beam_send(stream->input, NULL, 0); /* trigger updates */
     }
 }
 
@@ -310,7 +331,7 @@ static int output_consumed_signal(h2_mplx *m, h2_task *task)
 }
 
 
-static void task_destroy(h2_mplx *m, h2_task *task, int events)
+static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
 {
     conn_rec *slave = NULL;
     int reuse_slave = 0;
@@ -323,18 +344,17 @@ static void task_destroy(h2_mplx *m, h2_task *task, int events)
                       "h2_task(%s): shutdown", task->id);
     }
     
-    if (events) {
+    if (called_from_master) {
         /* Process outstanding events before destruction */
-        input_consumed_signal(m, task);
+        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+        if (stream) {
+            input_consumed_signal(m, stream);
+        }
     }
     
     /* The pool is cleared/destroyed which also closes all
      * allocated file handles. Give this count back to our
      * file handle pool. */
-    if (task->input.beam) {
-        m->tx_handles_reserved += 
-        h2_beam_get_files_beamed(task->input.beam);
-    }
     if (task->output.beam) {
         m->tx_handles_reserved += 
         h2_beam_get_files_beamed(task->output.beam);
@@ -366,49 +386,69 @@ static void task_destroy(h2_mplx *m, h2_task *task, int events)
 
 static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) 
 {
-    h2_task *task;
+    h2_task *task = h2_ihash_get(m->tasks, stream->id);
     
+    /* Situation: we are, on the master connection, done with processing
+     * the stream. Either we have handled it successfully, or the stream
+     * was reset by the client or the connection is gone and we are 
+     * shutting down the whole session.
+     *
+     * We possibly have created a task for this stream to be processed
+     * on a slave connection. The processing might actually be ongoing
+     * right now or has already finished. A finished task waits for its
+     * stream to be done. This is the common case.
+     * 
+     * If the stream had input (e.g. the request had a body), a task
+     * may have read, or is still reading buckets from the input beam.
+     * This means that the task is referencing memory from the stream's
+     * pool (or the master connection bucket alloc). Before we can free
+     * the stream pool, we need to make sure that those references are
+     * gone. This is what h2_beam_shutdown() on the input waits for.
+     *
+     * With the input handled, we can tear down that beam and care
+     * about the output beam. The stream might still have buffered some
+     * buckets read from the output, so we need to get rid of those. That
+     * is done by h2_stream_cleanup().
+     *
+     * Now it is save to destroy the task (if it exists and is finished).
+     * 
+     * FIXME: we currently destroy the stream, even if the task is still
+     * ongoing. This is not ok, since task->request is coming from stream
+     * memory. We should either copy it on task creation or wait with the
+     * stream destruction until the task is done. 
+     */
     h2_ihash_remove(m->streams, stream->id);
     if (stream->input) {
-        apr_status_t status;
-        status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ);
-        if (status == APR_EAGAIN) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                          "h2_stream(%ld-%d): wait on input shutdown", 
-                          m->id, stream->id);
-            status = h2_beam_shutdown(stream->input, APR_BLOCK_READ);
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, 
-                          "h2_stream(%ld-%d): input shutdown returned", 
-                          m->id, stream->id);
-        }
+        m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
     }
+    h2_stream_cleanup(stream);
 
-    task = h2_ihash_get(m->tasks, stream->id);
     if (task) {
         /* Remove task from ready set, we will never submit it */
         h2_ihash_remove(m->ready_tasks, stream->id);
+        task->input.beam = NULL;
         
-        if (task->worker_done) {
-            /* already finished or not even started yet */
-            h2_iq_remove(m->q, task->stream_id);
-            task_destroy(m, task, 0);
-        }
-        else {
+        if (!task->worker_done) {
             /* task still running, cleanup once it is done */
-            task->orphaned = 1;
-            task->input.beam = NULL; 
             if (rst_error) {
                 h2_task_rst(task, rst_error);
             }
+            /* FIXME: this should work, but does not
+            h2_ihash_add(m->shold, stream);
+            return;*/
+        }
+        else {
+            /* already finished */
+            h2_iq_remove(m->q, task->stream_id);
+            task_destroy(m, task, 0);
         }
     }
+    h2_stream_destroy(stream);
 }
 
 static int stream_done_iter(void *ctx, void *val)
 {
-    h2_stream *stream = val;
     stream_done((h2_mplx*)ctx, val, 0);
-    h2_stream_destroy(stream);
     return 0;
 }
 
@@ -416,6 +456,7 @@ static int task_print(void *ctx, void *val)
 {
     h2_mplx *m = ctx;
     h2_task *task = val;
+    h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
     if (task->request) {
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
                       "->03198: h2_stream(%s): %s %s %s -> %s %d"
@@ -424,7 +465,7 @@ static int task_print(void *ctx, void *val)
                       task->request->authority, task->request->path,
                       task->response? "http" : (task->rst_error? "reset" : "?"),
                       task->response? task->response->http_status : task->rst_error,
-                      task->orphaned, task->worker_started, 
+                      (stream? 0 : 1), task->worker_started, 
                       task->worker_done);
     }
     else if (task) {
@@ -493,6 +534,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
                 apr_thread_cond_broadcast(m->task_thawed);
             }
         }
+        AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
+        AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
+        purge_streams(m);
         
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
                       "h2_mplx(%ld): release_join (%d tasks left) -> destroy", 
@@ -516,24 +560,17 @@ void h2_mplx_abort(h2_mplx *m)
     }
 }
 
-apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
+apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream)
 {
     apr_status_t status = APR_SUCCESS;
     int acquired;
     
-    /* This maybe called from inside callbacks that already hold the lock.
-     * E.g. when we are streaming out DATA and the EOF triggers the stream
-     * release.
-     */
     AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_stream *stream = h2_ihash_get(m->streams, stream_id);
-        if (stream) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                          "h2_mplx(%ld-%d): marking stream as done.", 
-                          m->id, stream_id);
-            stream_done(m, stream, rst_error);
-        }
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+                      "h2_mplx(%ld-%d): marking stream as done.", 
+                      m->id, stream->id);
+        stream_done(m, stream, stream->rst_error);
         leave_mutex(m, acquired);
     }
     return status;
@@ -547,8 +584,7 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
 
 static int update_window(void *ctx, void *val)
 {
-    h2_mplx *m = ctx;
-    input_consumed_signal(m, val);
+    input_consumed_signal(ctx, val);
     return 1;
 }
 
@@ -562,7 +598,7 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
         return APR_ECONNABORTED;
     }
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_ihash_iter(m->tasks, update_window, m);
+        h2_ihash_iter(m->streams, update_window, m);
         
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                       "h2_session(%ld): windows updated", m->id);
@@ -580,7 +616,7 @@ static int task_iter_first(void *ctx, void *val)
     return 0;
 }
 
-h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
+h2_stream *h2_mplx_next_submit(h2_mplx *m)
 {
     apr_status_t status;
     h2_stream *stream = NULL;
@@ -597,7 +633,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
             h2_task *task = ctx.task;
             
             h2_ihash_remove(m->ready_tasks, task->stream_id);
-            stream = h2_ihash_get(streams, task->stream_id);
+            stream = h2_ihash_get(m->streams, task->stream_id);
             if (stream && task) {
                 task->submitted = 1;
                 if (task->rst_error) {
@@ -618,16 +654,14 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
                               "h2_mplx(%s): stream for response closed, "
                               "resetting io to close request processing",
                               task->id);
-                task->orphaned = 1;
                 h2_task_rst(task, H2_ERR_STREAM_CLOSED);
                 if (!task->worker_started || task->worker_done) {
                     task_destroy(m, task, 1);
                 }
                 else {
                     /* hang around until the h2_task is done, but
-                     * shutdown input/output and send out any events asap. */
+                     * shutdown output */
                     h2_task_shutdown(task, 0);
-                    input_consumed_signal(m, task);
                 }
             }
         }
@@ -640,8 +674,9 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
 {
     apr_status_t status = APR_SUCCESS;
     h2_task *task = h2_ihash_get(m->tasks, stream_id);
+    h2_stream *stream = h2_ihash_get(m->streams, stream_id);
     
-    if (!task || task->orphaned) {
+    if (!task || !stream) {
         return APR_ECONNABORTED;
     }
     
@@ -691,8 +726,9 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
 static apr_status_t out_close(h2_mplx *m, h2_task *task)
 {
     apr_status_t status = APR_SUCCESS;
+    h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
     
-    if (!task || task->orphaned) {
+    if (!task || !stream) {
         return APR_ECONNABORTED;
     }
     
@@ -885,93 +921,103 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
 
 static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
 {
-    if (task) {
-        if (task->frozen) {
-            /* this task was handed over to an engine for processing 
-             * and the original worker has finished. That means the 
-             * engine may start processing now. */
-            h2_task_thaw(task);
-            /* we do not want the task to block on writing response
-             * bodies into the mplx. */
-            /* FIXME: this implementation is incomplete. */
-            h2_task_set_io_blocking(task, 0);
-            apr_thread_cond_broadcast(m->task_thawed);
-            return;
-        }
-        else {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): task(%s) done", m->id, task->id);
-            out_close(m, task);
-            
-            if (ngn) {
-                apr_off_t bytes = 0;
-                if (task->output.beam) {
-                    h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
-                    bytes += h2_beam_get_buffered(task->output.beam);
-                }
-                if (bytes > 0) {
-                    /* we need to report consumed and current buffered output
-                     * to the engine. The request will be streamed out or cancelled,
-                     * no more data is coming from it and the engine should update
-                     * its calculations before we destroy this information. */
-                    h2_req_engine_out_consumed(ngn, task->c, bytes);
-                }
+    if (task->frozen) {
+        /* this task was handed over to an engine for processing 
+         * and the original worker has finished. That means the 
+         * engine may start processing now. */
+        h2_task_thaw(task);
+        /* we do not want the task to block on writing response
+         * bodies into the mplx. */
+        /* FIXME: this implementation is incomplete. */
+        h2_task_set_io_blocking(task, 0);
+        apr_thread_cond_broadcast(m->task_thawed);
+        return;
+    }
+    else {
+        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                      "h2_mplx(%ld): task(%s) done", m->id, task->id);
+        out_close(m, task);
+        
+        if (ngn) {
+            apr_off_t bytes = 0;
+            if (task->output.beam) {
+                h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+                bytes += h2_beam_get_buffered(task->output.beam);
             }
-            
-            if (task->engine) {
-                if (!h2_req_engine_is_shutdown(task->engine)) {
-                    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
-                                  "h2_mplx(%ld): task(%s) has not-shutdown "
-                                  "engine(%s)", m->id, task->id, 
-                                  h2_req_engine_get_id(task->engine));
-                }
-                h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+            if (bytes > 0) {
+                /* we need to report consumed and current buffered output
+                 * to the engine. The request will be streamed out or cancelled,
+                 * no more data is coming from it and the engine should update
+                 * its calculations before we destroy this information. */
+                h2_req_engine_out_consumed(ngn, task->c, bytes);
             }
-            
-            if (!m->aborted && !task->orphaned && m->redo_tasks
-                && h2_ihash_get(m->redo_tasks, task->stream_id)) {
-                /* reset and schedule again */
-                h2_task_redo(task);
-                h2_ihash_remove(m->redo_tasks, task->stream_id);
-                h2_iq_add(m->q, task->stream_id, NULL, NULL);
-                return;
+        }
+        
+        if (task->engine) {
+            if (!h2_req_engine_is_shutdown(task->engine)) {
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                              "h2_mplx(%ld): task(%s) has not-shutdown "
+                              "engine(%s)", m->id, task->id, 
+                              h2_req_engine_get_id(task->engine));
             }
-            
-            task->worker_done = 1;
-            task->done_at = apr_time_now();
-            if (task->output.beam) {
-                h2_beam_on_consumed(task->output.beam, NULL, NULL);
-                h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
+            h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+        }
+        
+        if (!m->aborted && stream && m->redo_tasks
+            && h2_ihash_get(m->redo_tasks, task->stream_id)) {
+            /* reset and schedule again */
+            h2_task_redo(task);
+            h2_ihash_remove(m->redo_tasks, task->stream_id);
+            h2_iq_add(m->q, task->stream_id, NULL, NULL);
+            return;
+        }
+        
+        task->worker_done = 1;
+        task->done_at = apr_time_now();
+        if (task->output.beam) {
+            h2_beam_on_consumed(task->output.beam, NULL, NULL);
+            h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
+        }
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                      "h2_mplx(%s): request done, %f ms"
+                      " elapsed", task->id, 
+                      (task->done_at - task->started_at) / 1000.0);
+        if (task->started_at > m->last_idle_block) {
+            /* this task finished without causing an 'idle block', e.g.
+             * a block by flow control.
+             */
+            if (task->done_at- m->last_limit_change >= m->limit_change_interval
+                && m->workers_limit < m->workers_max) {
+                /* Well behaving stream, allow it more workers */
+                m->workers_limit = H2MIN(m->workers_limit * 2, 
+                                         m->workers_max);
+                m->last_limit_change = task->done_at;
+                m->need_registration = 1;
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                              "h2_mplx(%ld): increase worker limit to %d",
+                              m->id, m->workers_limit);
             }
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%s): request done, %f ms"
-                          " elapsed", task->id, 
-                          (task->done_at - task->started_at) / 1000.0);
-            if (task->started_at > m->last_idle_block) {
-                /* this task finished without causing an 'idle block', e.g.
-                 * a block by flow control.
-                 */
-                if (task->done_at- m->last_limit_change >= m->limit_change_interval
-                    && m->workers_limit < m->workers_max) {
-                    /* Well behaving stream, allow it more workers */
-                    m->workers_limit = H2MIN(m->workers_limit * 2, 
-                                             m->workers_max);
-                    m->last_limit_change = task->done_at;
-                    m->need_registration = 1;
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                                  "h2_mplx(%ld): increase worker limit to %d",
-                                  m->id, m->workers_limit);
-                }
+        }
+        
+        if (stream) {
+            /* hang around until the stream deregisters */
+        }
+        else {
+            stream = h2_ihash_get(m->shold, task->stream_id);
+            task_destroy(m, task, 0);
+            if (stream) {
+                stream->response = NULL; /* ref from task memory */
+                /* We cannot destroy the stream here since this is 
+                 * called from a worker thread and freeing memory pools
+                 * is only safe in the only thread using it (and its
+                 * parent pool / allocator) */
+                h2_ihash_remove(m->shold, stream->id);
+                h2_ihash_add(m->spurge, stream);
             }
             
-            if (task->orphaned) {
-                task_destroy(m, task, 0);
-                if (m->join_wait) {
-                    apr_thread_cond_signal(m->join_wait);
-                }
-            }
-            else {
-                /* hang around until the stream deregisters */
+            if (m->join_wait) {
+                apr_thread_cond_signal(m->join_wait);
             }
         }
     }
@@ -1177,11 +1223,13 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
     task->r = r;
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        if (task->orphaned) {
-            status = APR_ECONNABORTED;
+        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+        
+        if (stream) {
+            status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
         }
         else {
-            status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
+            status = APR_ECONNABORTED;
         }
         leave_mutex(m, acquired);
     }
index a6fe12a3efce6fd4b5d0731a77dbc709a1d763bc..9b316b0b3f84516f9ecb25cb5b86d130ce2cf091 100644 (file)
@@ -73,6 +73,8 @@ struct h2_mplx {
     unsigned int need_registration : 1;
 
     struct h2_ihash_t *streams;     /* all streams currently processing */
+    struct h2_ihash_t *shold;       /* all streams done with task ongoing */
+    struct h2_ihash_t *spurge;      /* all streams done, ready for destroy */
     struct h2_iqueue *q;            /* all stream ids that need to be started */
     
     struct h2_ihash_t *tasks;       /* all tasks started and not destroyed */
@@ -167,7 +169,7 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m);
  * @param rst_error if != 0, the stream was reset with the error given
  *
  */
-apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
+apr_status_t h2_mplx_stream_done(h2_mplx *m, struct h2_stream *stream);
 
 /**
  * Waits on output data from any stream in this session to become available. 
@@ -235,8 +237,7 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m);
  * @param m the mplxer to get a response from
  * @param bb the brigade to place any existing repsonse body data into
  */
-struct h2_stream *h2_mplx_next_submit(h2_mplx *m, 
-                                      struct h2_ihash_t *streams);
+struct h2_stream *h2_mplx_next_submit(h2_mplx *m);
 
 /**
  * Opens the output for the given stream with the specified response.
index 1e023133d504dc90f270fc367eac1bdcf5370841..0f8accab928d6d8d5e9e4d8bfbeb06aa5829e109 100644 (file)
@@ -128,19 +128,16 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
     h2_stream * stream;
     apr_pool_t *stream_pool;
     
-    if (session->spare) {
-        stream_pool = session->spare;
-        session->spare = NULL;
-    }
-    else {
-        apr_pool_create(&stream_pool, session->pool);
-        apr_pool_tag(stream_pool, "h2_stream");
-    }
+    apr_pool_create(&stream_pool, session->pool);
+    apr_pool_tag(stream_pool, "h2_stream");
     
     stream = h2_stream_open(stream_id, stream_pool, session, 
                             initiated_on, req);
-    
+    ++session->open_streams;
+    ++session->unanswered_streams;
+    nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
     h2_ihash_add(session->streams, stream);
+    
     if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
         if (stream_id > session->remote.emitted_max) {
             ++session->remote.emitted_count;
@@ -262,6 +259,11 @@ static int on_invalid_frame_recv_cb(nghttp2_session *ngh2,
     return 0;
 }
 
+static h2_stream *get_stream(h2_session *session, int stream_id)
+{
+    return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+}
+
 static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
                                  int32_t stream_id,
                                  const uint8_t *data, size_t len, void *userp)
@@ -277,7 +279,7 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
         return 0;
     }
     
-    stream = h2_session_get_stream(session, stream_id);
+    stream = get_stream(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064)
                       "h2_stream(%ld-%d): on_data_chunk for unknown stream",
@@ -342,7 +344,7 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
     h2_stream *stream;
     
     (void)ngh2;
-    stream = h2_session_get_stream(session, stream_id);
+    stream = get_stream(session, stream_id);
     if (stream) {
         stream_release(session, stream, error_code);
     }
@@ -358,7 +360,7 @@ static int on_begin_headers_cb(nghttp2_session *ngh2,
     /* We may see HEADERs at the start of a stream or after all DATA
      * streams to carry trailers. */
     (void)ngh2;
-    s = h2_session_get_stream(session, frame->hd.stream_id);
+    s = get_stream(session, frame->hd.stream_id);
     if (s) {
         /* nop */
     }
@@ -385,7 +387,7 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
         return 0;
     }
     
-    stream = h2_session_get_stream(session, frame->hd.stream_id);
+    stream = get_stream(session, frame->hd.stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
                       APLOGNO(02920) 
@@ -432,7 +434,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
             /* This can be HEADERS for a new stream, defining the request,
              * or HEADER may come after DATA at the end of a stream as in
              * trailers */
-            stream = h2_session_get_stream(session, frame->hd.stream_id);
+            stream = get_stream(session, frame->hd.stream_id);
             if (stream) {
                 int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
                 
@@ -456,7 +458,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
             }
             break;
         case NGHTTP2_DATA:
-            stream = h2_session_get_stream(session, frame->hd.stream_id);
+            stream = get_stream(session, frame->hd.stream_id);
             if (stream) {
                 int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
@@ -493,7 +495,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
                           "h2_session(%ld-%d): RST_STREAM by client, errror=%d",
                           session->id, (int)frame->hd.stream_id,
                           (int)frame->rst_stream.error_code);
-            stream = h2_session_get_stream(session, frame->hd.stream_id);
+            stream = get_stream(session, frame->hd.stream_id);
             if (stream && stream->request && stream->request->initiated_on) {
                 ++session->pushes_reset;
             }
@@ -567,7 +569,7 @@ static int on_send_data_cb(nghttp2_session *ngh2,
     }
     padlen = (unsigned char)frame->data.padlen;
     
-    stream = h2_session_get_stream(session, stream_id);
+    stream = get_stream(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
                       APLOGNO(02924) 
@@ -699,10 +701,6 @@ static void h2_session_cleanup(h2_session *session)
         nghttp2_session_del(session->ngh2);
         session->ngh2 = NULL;
     }
-    if (session->spare) {
-        apr_pool_destroy(session->spare);
-        session->spare = NULL;
-    }
 }
 
 static void h2_session_destroy(h2_session *session)
@@ -710,8 +708,12 @@ static void h2_session_destroy(h2_session *session)
     AP_DEBUG_ASSERT(session);
     
     h2_session_cleanup(session);
+    AP_DEBUG_ASSERT(session->open_streams == h2_ihash_count(session->streams));
     h2_ihash_clear(session->streams);
+    session->open_streams = 0;
     
+    ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
+                                     session->c->input_filters), "H2_IN");
     if (APLOGctrace1(session->c)) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                       "h2_session(%ld): destroy", session->id);
@@ -1138,10 +1140,8 @@ static int resume_on_data(void *ctx, void *val)
 static int h2_session_resume_streams_with_data(h2_session *session)
 {
     AP_DEBUG_ASSERT(session);
-    if (!h2_ihash_empty(session->streams)
-        && session->mplx && !session->mplx->aborted) {
+    if (session->open_streams && !session->mplx->aborted) {
         resume_ctx ctx;
-        
         ctx.session      = session;
         ctx.resume_count = 0;
 
@@ -1153,11 +1153,6 @@ static int h2_session_resume_streams_with_data(h2_session *session)
     return 0;
 }
 
-h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
-{
-    return h2_ihash_get(session->streams, stream_id);
-}
-
 static ssize_t stream_data_cb(nghttp2_session *ng2s,
                               int32_t stream_id,
                               uint8_t *buf,
@@ -1183,7 +1178,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
     (void)ng2s;
     (void)buf;
     (void)source;
-    stream = h2_session_get_stream(session, stream_id);
+    stream = get_stream(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
                       APLOGNO(02937) 
@@ -1334,7 +1329,7 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream)
                                        stream->id, err);
     }
     
-    stream->submitted = 1;
+    --session->unanswered_streams;
     if (stream->request && stream->request->initiated_on) {
         ++session->pushes_submitted;
     }
@@ -1384,7 +1379,6 @@ struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
                           "h2_stream(%ld-%d): scheduling push stream",
                           session->id, stream->id);
-            h2_stream_cleanup(stream);
             stream = NULL;
         }
         ++session->unsent_promises;
@@ -1509,29 +1503,14 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
 
 apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
 {
-    apr_pool_t *pool = h2_stream_detach_pool(stream);
-    int stream_id = stream->id;
-    int rst_error = stream->rst_error;
-    
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                   "h2_stream(%ld-%d): cleanup by EOS bucket destroy", 
-                  session->id, stream_id);
-    if (session->streams) {
-        h2_ihash_remove(session->streams, stream_id);
-    }
+                  session->id, stream->id);
+    h2_ihash_remove(session->streams, stream->id);
+    --session->open_streams;
+    --session->unanswered_streams;
+    h2_mplx_stream_done(session->mplx, stream);
     
-    h2_stream_cleanup(stream);
-    h2_mplx_stream_done(session->mplx, stream_id, rst_error);
-    h2_stream_destroy(stream);
-    
-    if (pool) {
-        apr_pool_clear(pool);
-        if (session->spare) {
-            apr_pool_destroy(session->spare);
-        }
-        session->spare = pool;
-    }
-
     return APR_SUCCESS;
 }
 
@@ -1708,7 +1687,7 @@ static apr_status_t h2_session_submit(h2_session *session)
     
     if (has_unsubmitted_streams(session)) {
         /* If we have responses ready, submit them now. */
-        while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) {
+        while ((stream = h2_mplx_next_submit(session->mplx))) {
             status = submit_response(session, stream);
             ++session->unsent_submits;
             
@@ -1770,7 +1749,7 @@ static void update_child_status(h2_session *session, int status, const char *msg
         apr_snprintf(session->status, sizeof(session->status),
                      "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)", 
                      msg? msg : "-",
-                     (int)h2_ihash_count(session->streams)
+                     (int)session->open_streams
                      (int)session->remote.emitted_count,
                      (int)session->responses_submitted,
                      (int)session->pushes_submitted,
@@ -1788,7 +1767,7 @@ static void transit(h2_session *session, const char *action, h2_session_state ns
         session->state = nstate;
         switch (session->state) {
             case H2_SESSION_ST_IDLE:
-                update_child_status(session, (h2_ihash_empty(session->streams)
+                update_child_status(session, (session->open_streams == 0
                                               SERVER_BUSY_KEEPALIVE
                                               : SERVER_BUSY_READ), "idle");
                 break;
@@ -1920,7 +1899,7 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
             if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
                 dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
             }
-            if (h2_ihash_empty(session->streams)) {
+            if (!session->open_streams) {
                 if (!is_accepting_streams(session)) {
                     /* We are no longer accepting new streams and have
                      * finished processing existing ones. Time to leave. */
@@ -2125,9 +2104,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
                 break;
                 
             case H2_SESSION_ST_IDLE:
-                /* make certain, the client receives everything before we idle */
-                if (!session->keep_sync_until 
-                    && async && h2_ihash_empty(session->streams)
+                /* make certain, we send everything before we idle */
+                if (!session->keep_sync_until && async && !session->open_streams
                     && !session->r && session->remote.emitted_count) {
                     ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
                                   "h2_session(%ld): async idle, nonblock read", session->id);
@@ -2225,8 +2203,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
                     }
                 }
                 
-                if (!h2_ihash_empty(session->streams)) {
-                    /* resume any streams for which data is available again */
+                if (session->open_streams) {
+                    /* resume any streams with output data */
                     h2_session_resume_streams_with_data(session);
                     /* Submit any responses/push_promises that are ready */
                     status = h2_session_submit(session);
index bf4ded338a807c619b26d91b4531a7abba5b4e94..32202dc3030a76584dcd90f8ec6c068a41a3022d 100644 (file)
@@ -100,6 +100,8 @@ typedef struct h2_session {
     
     struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
     
+    int open_streams;               /* number of streams open */
+    int unanswered_streams;         /* number of streams waiting for response */
     int unsent_submits;             /* number of submitted, but not yet written responses. */
     int unsent_promises;            /* number of submitted, but not yet written push promised */
                                          
@@ -122,8 +124,6 @@ typedef struct h2_session {
     apr_bucket_brigade *bbtmp;      /* brigade for keeping temporary data */
     struct apr_thread_cond_t *iowait; /* our cond when trywaiting for data */
     
-    apr_pool_t *spare;              /* spare stream pool */
-    
     char status[64];                /* status message for scoreboard */
     int last_status_code;           /* the one already reported */
     const char *last_status_msg;    /* the one already reported */
@@ -190,9 +190,6 @@ void h2_session_close(h2_session *session);
 apr_status_t h2_session_handle_response(h2_session *session,
                                         struct h2_stream *stream);
 
-/* Get the h2_stream for the given stream idenrtifier. */
-struct h2_stream *h2_session_get_stream(h2_session *session, int stream_id);
-
 /**
  * Create and register a new stream under the given id.
  * 
index 8853e6cad443041d359c05e19096de74838cb5f5..20d1d350425b4ec2a4156dc4e1805d06e1e9e287 100644 (file)
@@ -182,19 +182,31 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
 void h2_stream_cleanup(h2_stream *stream)
 {
     AP_DEBUG_ASSERT(stream);
-    if (stream->input) {
-        h2_beam_destroy(stream->input);
-        stream->input = NULL;
-    }
     if (stream->buffer) {
         apr_brigade_cleanup(stream->buffer);
     }
+    if (stream->input) {
+        apr_status_t status;
+        status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ);
+        if (status == APR_EAGAIN) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
+                          "h2_stream(%ld-%d): wait on input shutdown", 
+                          stream->session->id, stream->id);
+            status = h2_beam_shutdown(stream->input, APR_BLOCK_READ);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, 
+                          "h2_stream(%ld-%d): input shutdown returned", 
+                          stream->session->id, stream->id);
+        }
+    }
 }
 
 void h2_stream_destroy(h2_stream *stream)
 {
     AP_DEBUG_ASSERT(stream);
-    h2_stream_cleanup(stream);
+    if (stream->input) {
+        h2_beam_destroy(stream->input);
+        stream->input = NULL;
+    }
     if (stream->pool) {
         apr_pool_destroy(stream->pool);
     }
index 58b64b0a1c6f2cfddc77e29f1bcfc4c4aed0ba33..454bc376fe1bea7ccd9cf81dcdcecd5f384eb5a4 100644 (file)
@@ -83,7 +83,6 @@ struct h2_task {
     unsigned int frozen         : 1;
     unsigned int blocking       : 1;
     unsigned int detached       : 1;
-    unsigned int orphaned       : 1; /* h2_stream is gone for this task */    
     unsigned int submitted      : 1; /* response has been submitted to client */
     unsigned int worker_started : 1; /* h2_worker started processing for this io */
     unsigned int worker_done    : 1; /* h2_worker finished for this io */