]> granicus.if.org Git - apache/commitdiff
*) mod_http2: internal code cleanups and simplifications. Common output code for
authorStefan Eissing <icing@apache.org>
Wed, 22 May 2019 13:41:36 +0000 (13:41 +0000)
committerStefan Eissing <icing@apache.org>
Wed, 22 May 2019 13:41:36 +0000 (13:41 +0000)
     h2 and h2c protocols, using nested mutex locks for simplified calls. [Stefan Eissing]

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1859724 13f79535-47bb-0310-9956-ffa450edef68

24 files changed:
CHANGES
build/apr_common.m4
modules/http2/h2.h
modules/http2/h2_bucket_beam.c
modules/http2/h2_bucket_beam.h
modules/http2/h2_bucket_eos.c
modules/http2/h2_bucket_eos.h
modules/http2/h2_conn.c
modules/http2/h2_conn_io.c
modules/http2/h2_conn_io.h
modules/http2/h2_filter.h
modules/http2/h2_h2.c
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_push.c
modules/http2/h2_session.c
modules/http2/h2_session.h
modules/http2/h2_stream.c
modules/http2/h2_stream.h
modules/http2/h2_task.c
modules/http2/h2_task.h
modules/http2/h2_version.h
modules/http2/h2_workers.c
modules/http2/mod_proxy_http2.c

diff --git a/CHANGES b/CHANGES
index 5ead401111b2bcf9e7cc31d1944e132170eef034..bb8b6f3fd169d4667c040be5eeac9e1579b43418 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,9 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.1
 
+  *) mod_http2: internal code cleanups and simplifications. Common output code for
+     h2 and h2c protocols, using nested mutex locks for simplified calls. [Stefan Eissing]
+
   *) mod_proxy/ssl: Proxy SSL client certificate configuration and other proxy
      SSL configurations broken inside <Proxy> context.  PR 63430.
      [Ruediger Pluem, Yann Ylavic]
index f4e2dfd0a7cbc406fe327585d1461d6bd79e72b6..6b5c0f033b9e2aca5343a44ef26086f4e4cf1a66 100644 (file)
@@ -511,9 +511,9 @@ AC_DEFUN([APR_TRY_COMPILE_NO_WARNING],
    [int main(int argc, const char *const *argv) {]
    [[$2]]
    [   return 0; }]
-  )], [CFLAGS=$apr_save_CFLAGS
-$3],  [CFLAGS=$apr_save_CFLAGS
-$4])
+  )],
+  [$3], [$4])
+ CFLAGS=$apr_save_CFLAGS
 ])
 
 dnl
@@ -974,45 +974,12 @@ fi
 AC_SUBST(MKDEP)
 ])
 
-dnl
-dnl APR_CHECK_TYPES_FMT_COMPATIBLE(TYPE-1, TYPE-2, FMT-TAG, 
-dnl                                [ACTION-IF-TRUE], [ACTION-IF-FALSE])
-dnl
-dnl Try to determine whether two types are the same and accept the given
-dnl printf formatter (bare token, e.g. literal d, ld, etc).
-dnl
-AC_DEFUN([APR_CHECK_TYPES_FMT_COMPATIBLE], [
-define([apr_cvname], apr_cv_typematch_[]translit([$1], [ ], [_])_[]translit([$2], [ ], [_])_[][$3])
-AC_CACHE_CHECK([whether $1 and $2 use fmt %$3], apr_cvname, [
-APR_TRY_COMPILE_NO_WARNING([#include <sys/types.h>
-#include <stdio.h>
-#ifdef HAVE_STDINT_H
-#include <stdint.h>
-#endif
-], [
-    $1 chk1, *ptr1;
-    $2 chk2, *ptr2 = &chk1;
-    ptr1 = &chk2;
-    *ptr1 = *ptr2 = 0;
-    printf("%$3 %$3", chk1, chk2);
-], [apr_cvname=yes], [apr_cvname=no])])
-if test "$apr_cvname" = "yes"; then
-    :
-    $4
-else
-    :
-    $5
-fi
-])
-
 dnl
 dnl APR_CHECK_TYPES_COMPATIBLE(TYPE-1, TYPE-2, [ACTION-IF-TRUE])
 dnl
 dnl Try to determine whether two types are the same. Only works
 dnl for gcc and icc.
 dnl
-dnl @deprecated @see APR_CHECK_TYPES_FMT_COMPATIBLE
-dnl
 AC_DEFUN([APR_CHECK_TYPES_COMPATIBLE], [
 define([apr_cvname], apr_cv_typematch_[]translit([$1], [ ], [_])_[]translit([$2], [ ], [_]))
 AC_CACHE_CHECK([whether $1 and $2 are the same], apr_cvname, [
index e057d66e0cd6e6236f1788784f99d8bb69fbc707..798f4b5b7f1f16179c65817472f754ca13f0bfd8 100644 (file)
@@ -112,6 +112,7 @@ typedef enum h2_stream_state_t {
     H2_SS_CLOSED_L,
     H2_SS_CLOSED,
     H2_SS_CLEANUP,
+    H2_SS_DESTROYED,
     H2_SS_MAX
 } h2_stream_state_t;
 
@@ -123,6 +124,17 @@ typedef enum {
     H2_SEV_IN_DATA_PENDING,
 } h2_stream_event_t;
 
+typedef enum {
+    H2_PS_NONE,
+    H2_PS_QUEUED,
+    H2_PS_RUNNING,
+    H2_PS_FINISHED,
+} h2_processing_state_t;
+
+#define H2_PS_IS_RUNNING(s)      ((s) == H2_PS_RUNNING)
+#define H2_PS_IS_NOT_RUNNING(s)  ((s) != H2_PS_RUNNING)
+#define H2_PS_IS_WAS_STARTED(s)  ((s) >= H2_PS_RUNNING)
+#define H2_PS_IS_HAS_FINISHED(s) ((s) == H2_PS_FINISHED)
 
 /* h2_request is the transformer of HTTP2 streams into HTTP/1.1 internal
  * format that will be fed to various httpd input filters to finally
index a7f5edf5cc5534debacd4572d779ce86b7572386..cf97797b9c6ea9e865f297ac7c52bb4dd9eae43a 100644 (file)
@@ -24,6 +24,7 @@
 
 #include <httpd.h>
 #include <http_protocol.h>
+#include <http_request.h>
 #include <http_log.h>
 
 #include "h2_private.h"
@@ -154,6 +155,30 @@ const apr_bucket_type_t h2_bucket_type_beam = {
  * h2_blist, a brigade without allocations
  ******************************************************************************/
 
+static void h2_blist_cleanup(h2_blist *bl)
+{
+    apr_bucket *e;
+
+    while (!H2_BLIST_EMPTY(bl)) {
+        e = H2_BLIST_FIRST(bl);
+        apr_bucket_delete(e);
+    }
+}
+
+static void brigade_move_to_blist(apr_bucket_brigade *bb, h2_blist *list)
+{
+    apr_bucket *b;
+    while (bb && !APR_BRIGADE_EMPTY(bb)) {
+        b = APR_BRIGADE_FIRST(bb);
+        APR_BUCKET_REMOVE(b);
+        H2_BLIST_INSERT_TAIL(list, b);
+    }
+}
+
+/*******************************************************************************
+ * bucket beamer registration
+ ******************************************************************************/
+
 static apr_array_header_t *beamers;
 
 static apr_status_t cleanup_beamers(void *dummy)
@@ -290,17 +315,6 @@ static apr_size_t calc_buffered(h2_bucket_beam *beam)
     return len;
 }
 
-static void r_purge_sent(h2_bucket_beam *beam)
-{
-    apr_bucket *b;
-    /* delete all sender buckets in purge brigade, needs to be called
-     * from sender thread only */
-    while (!H2_BLIST_EMPTY(&beam->purge_list)) {
-        b = H2_BLIST_FIRST(&beam->purge_list);
-        apr_bucket_delete(b);
-    }
-}
-
 static apr_size_t calc_space_left(h2_bucket_beam *beam)
 {
     if (beam->max_buf_size > 0) {
@@ -435,7 +449,7 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
             }
             else {
                 /* it should be there unless we screwed up */
-                ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->send_pool, 
+                ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->pool, 
                               APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not "
                               "in hold, n=%d", beam->id, beam->tag, 
                               (int)proxy->n);
@@ -444,7 +458,7 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
         }
         /* notify anyone waiting on space to become available */
         if (!bl.mutex) {
-            r_purge_sent(beam);
+            h2_blist_cleanup(&beam->purge_list);
         }
         else {
             apr_thread_cond_broadcast(beam->change);
@@ -453,16 +467,6 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
     }
 }
 
-static void h2_blist_cleanup(h2_blist *bl)
-{
-    apr_bucket *e;
-
-    while (!H2_BLIST_EMPTY(bl)) {
-        e = H2_BLIST_FIRST(bl);
-        apr_bucket_delete(e);
-    }
-}
-
 static apr_status_t beam_close(h2_bucket_beam *beam)
 {
     if (!beam->closed) {
@@ -477,40 +481,10 @@ int h2_beam_is_closed(h2_bucket_beam *beam)
     return beam->closed;
 }
 
-static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool, 
-                         apr_status_t (*cleanup)(void *))
-{
-    if (pool && pool != beam->pool) {
-        apr_pool_pre_cleanup_register(pool, beam, cleanup);
-        return 1;
-    }
-    return 0;
-}
-
-static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool,
-                     apr_status_t (*cleanup)(void *)) {
-    if (pool && pool != beam->pool) {
-        apr_pool_cleanup_kill(pool, beam, cleanup);
-        return 1;
-    }
-    return 0;
-}
-
-static apr_status_t beam_recv_cleanup(void *data)
-{
-    h2_bucket_beam *beam = data;
-    /* receiver pool has gone away, clear references */
-    beam->recv_buffer = NULL;
-    beam->recv_pool = NULL;
-    return APR_SUCCESS;
-}
-
 static apr_status_t beam_send_cleanup(void *data)
 {
     h2_bucket_beam *beam = data;
     /* sender is going away, clear up all references to its memory */
-    r_purge_sent(beam);
-    h2_blist_cleanup(&beam->send_list);
     report_consumption(beam, NULL);
     while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
         h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
@@ -520,22 +494,10 @@ static apr_status_t beam_send_cleanup(void *data)
     }
     h2_blist_cleanup(&beam->purge_list);
     h2_blist_cleanup(&beam->hold_list);
-    beam->send_pool = NULL;
+    h2_blist_cleanup(&beam->send_list);
     return APR_SUCCESS;
 }
 
-static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool) 
-{
-    if (beam->send_pool != pool) {
-        if (beam->send_pool && beam->send_pool != beam->pool) {
-            pool_kill(beam, beam->send_pool, beam_send_cleanup);
-            beam_send_cleanup(beam);
-        }
-        beam->send_pool = pool;
-        pool_register(beam, beam->send_pool, beam_send_cleanup);
-    }
-}
-
 static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl)
 {
     if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
@@ -559,74 +521,18 @@ static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl)
     }
 }
 
-static apr_status_t beam_cleanup(h2_bucket_beam *beam, int from_pool)
-{
-    apr_status_t status = APR_SUCCESS;
-    int safe_send = (beam->owner == H2_BEAM_OWNER_SEND);
-    int safe_recv = (beam->owner == H2_BEAM_OWNER_RECV);
-    
-    /* 
-     * Owner of the beam is going away, depending on which side it owns,
-     * cleanup strategies will differ.
-     *
-     * In general, receiver holds references to memory from sender. 
-     * Clean up receiver first, if safe, then cleanup sender, if safe.
-     */
-     
-     /* When called from pool destroy, io callbacks are disabled */
-     if (from_pool) {
-         beam->cons_io_cb = NULL;
-     }
-     
-    /* When modify send is not safe, this means we still have multi-thread
-     * protection and the owner is receiving the buckets. If the sending
-     * side has not gone away, this means we could have dangling buckets
-     * in our lists that never get destroyed. This should not happen. */
-    ap_assert(safe_send || !beam->send_pool);
-    if (!H2_BLIST_EMPTY(&beam->send_list)) {
-        ap_assert(beam->send_pool);
-    }
-    
-    if (safe_recv) {
-        if (beam->recv_pool) {
-            pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
-            beam->recv_pool = NULL;
-        }
-        recv_buffer_cleanup(beam, NULL);
-    }
-    else {
-        beam->recv_buffer = NULL;
-        beam->recv_pool = NULL;
-    }
-    
-    if (safe_send && beam->send_pool) {
-        pool_kill(beam, beam->send_pool, beam_send_cleanup);
-        status = beam_send_cleanup(beam);
-    }
-    
-    if (safe_recv) {
-        ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies));
-        ap_assert(H2_BLIST_EMPTY(&beam->send_list));
-        ap_assert(H2_BLIST_EMPTY(&beam->hold_list));
-        ap_assert(H2_BLIST_EMPTY(&beam->purge_list));
-    }
-    return status;
-}
-
-static apr_status_t beam_pool_cleanup(void *data)
-{
-    return beam_cleanup(data, 1);
-}
-
 apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
 {
-    apr_pool_cleanup_kill(beam->pool, beam, beam_pool_cleanup);
-    return beam_cleanup(beam, 0);
+    /* no more io callbacks */
+    beam->cons_io_cb = NULL;
+    beam->recv_buffer = NULL;
+    beam->recv_pool = NULL;
+    
+    return beam_send_cleanup(beam);
 }
 
 apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, 
                             int id, const char *tag, 
-                            h2_beam_owner_t owner,
                             apr_size_t max_buf_size,
                             apr_interval_time_t timeout)
 {
@@ -641,7 +547,6 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool,
     beam->id = id;
     beam->tag = tag;
     beam->pool = pool;
-    beam->owner = owner;
     H2_BLIST_INIT(&beam->send_list);
     H2_BLIST_INIT(&beam->hold_list);
     H2_BLIST_INIT(&beam->purge_list);
@@ -650,14 +555,11 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool,
     beam->max_buf_size = max_buf_size;
     beam->timeout = timeout;
 
-    rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool);
+    rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_NESTED, pool);
     if (APR_SUCCESS == rv) {
         rv = apr_thread_cond_create(&beam->change, pool);
-        if (APR_SUCCESS == rv) {
-            apr_pool_pre_cleanup_register(pool, beam, beam_pool_cleanup);
-            *pbeam = beam;
-        }
     }
+    *pbeam = (APR_SUCCESS == rv)? beam : NULL;
     return rv;
 }
 
@@ -711,7 +613,7 @@ void h2_beam_abort(h2_bucket_beam *beam)
     
     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
         beam->aborted = 1;
-        r_purge_sent(beam);
+        h2_blist_cleanup(&beam->purge_list);
         h2_blist_cleanup(&beam->send_list);
         report_consumption(beam, &bl);
         apr_thread_cond_broadcast(beam->change);
@@ -724,7 +626,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam)
     h2_beam_lock bl;
     
     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
-        r_purge_sent(beam);
+        h2_blist_cleanup(&beam->purge_list);
         beam_close(beam);
         report_consumption(beam, &bl);
         leave_yellow(beam, &bl);
@@ -757,17 +659,6 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block)
     return status;
 }
 
-static void move_to_hold(h2_bucket_beam *beam, 
-                         apr_bucket_brigade *sender_bb)
-{
-    apr_bucket *b;
-    while (sender_bb && !APR_BRIGADE_EMPTY(sender_bb)) {
-        b = APR_BRIGADE_FIRST(sender_bb);
-        APR_BUCKET_REMOVE(b);
-        H2_BLIST_INSERT_TAIL(&beam->send_list, b);
-    }
-}
-
 static apr_status_t append_bucket(h2_bucket_beam *beam, 
                                   apr_bucket *b,
                                   apr_read_type_e block,
@@ -789,6 +680,19 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
         if (APR_BUCKET_IS_EOS(b)) {
             beam->closed = 1;
         }
+        if (AP_BUCKET_IS_EOR(b)) {
+            /* The problem with EOR buckets:
+             * - we cannot delete it now, as it will destroy the request pool
+             *   and free data that we are still holding in the beam.
+             * - if we add it to the send_list, as all other buckets,
+             *   it will most likely not be read, as an EOS came before.
+             *   This means we still juggle it when the beam is destroyed,
+             *   and rarely this seems to cause the pool to be freed twice...
+             *   if asan stack traces are to be believed...
+             * - since we 
+             */
+            beam->closed = 1;
+        }
         APR_BUCKET_REMOVE(b);
         H2_BLIST_INSERT_TAIL(&beam->send_list, b);
         return APR_SUCCESS;
@@ -844,7 +748,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
         /* this takes care of transient buckets and converts them
          * into heap ones. Other bucket types might or might not be
          * affected by this. */
-        status = apr_bucket_setaside(b, beam->send_pool);
+        status = apr_bucket_setaside(b, beam->pool);
     }
     else if (APR_BUCKET_IS_HEAP(b)) {
         /* For heap buckets read from a receiver thread is fine. The
@@ -864,7 +768,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
         }
     }
     else if (APR_BUCKET_IS_FILE(b) && can_beam) {
-        status = apr_bucket_setaside(b, beam->send_pool);
+        status = apr_bucket_setaside(b, beam->pool);
     }
     
     if (status == APR_ENOTIMPL) {
@@ -876,7 +780,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
          * use pools/allocators safely. */
         status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
         if (status == APR_SUCCESS) {
-            status = apr_bucket_setaside(b, beam->send_pool);
+            status = apr_bucket_setaside(b, beam->pool);
         }
     }
     
@@ -891,17 +795,6 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
     return APR_SUCCESS;
 }
 
-void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p)
-{
-    h2_beam_lock bl;
-    /* Called from the sender thread to add buckets to the beam */
-    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        r_purge_sent(beam);
-        beam_set_send_pool(beam, p);
-        leave_yellow(beam, &bl);
-    }
-}
-
 apr_status_t h2_beam_send(h2_bucket_beam *beam, 
                           apr_bucket_brigade *sender_bb, 
                           apr_read_type_e block)
@@ -913,11 +806,11 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
 
     /* Called from the sender thread to add buckets to the beam */
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        ap_assert(beam->send_pool);
-        r_purge_sent(beam);
+        ap_assert(beam->pool);
+        h2_blist_cleanup(&beam->purge_list);
         
         if (beam->aborted) {
-            move_to_hold(beam, sender_bb);
+            brigade_move_to_blist(sender_bb, &beam->send_list);
             rv = APR_ECONNABORTED;
         }
         else if (sender_bb) {
@@ -927,7 +820,7 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
             while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
                 if (space_left <= 0) {
                     report_prod_io(beam, force_report, &bl);
-                    r_purge_sent(beam);
+                    h2_blist_cleanup(&beam->purge_list);
                     rv = wait_not_full(beam, block, &space_left, &bl);
                     if (APR_SUCCESS != rv) {
                         break;
index f260762366e3a46fb54b6f6ac9e46009b1e019dd..be5a2fda6e5f5c92c5bcc825c4b780ce87c524f5 100644 (file)
@@ -150,11 +150,6 @@ typedef struct {
 typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam,
                                       apr_file_t *file);
 
-typedef enum {
-    H2_BEAM_OWNER_SEND,
-    H2_BEAM_OWNER_RECV
-} h2_beam_owner_t;
-
 /**
  * Will deny all transfer of apr_file_t across the beam and force
  * a data copy instead.
@@ -165,13 +160,11 @@ struct h2_bucket_beam {
     int id;
     const char *tag;
     apr_pool_t *pool;
-    h2_beam_owner_t owner;
     h2_blist send_list;
     h2_blist hold_list;
     h2_blist purge_list;
     apr_bucket_brigade *recv_buffer;
     h2_bproxy_list proxies;
-    apr_pool_t *send_pool;
     apr_pool_t *recv_pool;
     
     apr_size_t max_buf_size;
@@ -215,8 +208,6 @@ struct h2_bucket_beam {
  * @param pool          pool owning the beam, beam will cleanup when pool released
  * @param id            identifier of the beam
  * @param tag           tag identifying beam for logging
- * @param owner         if the beam is owned by the sender or receiver, e.g. if
- *                      the pool owner is using this beam for sending or receiving
  * @param buffer_size   maximum memory footprint of buckets buffered in beam, or
  *                      0 for no limitation
  * @param timeout       timeout for blocking operations
@@ -224,7 +215,6 @@ struct h2_bucket_beam {
 apr_status_t h2_beam_create(h2_bucket_beam **pbeam,
                             apr_pool_t *pool, 
                             int id, const char *tag,
-                            h2_beam_owner_t owner,  
                             apr_size_t buffer_size,
                             apr_interval_time_t timeout);
 
@@ -245,13 +235,6 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
                           apr_bucket_brigade *bb, 
                           apr_read_type_e block);
 
-/**
- * Register the pool from which future buckets are send. This defines
- * the lifetime of the buckets, e.g. the pool should not be cleared/destroyed
- * until the data is no longer needed (or has been received).
- */
-void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p);
-
 /**
  * Receive buckets from the beam into the given brigade. Will return APR_EOF
  * when reading past an EOS bucket. Reads can be blocking until data is 
index 4fe7ea725f92d22176cbbebc0480b23a8ae5c9ae..17f5f06f4efec95aa0988fe5983514ea36b97ce9 100644 (file)
 
 #include "h2_private.h"
 #include "h2.h"
+#include "h2_ctx.h"
 #include "h2_mplx.h"
-#include "h2_stream.h"
+#include "h2_session.h"
 #include "h2_bucket_eos.h"
 
 typedef struct {
     apr_bucket_refcount refcount;
-    h2_stream *stream;
+    conn_rec *c;
+    int stream_id;
 } h2_bucket_eos;
 
-static apr_status_t bucket_cleanup(void *data)
-{
-    h2_stream **pstream = data;
-
-    if (*pstream) {
-        /* If bucket_destroy is called after us, this prevents
-         * bucket_destroy from trying to destroy the stream again. */
-        *pstream = NULL;
-    }
-    return APR_SUCCESS;
-}
-
 static apr_status_t bucket_read(apr_bucket *b, const char **str,
                                 apr_size_t *len, apr_read_type_e block)
 {
@@ -55,12 +45,13 @@ static apr_status_t bucket_read(apr_bucket *b, const char **str,
     return APR_SUCCESS;
 }
 
-apr_bucket *h2_bucket_eos_make(apr_bucket *b, h2_stream *stream)
+apr_bucket *h2_bucket_eos_make(apr_bucket *b, conn_rec *c, int stream_id)
 {
     h2_bucket_eos *h;
 
     h = apr_bucket_alloc(sizeof(*h), b->list);
-    h->stream = stream;
+    h->c = c;
+    h->stream_id = stream_id;
 
     b = apr_bucket_shared_make(b, h, 0, 0);
     b->type = &h2_bucket_type_eos;
@@ -68,35 +59,27 @@ apr_bucket *h2_bucket_eos_make(apr_bucket *b, h2_stream *stream)
     return b;
 }
 
-apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list,
-                                 h2_stream *stream)
+apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list, conn_rec *c, int stream_id)
 {
     apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
 
     APR_BUCKET_INIT(b);
     b->free = apr_bucket_free;
     b->list = list;
-    b = h2_bucket_eos_make(b, stream);
-    if (stream) {
-        h2_bucket_eos *h = b->data;
-        apr_pool_pre_cleanup_register(stream->pool, &h->stream, bucket_cleanup);
-    }
+    b = h2_bucket_eos_make(b, c, stream_id);
     return b;
 }
 
 static void bucket_destroy(void *data)
 {
     h2_bucket_eos *h = data;
-
+    h2_session *session;
+    
     if (apr_bucket_shared_destroy(h)) {
-        h2_stream *stream = h->stream;
-        if (stream && stream->pool) {
-            apr_pool_cleanup_kill(stream->pool, &h->stream, bucket_cleanup);
+        if ((session = h2_ctx_get_session(h->c))) {
+            h2_session_eos_sent(session, h->stream_id);
         }
         apr_bucket_free(h);
-        if (stream) {
-            h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
-        }
     }
 }
 
index 04e32e37f117b823d194cc6b2cee08ff2896a59a..3f9b80071759b05e38cb75a7b719d2f074901ce8 100644 (file)
 #ifndef mod_http2_h2_bucket_stream_eos_h
 #define mod_http2_h2_bucket_stream_eos_h
 
-struct h2_stream;
-
 /** End Of HTTP/2 STREAM (H2EOS) bucket */
 extern const apr_bucket_type_t h2_bucket_type_eos;
 
 #define H2_BUCKET_IS_H2EOS(e)     (e->type == &h2_bucket_type_eos)
 
-apr_bucket *h2_bucket_eos_make(apr_bucket *b, struct h2_stream *stream);
+apr_bucket *h2_bucket_eos_make(apr_bucket *b, conn_rec *c, int stream_id);
 
-apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list, 
-                                 struct h2_stream *stream);
+apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list, conn_rec *c, int stream_id); 
 
 #endif /* mod_http2_h2_bucket_stream_eos_h */
index d29cd7e996b8c7918ed3fb6e408aaa92234ec0ee..7abbed989834f681e3e01c6521bf31539f98072e 100644 (file)
@@ -37,7 +37,6 @@
 #include "h2_filter.h"
 #include "h2_mplx.h"
 #include "h2_session.h"
-#include "h2_stream.h"
 #include "h2_h2.h"
 #include "h2_task.h"
 #include "h2_workers.h"
@@ -351,8 +350,7 @@ conn_rec *h2_slave_create(conn_rec *master, int slave_id, apr_pool_t *parent)
 
 void h2_slave_destroy(conn_rec *slave)
 {
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, slave,
-                  "h2_slave(%s): destroy", slave->log_id);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, slave, "h2_slave(%s): destroy", slave->log_id);
     slave->sbh = NULL;
     apr_pool_destroy(slave->pool);
 }
index 68c15d13e4906f3c4cfe94b0d236e2fe5f5efcfa..25f96ed98c9be31c31441fcaae2ed6d3ac4c1579 100644 (file)
@@ -134,7 +134,8 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, server_rec *s)
     io->c              = c;
     io->output         = apr_brigade_create(c->pool, c->bucket_alloc);
     io->is_tls         = h2_h2_is_tls(c);
-    io->buffer_output  = io->is_tls;
+    /* we used to buffer only on TLS connections, but to eliminate code paths
+     * and force more predictable behaviour, we do it on all now. Less test cases. */
     io->flush_threshold = (apr_size_t)h2_config_sgeti64(s, H2_CONF_STREAM_MAX_MEM);
 
     if (io->is_tls) {
@@ -150,14 +151,13 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, server_rec *s)
     else {
         io->warmup_size    = 0;
         io->cooldown_usecs = 0;
-        io->write_size     = 0;
+        io->write_size     = WRITE_SIZE_MAX;
     }
 
     if (APLOGctrace1(c)) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
-                      "h2_conn_io(%ld): init, buffering=%d, warmup_size=%ld, "
-                      "cd_secs=%f", io->c->id, io->buffer_output, 
-                      (long)io->warmup_size,
+                      "h2_conn_io(%ld): init, warmup_size=%ld, cd_secs=%f", 
+                      io->c->id, (long)io->warmup_size,
                       ((double)io->cooldown_usecs/APR_USEC_PER_SEC));
     }
 
@@ -321,24 +321,19 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length)
         io->is_flushed = 0;
     }
     
-    if (io->buffer_output) {
-        while (length > 0) {
-            remain = assure_scratch_space(io);
-            if (remain >= length) {
-                memcpy(io->scratch + io->slen, data, length);
-                io->slen += length;
-                length = 0;
-            }
-            else {
-                memcpy(io->scratch + io->slen, data, remain);
-                io->slen += remain;
-                data += remain;
-                length -= remain;
-            }
+    while (length > 0) {
+        remain = assure_scratch_space(io);
+        if (remain >= length) {
+            memcpy(io->scratch + io->slen, data, length);
+            io->slen += length;
+            length = 0;
+        }
+        else {
+            memcpy(io->scratch + io->slen, data, remain);
+            io->slen += remain;
+            data += remain;
+            length -= remain;
         }
-    }
-    else {
-        status = apr_brigade_write(io->output, NULL, NULL, data, length);
     }
     return status;
 }
@@ -356,37 +351,26 @@ apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb)
         b = APR_BRIGADE_FIRST(bb);
         
         if (APR_BUCKET_IS_METADATA(b)) {
-            /* need to finish any open scratch bucket, as meta data 
-             * needs to be forward "in order". */
-            append_scratch(io);
-            APR_BUCKET_REMOVE(b);
-            APR_BRIGADE_INSERT_TAIL(io->output, b);
-        }
-        else if (io->buffer_output) {
-            apr_size_t remain = assure_scratch_space(io);
-            if (b->length > remain) {
-                apr_bucket_split(b, remain);
-                if (io->slen == 0) {
-                    /* complete write_size bucket, append unchanged */
-                    APR_BUCKET_REMOVE(b);
-                    APR_BRIGADE_INSERT_TAIL(io->output, b);
-                    continue;
-                }
+            if (APR_BUCKET_IS_FLUSH(b)) {
+                /* need to finish any open scratch bucket, as meta data 
+                 * needs to be forward "in order". */
+                append_scratch(io);
+                APR_BUCKET_REMOVE(b);
+                APR_BRIGADE_INSERT_TAIL(io->output, b);
             }
             else {
-                /* bucket fits in remain, copy to scratch */
-                status = read_to_scratch(io, b);
                 apr_bucket_delete(b);
-                continue;
             }
         }
         else {
-            /* no buffering, forward buckets setaside on flush */
-            if (APR_BUCKET_IS_TRANSIENT(b)) {
-                apr_bucket_setaside(b, io->c->pool);
+            apr_size_t remain = assure_scratch_space(io);
+            if (b->length > remain) {
+                apr_bucket_split(b, remain);
             }
-            APR_BUCKET_REMOVE(b);
-            APR_BRIGADE_INSERT_TAIL(io->output, b);
+            /* bucket now fits in remain, copy to scratch */
+            status = read_to_scratch(io, b);
+            apr_bucket_delete(b);
+            continue;
         }
     }
     return status;
index e96203cac243aba242b8d552cef0c38a79ef6457..a8821aa0955615411d68a9d1f2960cb9bc2c09b3 100644 (file)
@@ -39,7 +39,6 @@ typedef struct {
     apr_int64_t bytes_read;
     apr_int64_t bytes_written;
     
-    int buffer_output;
     apr_size_t flush_threshold;
     unsigned int is_flushed : 1;
     
index 12810d81b7793f5a0ba4bdfd2304451f899bc7c1..bb32546aabf47473db016af112c644e238b57d4c 100644 (file)
@@ -19,7 +19,6 @@
 
 struct h2_bucket_beam;
 struct h2_headers;
-struct h2_stream;
 struct h2_session;
 
 typedef struct h2_filter_cin {
index 4ff1d51d84eac0e4cb659a10b7a225dc529caf36..ded9bc41c46ecfa39f9a88c931ff61cd411cca02 100644 (file)
@@ -34,7 +34,6 @@
 #include "h2_private.h"
 
 #include "h2_bucket_beam.h"
-#include "h2_stream.h"
 #include "h2_task.h"
 #include "h2_config.h"
 #include "h2_ctx.h"
@@ -756,6 +755,10 @@ static int h2_h2_late_fixups(request_rec *r)
             }
             check_push(r, "late_fixup");
         }
+        /* enforce that we will close this slave connection after
+         * the task is done. This will keep request processing from
+         * trying to clean up dangling input data, for example. */
+        r->connection->keepalive = AP_CONN_CLOSE;
     }
     return DECLINED;
 }
index 81b063ad44a49816428ecde9b4c60900a4918275..ba47edec3359b9fdd44e34e6c3e75ed6482b1795 100644 (file)
@@ -61,8 +61,8 @@ apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
 }
 
 #define H2_MPLX_ENTER(m)    \
-    do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
-        return rv;\
+    do { apr_status_t lrv; if ((lrv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
+        return lrv;\
     } } while(0)
 
 #define H2_MPLX_LEAVE(m)    \
@@ -104,7 +104,7 @@ static void stream_joined(h2_mplx *m, h2_stream *stream)
     h2_ihash_add(m->spurge, stream);
 }
 
-static void stream_cleanup(h2_mplx *m, h2_stream *stream)
+static void stream_discard(h2_mplx *m, h2_stream *stream)
 {
     ap_assert(stream->state == H2_SS_CLEANUP);
 
@@ -175,7 +175,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, server_rec *s, apr_pool_t *parent,
         }
         apr_pool_tag(m->pool, "h2_mplx");
         apr_allocator_owner_set(allocator, m->pool);
-        status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT,
+        status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED,
                                          m->pool);
         if (status != APR_SUCCESS) {
             apr_pool_destroy(m->pool);
@@ -183,7 +183,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, server_rec *s, apr_pool_t *parent,
         }
         apr_allocator_mutex_set(allocator, mutex);
 
-        status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
+        status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_NESTED,
                                          m->pool);
         if (status != APR_SUCCESS) {
             apr_pool_destroy(m->pool);
@@ -267,8 +267,13 @@ static int stream_destroy_iter(void *ctx, void *val)
     h2_mplx *m = ctx;
     h2_stream *stream = val;
 
+    /* Make dead certain we are called for a stream 
+    to purge and that we have not already done so */
+    ap_assert(h2_ihash_get(m->spurge, stream->id) == stream);
+    
     h2_ihash_remove(m->spurge, stream->id);
     ap_assert(stream->state == H2_SS_CLEANUP);
+    stream->state = H2_SS_DESTROYED;
     
     if (stream->input) {
         /* Process outstanding events before destruction */
@@ -303,15 +308,15 @@ static int stream_destroy_iter(void *ctx, void *val)
                                && !task->rst_error);
             }
             
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, 
+                          APLOGNO(03385) "h2_task_destroy, reuse slave=%d", reuse_slave); 
+            task->c = NULL;
+            h2_task_destroy(task);
+            
             if (reuse_slave) {
-                h2_beam_log(task->output.beam, m->c, APLOG_DEBUG, 
-                            APLOGNO(03385) "h2_task_destroy, reuse slave");    
-                h2_task_destroy(task);
                 APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
             }
             else {
-                h2_beam_log(task->output.beam, m->c, APLOG_TRACE1, 
-                            "h2_task_destroy, destroy slave");    
                 h2_slave_destroy(slave);
             }
         }
@@ -320,15 +325,15 @@ static int stream_destroy_iter(void *ctx, void *val)
     return 0;
 }
 
-static void purge_streams(h2_mplx *m, int lock)
+static void purge_streams(h2_mplx *m)
 {
+    H2_MPLX_ENTER_ALWAYS(m);
     if (!h2_ihash_empty(m->spurge)) {
-        H2_MPLX_ENTER_MAYBE(m, lock);
         while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) {
             /* repeat until empty */
         }
-        H2_MPLX_LEAVE_MAYBE(m, lock);
     }
+    H2_MPLX_LEAVE(m);
 }
 
 typedef struct {
@@ -390,7 +395,7 @@ static int unexpected_stream_iter(void *ctx, void *val) {
     return 1;
 }
 
-static int stream_cancel_iter(void *ctx, void *val) {
+static int stream_cancel_and_discard_iter(void *ctx, void *val) {
     h2_mplx *m = ctx;
     h2_stream *stream = val;
 
@@ -404,7 +409,7 @@ static int stream_cancel_iter(void *ctx, void *val) {
     h2_stream_rst(stream, H2_ERR_NO_ERROR);
     /* All connection data has been sent, simulate cleanup */
     h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
-    stream_cleanup(m, stream);  
+    stream_discard(m, stream);  
     return 0;
 }
 
@@ -430,7 +435,7 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 
     /* How to shut down a h2 connection:
      * 1. cancel all streams still active */
-    while (!h2_ihash_iter(m->streams, stream_cancel_iter, m)) {
+    while (!h2_ihash_iter(m->streams, stream_cancel_and_discard_iter, m)) {
         /* until empty */
     }
     
@@ -466,6 +471,7 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
         h2_ihash_iter(m->shold, unexpected_stream_iter, m);
     }
     
+    purge_streams(m);
     m->c->aborted = old_aborted;
     H2_MPLX_LEAVE(m);
 
@@ -473,16 +479,9 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
                   "h2_mplx(%ld): released", m->id);
 }
 
-apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
+static h2_stream *mplx_stream_get(h2_mplx *m, int id)
 {
-    H2_MPLX_ENTER(m);
-    
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                  H2_STRM_MSG(stream, "cleanup"));
-    stream_cleanup(m, stream);        
-    
-    H2_MPLX_LEAVE(m);
-    return APR_SUCCESS;
+    return h2_ihash_get(m->streams, id);
 }
 
 h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
@@ -490,13 +489,26 @@ h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
     h2_stream *s = NULL;
     
     H2_MPLX_ENTER_ALWAYS(m);
-
     s = h2_ihash_get(m->streams, id);
-
     H2_MPLX_LEAVE(m);
     return s;
 }
 
+apr_status_t h2_mplx_stream_discard(h2_mplx *m, int stream_id)
+{
+    h2_stream *stream;
+    
+    H2_MPLX_ENTER(m);
+    stream = mplx_stream_get(m, stream_id);
+    if (stream) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+                      H2_STRM_MSG(stream, "cleanup"));
+        stream_discard(m, stream);        
+    }
+    H2_MPLX_LEAVE(m);
+    return APR_SUCCESS;
+}
+
 static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
 {
     h2_stream *stream = ctx;
@@ -594,7 +606,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
         status = APR_SUCCESS;
     }
     else {
-        purge_streams(m, 0);
+        purge_streams(m);
         h2_ihash_iter(m->streams, report_consumption_iter, m);
         m->added_output = iowait;
         status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
@@ -656,19 +668,31 @@ static void register_if_needed(h2_mplx *m)
     }
 }
 
-apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, 
-                             h2_stream_pri_cmp *cmp, void *ctx)
+void h2_mplx_stream_register(h2_mplx *m, h2_stream *stream)
 {
-    apr_status_t status;
+    H2_MPLX_ENTER_ALWAYS(m);
+    AP_DEBUG_ASSERT(h2_ihash_get(m->streams, stream->id) == NULL);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, H2_STRM_MSG(stream, "registered")); 
+    h2_ihash_add(m->streams, stream);
+    H2_MPLX_LEAVE(m);
+}
+
+apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, h2_stream_pri_cmp *cmp, void *ctx)
+{
+    h2_stream *stream;
+    apr_status_t rv = APR_SUCCESS;
     
     H2_MPLX_ENTER(m);
 
     if (m->aborted) {
-        status = APR_ECONNABORTED;
+        rv = APR_ECONNABORTED;
     }
     else {
-        status = APR_SUCCESS;
-        h2_ihash_add(m->streams, stream);
+        stream = mplx_stream_get(m, stream_id);
+        if (!stream) goto leave;
+        ap_assert(!stream->scheduled);
+        stream->scheduled = 1;
+        
         if (h2_stream_is_ready(stream)) {
             /* already have a response */
             check_data_for(m, stream, 0);
@@ -682,9 +706,9 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
                           H2_STRM_MSG(stream, "process, added to q")); 
         }
     }
-
+leave:
     H2_MPLX_LEAVE(m);
-    return status;
+    return rv;
 }
 
 static h2_task *next_stream_task(h2_mplx *m)
@@ -1026,7 +1050,6 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
                                               ", out has %ld bytes buffered"),
                                   h2_beam_is_closed(stream->output),
                                   (long)h2_beam_get_buffered(stream->output));
-                    h2_ihash_add(m->streams, stream);
                     check_data_for(m, stream, 0);
                     stream->out_checked = 1;
                     status = APR_EAGAIN;
@@ -1062,7 +1085,7 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
 
     /* update input windows for streams */
     h2_ihash_iter(m->streams, report_consumption_iter, m);    
-    purge_streams(m, 1);
+    purge_streams(m);
     
     n = h2_ififo_count(m->readyq);
     while (n > 0 
index 575ccaf430f0ddfa0063efeeb6d3f4c9fdf0401e..af8462a22a3d6bd28b8ec1f3d8272902b612eb1c 100644 (file)
@@ -136,7 +136,22 @@ int h2_mplx_is_busy(h2_mplx *m);
  * IO lifetime of streams.
  ******************************************************************************/
 
-struct h2_stream *h2_mplx_stream_get(h2_mplx *m, int id);
+/**
+ * Register a stream with the multiplexer. This transfers responisibility
+ * for lifetime and final destruction to mplx.
+ * @param mplx the multiplexer
+ * @param stream the h2 stream instance
+ */
+void h2_mplx_stream_register(h2_mplx *mplx, struct h2_stream *stream);
+
+/**
+ * Lookup a stream by its id. Will only return active streams, not discarded ones.
+ * @param mplx the multiplexer
+ * @param id the stream identifier
+ * @return the stream or NULL
+ */
+struct h2_stream *h2_mplx_stream_get(h2_mplx *mplx, int id);
 
 /**
  * Notifies mplx that a stream has been completely handled on the main
@@ -145,7 +160,7 @@ struct h2_stream *h2_mplx_stream_get(h2_mplx *m, int id);
  * @param m the mplx itself
  * @param stream the stream ready for cleanup
  */
-apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, struct h2_stream *stream);
+apr_status_t h2_mplx_stream_discard(h2_mplx *m, int stream_id);
 
 /**
  * Waits on output data from any stream in this session to become available. 
@@ -164,13 +179,12 @@ apr_status_t h2_mplx_keep_active(h2_mplx *m, struct h2_stream *stream);
  * Process a stream request.
  * 
  * @param m the multiplexer
- * @param stream the identifier of the stream
+ * @param stream_id the identifier of the stream
  * @param r the request to be processed
  * @param cmp the stream priority compare function
  * @param ctx context data for the compare function
  */
-apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, 
-                             h2_stream_pri_cmp *cmp, void *ctx);
+apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, h2_stream_pri_cmp *cmp, void *ctx);
 
 /**
  * Stream priorities have changed, reschedule pending requests.
index 750088af2d9c5d83a2cbb4c3fb98c15281af9ac1..4740026e2c9bcd41ffb8b540a841ff273b137282 100644 (file)
@@ -622,15 +622,19 @@ static h2_push_diary_entry *move_to_last(h2_push_diary *diary, apr_size_t idx)
 {
     h2_push_diary_entry *entries = (h2_push_diary_entry*)diary->entries->elts;
     h2_push_diary_entry e;
-    apr_size_t lastidx = (apr_size_t)diary->entries->nelts;
     
-    /* move entry[idx] to the end */
-    if (idx+1 < lastidx) {
-        e =  entries[idx];
-        memmove(entries+idx, entries+idx+1, sizeof(e) * (lastidx - idx));
-        entries[lastidx] = e;
+    if (diary->entries->nelts > 0) {
+        int lastidx = diary->entries->nelts - 1;
+        
+        /* move entry[idx] to the end */
+        if (idx < lastidx) {
+            e =  entries[idx];
+            memmove(entries+idx, entries+idx+1, sizeof(e) * (lastidx - idx));
+            entries[lastidx] = e;
+            return &entries[lastidx];
+        }
     }
-    return &entries[lastidx];
+    return &entries[idx];
 }
 
 static void h2_push_diary_append(h2_push_diary *diary, h2_push_diary_entry *e)
@@ -707,7 +711,7 @@ apr_array_header_t *h2_push_collect_update(h2_stream *stream,
         }
     }
     pushes = h2_push_collect(stream->pool, req, stream->push_policy, res);
-    return h2_push_diary_update(stream->session, pushes);
+    return h2_push_diary_update(session, pushes);
 }
 
 static apr_int32_t h2_log2inv(unsigned char log2)
index 1fceabc112997b13b208e99079308157d0238210..8a10e40973c707137db16acaf7eec86bd6fa6127 100644 (file)
@@ -75,7 +75,15 @@ static int h2_session_status_from_apr_status(apr_status_t rv)
 
 static h2_stream *get_stream(h2_session *session, int stream_id)
 {
-    return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+    h2_stream *stream;
+    
+    if (stream_id <= 0) return NULL;
+    stream = h2_mplx_stream_get(session->mplx, stream_id);
+    if (!stream) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                      "session_stream_get(%d) == NULL", stream_id);
+    }
+    return stream;
 }
 
 static void dispatch_event(h2_session *session, h2_session_event_t ev, 
@@ -109,21 +117,24 @@ static void cleanup_unprocessed_streams(h2_session *session)
     h2_mplx_stream_do(session->mplx, rst_unprocessed_stream, session);
 }
 
-static h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
-                                         int initiated_on)
+static apr_pool_t *session_stream_pool_create(h2_session *session) 
 {
-    h2_stream * stream;
-    apr_pool_t *stream_pool;
-    
-    apr_pool_create(&stream_pool, session->pool);
-    apr_pool_tag(stream_pool, "h2_stream");
+    apr_pool_t *pool;
     
-    stream = h2_stream_create(stream_id, stream_pool, session, 
-                              session->monitor, initiated_on);
-    if (stream) {
-        nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
-    }
-    return stream;
+    apr_pool_create(&pool, session->pool);
+    apr_pool_tag(pool, "h2_stream");
+    return pool;
+}
+
+static h2_stream *session_stream_pcreate(h2_session *session, int stream_id,
+                                            apr_pool_t *pool, int initiated_on)
+{
+    return h2_stream_create(stream_id, pool, session, session->monitor, initiated_on);
+}
+
+static h2_stream *session_stream_create(h2_session *session, int stream_id)
+{
+    return session_stream_pcreate(session, stream_id, session_stream_pool_create(session), 0);
 }
 
 /**
@@ -275,19 +286,18 @@ static int on_begin_headers_cb(nghttp2_session *ngh2,
                                const nghttp2_frame *frame, void *userp)
 {
     h2_session *session = (h2_session *)userp;
-    h2_stream *s;
+    h2_stream *stream;
     
     /* We may see HEADERs at the start of a stream or after all DATA
      * streams to carry trailers. */
     (void)ngh2;
-    s = get_stream(session, frame->hd.stream_id);
-    if (s) {
-        /* nop */
-    }
-    else {
-        s = h2_session_open_stream(userp, frame->hd.stream_id, 0);
+    stream = get_stream(session, frame->hd.stream_id);
+    if (!stream) {
+        stream = session_stream_create(session, frame->hd.stream_id);
+        if (!stream) return NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
+        h2_mplx_stream_register(session->mplx, stream);
     }
-    return s? 0 : NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
+    return 0;
 }
 
 static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
@@ -366,13 +376,15 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
             break;
         case NGHTTP2_PRIORITY:
             session->reprioritize = 1;
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                          "h2_stream(%ld-%d): PRIORITY frame "
-                          " weight=%d, dependsOn=%d, exclusive=%d", 
-                          session->id, (int)frame->hd.stream_id,
-                          frame->priority.pri_spec.weight,
-                          frame->priority.pri_spec.stream_id,
-                          frame->priority.pri_spec.exclusive);
+            if (APLOGctrace2(session->c)) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                              "h2_stream(%ld-%d): PRIORITY frame "
+                              " weight=%d, dependsOn=%d, exclusive=%d", 
+                              session->id, (int)frame->hd.stream_id,
+                              frame->priority.pri_spec.weight,
+                              frame->priority.pri_spec.stream_id,
+                              frame->priority.pri_spec.exclusive);
+            }
             break;
         case NGHTTP2_WINDOW_UPDATE:
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
@@ -384,16 +396,15 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
             }
             break;
         case NGHTTP2_RST_STREAM:
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03067)
-                          "h2_stream(%ld-%d): RST_STREAM by client, errror=%d",
-                          session->id, (int)frame->hd.stream_id,
-                          (int)frame->rst_stream.error_code);
-            stream = get_stream(session, frame->hd.stream_id);
-            if (stream && stream->initiated_on) {
-                ++session->pushes_reset;
+            if (APLOGcdebug(session->c)) {
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03067)
+                              "h2_stream(%ld-%d): RST_STREAM by client, errror=%d",
+                              session->id, (int)frame->hd.stream_id,
+                              (int)frame->rst_stream.error_code);
             }
-            else {
-                ++session->streams_reset;
+            stream = get_stream(session, frame->hd.stream_id);
+            if (stream) {
+                stream->initiated_on? ++session->pushes_reset : ++session->streams_reset;
             }
             break;
         case NGHTTP2_GOAWAY:
@@ -457,18 +468,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
         }
     }
     
-    if (APR_SUCCESS != rv) return NGHTTP2_ERR_PROTO;
-    return 0;
-}
-
-static int h2_session_continue_data(h2_session *session) {
-    if (h2_mplx_has_master_events(session->mplx)) {
-        return 0;
-    }
-    if (h2_conn_io_needs_flush(&session->io)) {
-        return 0;
-    }
-    return 1;
+    return (APR_SUCCESS != rv)? NGHTTP2_ERR_PROTO : 0;
 }
 
 static char immortal_zeros[H2_MAX_PADLEN];
@@ -491,7 +491,8 @@ static int on_send_data_cb(nghttp2_session *ngh2,
     
     (void)ngh2;
     (void)source;
-    if (!h2_session_continue_data(session)) {
+    /* Be nimble, react to events from your tasks and do not buffer more than we need */
+    if (h2_mplx_has_master_events(session->mplx) ||h2_conn_io_needs_flush(&session->io)) {
         return NGHTTP2_ERR_WOULDBLOCK;
     }
 
@@ -833,10 +834,9 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec *
     }
     apr_pool_tag(pool, "h2_session");
     apr_allocator_owner_set(allocator, pool);
-    status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, pool);
+    status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool);
     if (status != APR_SUCCESS) {
-        apr_pool_destroy(pool);
-        return APR_ENOMEM;
+        goto fail;
     }
     apr_allocator_mutex_set(allocator, mutex);
     
@@ -862,26 +862,25 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec *
     
     status = apr_thread_cond_create(&session->iowait, session->pool);
     if (status != APR_SUCCESS) {
-        apr_pool_destroy(pool);
-        return status;
+        goto fail;
     }
     
     session->in_pending = h2_iq_create(session->pool, (int)session->max_stream_count);
     if (session->in_pending == NULL) {
-        apr_pool_destroy(pool);
-        return APR_ENOMEM;
+        status = APR_ENOMEM;
+        goto fail;
     }
 
     session->in_process = h2_iq_create(session->pool, (int)session->max_stream_count);
     if (session->in_process == NULL) {
-        apr_pool_destroy(pool);
-        return APR_ENOMEM;
+        status = APR_ENOMEM;
+        goto fail;
     }
     
     session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor));
     if (session->monitor == NULL) {
-        apr_pool_destroy(pool);
-        return APR_ENOMEM;
+        status = APR_ENOMEM;
+        goto fail;
     }
     session->monitor->ctx = session;
     session->monitor->on_state_enter = on_stream_state_enter;
@@ -906,8 +905,8 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec *
     if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, status, c, APLOGNO(02927) 
                       "nghttp2: error in init_callbacks");
-        apr_pool_destroy(pool);
-        return status;
+        status = APR_ENOMEM;
+        goto fail;
     }
     
     rv = nghttp2_option_new(&options);
@@ -915,8 +914,8 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec *
         ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c,
                       APLOGNO(02928) "nghttp2_option_new: %s", 
                       nghttp2_strerror(rv));
-        apr_pool_destroy(pool);
-        return status;
+        status = APR_ENOMEM;
+        goto fail;
     }
     nghttp2_option_set_peer_max_concurrent_streams(options, (uint32_t)session->max_stream_count);
     /* We need to handle window updates ourself, otherwise we
@@ -932,8 +931,8 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec *
         ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c,
                       APLOGNO(02929) "nghttp2_session_server_new: %s",
                       nghttp2_strerror(rv));
-        apr_pool_destroy(pool);
-        return APR_ENOMEM;
+        status = APR_ENOMEM;
+        goto fail;
     }
     
     n = h2_config_sgeti(s, H2_CONF_PUSH_DIARY_SIZE);
@@ -956,6 +955,9 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec *
     apr_pool_pre_cleanup_register(pool, c, session_pool_cleanup);
         
     return APR_SUCCESS;
+fail:
+    apr_pool_destroy(pool);
+    return status;
 }
 
 static apr_status_t h2_session_start(h2_session *session, int *rv)
@@ -1003,7 +1005,7 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
         }
         
         /* Now we need to auto-open stream 1 for the request we got. */
-        stream = h2_session_open_stream(session, 1, 0);
+        stream = session_stream_create(session, 1);
         if (!stream) {
             status = APR_EGENERAL;
             ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r,
@@ -1011,11 +1013,11 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
                           nghttp2_strerror(*rv));
             return status;
         }
-        
         status = h2_stream_set_request_rec(stream, session->r, 1);
         if (status != APR_SUCCESS) {
             return status;
         }
+        h2_mplx_stream_register(session->mplx, stream);
     }
 
     slen = 0;
@@ -1146,48 +1148,6 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
     return (ssize_t)nread;
 }
 
-struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
-                                  h2_push *push)
-{
-    h2_stream *stream;
-    h2_ngheader *ngh;
-    apr_status_t status;
-    int nid = 0;
-    
-    status = h2_req_create_ngheader(&ngh, is->pool, push->req);
-    if (status == APR_SUCCESS) {
-        nid = nghttp2_submit_push_promise(session->ngh2, 0, is->id, 
-                                          ngh->nv, ngh->nvlen, NULL);
-    }
-    if (status != APR_SUCCESS || nid <= 0) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
-                      H2_STRM_LOG(APLOGNO(03075), is, 
-                      "submitting push promise fail: %s"), nghttp2_strerror(nid));
-        return NULL;
-    }
-    ++session->pushes_promised;
-    
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                  H2_STRM_LOG(APLOGNO(03076), is, "SERVER_PUSH %d for %s %s on %d"),
-                  nid, push->req->method, push->req->path, is->id);
-                  
-    stream = h2_session_open_stream(session, nid, is->id);
-    if (!stream) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                      H2_STRM_LOG(APLOGNO(03077), stream, 
-                      "failed to create stream obj %d"), nid);
-        /* kill the push_promise */
-        nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, nid,
-                                  NGHTTP2_INTERNAL_ERROR);
-        return NULL;
-    }
-    
-    h2_session_set_prio(session, stream, push->priority);
-    h2_stream_set_request(stream, push->req);
-    ++session->unsent_promises;
-    return stream;
-}
-
 static int valid_weight(float f) 
 {
     int w = (int)f;
@@ -1195,8 +1155,8 @@ static int valid_weight(float f)
             (w > NGHTTP2_MAX_WEIGHT)? NGHTTP2_MAX_WEIGHT : w);
 }
 
-apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream, 
-                                 const h2_priority *prio)
+static apr_status_t session_stream_priority_set(h2_session *session, h2_stream *stream, 
+                                                const h2_priority *prio)
 {
     apr_status_t status = APR_SUCCESS;
 #ifdef H2_NG2_CHANGE_PRIO
@@ -1294,6 +1254,51 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
     return status;
 }
 
+apr_status_t h2_session_push(h2_session *session, int initiating_stream_id, h2_push *push)
+{
+    h2_stream *stream;
+    apr_pool_t *pool;
+    h2_ngheader *ngh;
+    int nid = 0;
+    
+    pool = session_stream_pool_create(session);
+    if (APR_SUCCESS != h2_req_create_ngheader(&ngh, pool, push->req)) goto fail;
+    
+    nid = nghttp2_submit_push_promise(session->ngh2, 0, initiating_stream_id, 
+                                      ngh->nv, ngh->nvlen, NULL);
+    if (nid <= 0) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      APLOGNO(03075) "submitting push promise fail: %s", nghttp2_strerror(nid));
+        goto fail;
+    }
+    
+    ++session->pushes_promised;
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                  APLOGNO(03076) "SERVER_PUSH %d for %s %s on %d",
+                  nid, push->req->method, push->req->path, initiating_stream_id);
+                  
+    stream = session_stream_pcreate(session, nid, pool, initiating_stream_id);
+    if (!stream) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      H2_STRM_LOG(APLOGNO(03077), stream, 
+                      "failed to create stream obj %d"), nid);
+        goto fail;
+    }
+    
+    session_stream_priority_set(session, stream, push->priority);
+    h2_stream_request_set(stream, push->req);
+    ++session->unsent_promises;
+    h2_mplx_stream_register(session->mplx, stream);
+    return APR_SUCCESS;
+    
+fail:
+    if (nid > 0) {
+        nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, nid, NGHTTP2_INTERNAL_ERROR);
+    }
+    if (pool) apr_pool_destroy(pool);
+    return APR_EINVAL;
+}
+
 int h2_session_push_enabled(h2_session *session)
 {
     /* iff we can and they can and want */
@@ -1421,7 +1426,7 @@ static apr_status_t on_stream_headers(h2_session *session, h2_stream *stream,
         if (!stream->pref_priority) {
             stream->pref_priority = h2_stream_get_priority(stream, headers);
         }
-        h2_session_set_prio(session, stream, stream->pref_priority);
+        session_stream_priority_set(session, stream, stream->pref_priority);
         
         note = apr_table_get(headers->notes, H2_FILTER_DEBUG_NOTE);
         if (note && !strcmp("on", note)) {
@@ -1545,9 +1550,8 @@ static void h2_session_in_flush(h2_session *session)
     while ((id = h2_iq_shift(session->in_process)) > 0) {
         h2_stream *stream = get_stream(session, id);
         if (stream) {
-            ap_assert(!stream->scheduled);
             if (h2_stream_prep_processing(stream) == APR_SUCCESS) {
-                h2_mplx_process(session->mplx, stream, stream_pri_cmp, session);
+                h2_mplx_process(session->mplx, id, stream_pri_cmp, session);
             }
             else {
                 h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
@@ -1915,6 +1919,21 @@ static void ev_stream_open(h2_session *session, h2_stream *stream)
     h2_iq_append(session->in_process, stream->id);
 }
 
+void h2_session_eos_sent(h2_session *session, int stream_id)
+{
+    /* stream may no longer be known by nghttp2, but still kept in mplx */
+    h2_stream *stream = h2_mplx_stream_get(session->mplx, stream_id);
+    if (stream) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                    H2_STRM_MSG(stream, "eos sent"));
+        h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
+    }
+    else {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                      "eos sent for unknown stream %d", stream_id);
+    }
+}
+
 static void ev_stream_closed(h2_session *session, h2_stream *stream)
 {
     apr_bucket *b;
@@ -1930,12 +1949,14 @@ static void ev_stream_closed(h2_session *session, h2_stream *stream)
             break;
     }
     
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                  H2_STRM_MSG(stream, "sending eos"));
     /* The stream might have data in the buffers of the main connection.
      * We can only free the allocated resources once all had been written.
      * Send a special buckets on the connection that gets destroyed when
      * all preceding data has been handled. On its destruction, it is safe
      * to purge all resources of the stream. */
-    b = h2_bucket_eos_create(session->c->bucket_alloc, stream);
+    b = h2_bucket_eos_create(session->c->bucket_alloc, session->c, stream->id);
     APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
     h2_conn_io_pass(&session->io, session->bbtmp);
     apr_brigade_cleanup(session->bbtmp);
@@ -1977,7 +1998,7 @@ static void on_stream_state_enter(void *ctx, h2_stream *stream)
             ev_stream_closed(session, stream);
             break;
         case H2_SS_CLEANUP:
-            h2_mplx_stream_cleanup(session->mplx, stream);
+            h2_mplx_stream_discard(session->mplx, stream->id);
             break;
         default:
             break;
index cd08fc2429175a28894093b47af34754cc06b950..ae2c26b7692aa62b96205439441f787779d6b2e3 100644 (file)
@@ -51,7 +51,6 @@ struct h2_priority;
 struct h2_push;
 struct h2_push_diary;
 struct h2_session;
-struct h2_stream;
 struct h2_stream_monitor;
 struct h2_task;
 struct h2_workers;
@@ -192,16 +191,17 @@ int h2_session_push_enabled(h2_session *session);
  * processing..
  * 
  * @param session the session to work in
- * @param is the stream initiating the push
+ * @param initiating_stream_id id of the stream initiating this push
  * @param push the push to promise
- * @return the new promised stream or NULL
  */
-struct h2_stream *h2_session_push(h2_session *session, 
-                                  struct h2_stream *is, struct h2_push *push);
+apr_status_t h2_session_push(h2_session *session, 
+                             int initiating_stream_id, struct h2_push *push);
 
-apr_status_t h2_session_set_prio(h2_session *session, 
-                                 struct h2_stream *stream, 
-                                 const struct h2_priority *prio);
+/**
+ * Notifies the session that the EOS for a stream has been sent.
+ * See h2_bucket_eos for usage.
+ */
+void h2_session_eos_sent(h2_session *session, int stream_id);
 
 #define H2_SSSN_MSG(s, msg)     \
     "h2_session(%ld,%s,%d): "msg, s->id, h2_session_state_str(s->state), \
index 9b7d2c5655200417da385ca87bdac0cfbef07ea7..18537fff2626baf2111cf329cc2348d4814e8424 100644 (file)
@@ -88,39 +88,39 @@ const char *h2_stream_state_str(h2_stream *stream)
 
 /* state transisitions when certain frame types are sent */
 static int trans_on_send[][H2_SS_MAX] = {
-/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS,  S_CLN, */        
-{ S_ERR, S_ERR,  S_ERR,  S_NOP,  S_NOP,  S_ERR,  S_NOP,  S_NOP, },/* DATA */ 
-{ S_ERR, S_ERR,  S_CL_R, S_NOP,  S_NOP,  S_ERR,  S_NOP,  S_NOP, },/* HEADERS */ 
-{ S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },/* PRIORITY */    
-{ S_CLS, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, },/* RST_STREAM */ 
-{ S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* SETTINGS */ 
-{ S_RS_L,S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* PUSH_PROMISE */  
-{ S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* PING */ 
-{ S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* GOAWAY */ 
-{ S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },/* WINDOW_UPDATE */ 
-{ S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },/* CONT */ 
+/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS,  S_CLN, S_DSTR */        
+{ S_ERR, S_ERR,  S_ERR,  S_NOP,  S_NOP,  S_ERR,  S_NOP,  S_NOP, S_NOP, },/* DATA */ 
+{ S_ERR, S_ERR,  S_CL_R, S_NOP,  S_NOP,  S_ERR,  S_NOP,  S_NOP, S_NOP, },/* HEADERS */ 
+{ S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, S_NOP, },/* PRIORITY */    
+{ S_CLS, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, S_NOP, },/* RST_STREAM */ 
+{ S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, S_NOP, },/* SETTINGS */ 
+{ S_RS_L,S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, S_NOP, },/* PUSH_PROMISE */  
+{ S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, S_NOP, },/* PING */ 
+{ S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, S_NOP, },/* GOAWAY */ 
+{ S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, S_NOP, },/* WINDOW_UPDATE */ 
+{ S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, S_NOP, },/* CONT */ 
 };
 /* state transisitions when certain frame types are received */
 static int trans_on_recv[][H2_SS_MAX] = {
-/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS,  S_CLN, */        
-{ S_ERR, S_ERR,  S_ERR,  S_NOP,  S_ERR,  S_NOP,  S_NOP,  S_NOP, },/* DATA */ 
-{ S_OPEN,S_CL_L, S_ERR,  S_NOP,  S_ERR,  S_NOP,  S_NOP,  S_NOP, },/* HEADERS */ 
-{ S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },/* PRIORITY */    
-{ S_ERR, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, },/* RST_STREAM */ 
-{ S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* SETTINGS */ 
-{ S_RS_R,S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* PUSH_PROMISE */  
-{ S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* PING */ 
-{ S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* GOAWAY */ 
-{ S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },/* WINDOW_UPDATE */ 
-{ S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },/* CONT */ 
+/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS,  S_CLN, S_DSTR */        
+{ S_ERR, S_ERR,  S_ERR,  S_NOP,  S_ERR,  S_NOP,  S_NOP,  S_NOP, S_NOP, },/* DATA */ 
+{ S_OPEN,S_CL_L, S_ERR,  S_NOP,  S_ERR,  S_NOP,  S_NOP,  S_NOP, S_NOP, },/* HEADERS */ 
+{ S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, S_NOP, },/* PRIORITY */    
+{ S_ERR, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, S_NOP, },/* RST_STREAM */ 
+{ S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, S_NOP, },/* SETTINGS */ 
+{ S_RS_R,S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, S_NOP, },/* PUSH_PROMISE */  
+{ S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, S_NOP, },/* PING */ 
+{ S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, S_NOP, },/* GOAWAY */ 
+{ S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, S_NOP, },/* WINDOW_UPDATE */ 
+{ S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, S_NOP, },/* CONT */ 
 };
 /* state transisitions when certain events happen */
 static int trans_on_event[][H2_SS_MAX] = {
-/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS,  S_CLN, */        
-{ S_XXX, S_ERR,  S_ERR,  S_CL_L, S_CLS,  S_XXX,  S_XXX,  S_XXX, },/* EV_CLOSED_L*/
-{ S_ERR, S_ERR,  S_ERR,  S_CL_R, S_ERR,  S_CLS,  S_NOP,  S_NOP, },/* EV_CLOSED_R*/
-{ S_CLS, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, },/* EV_CANCELLED*/
-{ S_NOP, S_XXX,  S_XXX,  S_XXX,  S_XXX,  S_CLS,  S_CLN,  S_XXX, },/* EV_EOS_SENT*/
+/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS,  S_CLN, S_DSTR */        
+{ S_XXX, S_ERR,  S_ERR,  S_CL_L, S_CLS,  S_XXX,  S_XXX,  S_XXX, S_NOP, },/* EV_CLOSED_L*/
+{ S_ERR, S_ERR,  S_ERR,  S_CL_R, S_ERR,  S_CLS,  S_NOP,  S_NOP, S_NOP, },/* EV_CLOSED_R*/
+{ S_CLS, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, S_NOP, },/* EV_CANCELLED*/
+{ S_NOP, S_XXX,  S_XXX,  S_XXX,  S_XXX,  S_CLS,  S_CLN,  S_XXX, S_NOP, },/* EV_EOS_SENT*/
 };
 
 static int on_map(h2_stream_state_t state, int map[H2_SS_MAX])
@@ -189,9 +189,7 @@ static apr_status_t setup_input(h2_stream *stream) {
                          || APR_BRIGADE_EMPTY(stream->in_buffer)));
         if (!empty) {
             h2_beam_create(&stream->input, stream->pool, stream->id, 
-                           "input", H2_BEAM_OWNER_SEND, 0, 
-                           stream->session->s->timeout);
-            h2_beam_send_from(stream->input, stream->pool);
+                           "input", 0, stream->session->s->timeout);
         }
     }
     return APR_SUCCESS;
@@ -592,7 +590,6 @@ apr_status_t h2_stream_prep_processing(h2_stream *stream)
                       H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
                       r->method, r->scheme, r->authority, r->path, r->chunked);
         setup_input(stream);
-        stream->scheduled = 1;
         return APR_SUCCESS;
     }
     return APR_EINVAL;
@@ -637,7 +634,7 @@ apr_status_t h2_stream_set_request_rec(h2_stream *stream,
     return status;
 }
 
-void h2_stream_set_request(h2_stream *stream, const h2_request *r)
+void h2_stream_request_set(h2_stream *stream, const h2_request *r)
 {
     ap_assert(stream->request == NULL);
     ap_assert(stream->rtmp == NULL);
@@ -953,7 +950,7 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
 
 apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
 {
-    apr_status_t status = APR_SUCCESS;
+    apr_status_t rv = APR_SUCCESS;
     apr_array_header_t *pushes;
     int i;
     
@@ -962,16 +959,12 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
                       H2_STRM_MSG(stream, "found %d push candidates"),
                       pushes->nelts);
-        for (i = 0; i < pushes->nelts; ++i) {
+        for (i = 0; i < pushes->nelts && (APR_SUCCESS == rv); ++i) {
             h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
-            h2_stream *s = h2_session_push(stream->session, stream, push);
-            if (!s) {
-                status = APR_ECONNRESET;
-                break;
-            }
+            rv = h2_session_push(stream->session, stream->id, push);
         }
     }
-    return status;
+    return rv;
 }
 
 apr_table_t *h2_stream_get_trailers(h2_stream *stream)
index 7ecc0ad6bcf69517d44aaa50dc5e4b4df6032c3c..8d6146fa4c274611d77dc7f436d3fee941aedb80 100644 (file)
@@ -167,13 +167,14 @@ void h2_stream_cleanup(h2_stream *stream);
 apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount);
 
 /**
- * Set complete stream headers from given h2_request.
+ * Set complete stream headers from given h2_request, creates a deep copy.
+ * Only to be called once to initialize.
  * 
  * @param stream stream to write request to
  * @param r the request with all the meta data
  * @param eos != 0 iff stream input is closed
  */
-void h2_stream_set_request(h2_stream *stream, const h2_request *r);
+void h2_stream_request_set(h2_stream *stream, const h2_request *r);
 
 /**
  * Set complete stream header from given request_rec.
index 8d3dc6fde8d7790bb1c95523e2e3c7c47b73fa07..949c0b97ef1bd6406b9053903e60b2194b6f8e42 100644 (file)
@@ -44,7 +44,6 @@
 #include "h2_request.h"
 #include "h2_headers.h"
 #include "h2_session.h"
-#include "h2_stream.h"
 #include "h2_task.h"
 #include "h2_util.h"
 
@@ -493,6 +492,14 @@ static int h2_task_pre_conn(conn_rec* c, void *arg)
     return OK;
 }
 
+static apr_status_t task_pool_cleanup(void *data)
+{
+    h2_task *task = data;
+    
+    ap_assert(task->destroyed);
+    return APR_SUCCESS;
+}
+
 h2_task *h2_task_create(conn_rec *slave, int stream_id,
                         const h2_request *req, h2_mplx *m,
                         h2_bucket_beam *input, 
@@ -521,13 +528,15 @@ h2_task *h2_task_create(conn_rec *slave, int stream_id,
     task->input.beam  = input;
     task->output.max_buffer = output_max_mem;
 
+    apr_pool_cleanup_register(pool, task, task_pool_cleanup, apr_pool_cleanup_null);
+
     return task;
 }
 
 void h2_task_destroy(h2_task *task)
 {
+    task->destroyed = 1;
     if (task->output.beam) {
-        h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "task_destroy");
         h2_beam_destroy(task->output.beam);
         task->output.beam = NULL;
     }
@@ -584,13 +593,12 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id)
     }
         
     h2_beam_create(&task->output.beam, c->pool, task->stream_id, "output", 
-                   H2_BEAM_OWNER_SEND, 0, task->timeout);
+                   0, task->timeout);
     if (!task->output.beam) {
         return APR_ENOMEM;
     }
     
     h2_beam_buffer_size_set(task->output.beam, task->output.max_buffer);
-    h2_beam_send_from(task->output.beam, task->pool);
     
     h2_ctx_create_for(c, task);
     apr_table_setn(c->notes, H2_TASK_ID_NOTE, task->id);
@@ -708,3 +716,4 @@ static int h2_task_process_conn(conn_rec* c)
     return DECLINED;
 }
 
+
index 4121d0fd69c50c862cbf60ddaa51554bad265d7b..80c6640804f644d8a52354a66ab798f3dc59d02c 100644 (file)
@@ -87,10 +87,11 @@ struct h2_task {
     apr_time_t started_at;           /* when processing started */
     apr_time_t done_at;              /* when processing was done */
     apr_bucket *eor;
+    int destroyed;
 };
 
 h2_task *h2_task_create(conn_rec *slave, int stream_id,
-                        const h2_request *req, struct h2_mplx *m, 
+                        const struct h2_request *req, struct h2_mplx *m, 
                         struct h2_bucket_beam *input, 
                         apr_interval_time_t timeout,
                         apr_size_t output_max_mem);
index 286e98f1cecb6b7adf03e9c804aef6b9a6600af4..498ad139505b9fa152fdbba4d0436d1e5b802de9 100644 (file)
@@ -27,7 +27,7 @@
  * @macro
  * Version number of the http2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.14.1-git"
+#define MOD_HTTP2_VERSION "1.15.0-git"
 
 /**
  * @macro
@@ -35,7 +35,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x010e01
+#define MOD_HTTP2_VERSION_NUM 0x010f00
 
 
 #endif /* mod_h2_h2_version_h */
index 699f533f804813ae73ab47672b7641b58f27110a..5ee934abf020b4ba513cc9ab9dc29fd31be1a863 100644 (file)
@@ -83,7 +83,7 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
 
     if (!slot->lock) {
         status = apr_thread_mutex_create(&slot->lock,
-                                         APR_THREAD_MUTEX_DEFAULT,
+                                         APR_THREAD_MUTEX_NESTED,
                                          workers->pool);
         if (status != APR_SUCCESS) {
             push_slot(&workers->free, slot);
@@ -336,7 +336,7 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
     }
     
     status = apr_thread_mutex_create(&workers->lock,
-                                     APR_THREAD_MUTEX_DEFAULT,
+                                     APR_THREAD_MUTEX_NESTED,
                                      workers->pool);
     if (status == APR_SUCCESS) {        
         n = workers->nslots = workers->max_workers;
index 220870799021000b1a5b78f79903efb6dcd8077b..83ae431c87594ee91802e795cd12550589ada914 100644 (file)
@@ -403,6 +403,14 @@ run_connect:
          */
         apr_table_setn(ctx->p_conn->connection->notes,
                        "proxy-request-alpn-protos", "h2");
+        if (ctx->p_conn->ssl_hostname) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner,
+                          "set SNI to %s for (%s)",
+                          ctx->p_conn->ssl_hostname,
+                          ctx->p_conn->hostname);
+            apr_table_setn(ctx->p_conn->connection->notes,
+                           "proxy-request-hostname", ctx->p_conn->ssl_hostname);
+        }
     }
 
     if (ctx->master->aborted) goto cleanup;