]> granicus.if.org Git - apache/blobdiff - modules/http2/h2_bucket_beam.c
On the 2.4.x branch:
[apache] / modules / http2 / h2_bucket_beam.c
index 6d41fadc5bbf6356402677bec145d7f676a973fd..6b6750d1fa17eab440a58175b697f894b03d283b 100644 (file)
@@ -1,19 +1,21 @@
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * http://www.apache.org/licenses/LICENSE-2.0
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 #include <apr_lib.h>
+#include <apr_atomic.h>
 #include <apr_strings.h>
 #include <apr_time.h>
 #include <apr_buckets.h>
@@ -21,6 +23,7 @@
 #include <apr_thread_cond.h>
 
 #include <httpd.h>
+#include <http_protocol.h>
 #include <http_log.h>
 
 #include "h2_private.h"
@@ -66,7 +69,7 @@ struct h2_beam_proxy {
     apr_bucket_refcount refcount;
     APR_RING_ENTRY(h2_beam_proxy) link;
     h2_bucket_beam *beam;
-    apr_bucket *bred;
+    apr_bucket *bsender;
     apr_size_t n;
 };
 
@@ -76,9 +79,9 @@ static apr_status_t beam_bucket_read(apr_bucket *b, const char **str,
                                      apr_size_t *len, apr_read_type_e block)
 {
     h2_beam_proxy *d = b->data;
-    if (d->bred) {
+    if (d->bsender) {
         const char *data;
-        apr_status_t status = apr_bucket_read(d->bred, &data, len, block);
+        apr_status_t status = apr_bucket_read(d->bsender, &data, len, block);
         if (status == APR_SUCCESS) {
             *str = data + b->start;
             *len = b->length;
@@ -109,24 +112,24 @@ static void beam_bucket_destroy(void *data)
 
 static apr_bucket * h2_beam_bucket_make(apr_bucket *b, 
                                         h2_bucket_beam *beam,
-                                        apr_bucket *bred, apr_size_t n)
+                                        apr_bucket *bsender, apr_size_t n)
 {
     h2_beam_proxy *d;
 
     d = apr_bucket_alloc(sizeof(*d), b->list);
     H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, d);
     d->beam = beam;
-    d->bred = bred;
+    d->bsender = bsender;
     d->n = n;
     
-    b = apr_bucket_shared_make(b, d, 0, bred? bred->length : 0);
+    b = apr_bucket_shared_make(b, d, 0, bsender? bsender->length : 0);
     b->type = &h2_bucket_type_beam;
 
     return b;
 }
 
 static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
-                                         apr_bucket *bred,
+                                         apr_bucket *bsender,
                                          apr_bucket_alloc_t *list,
                                          apr_size_t n)
 {
@@ -135,28 +138,9 @@ static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
     APR_BUCKET_INIT(b);
     b->free = apr_bucket_free;
     b->list = list;
-    return h2_beam_bucket_make(b, beam, bred, n);
+    return h2_beam_bucket_make(b, beam, bsender, n);
 }
 
-/*static apr_status_t beam_bucket_setaside(apr_bucket *b, apr_pool_t *pool)
-{
-    apr_status_t status = APR_SUCCESS;
-    h2_beam_proxy *d = b->data;
-    if (d->bred) {
-        const char *data;
-        apr_size_t len;
-        
-        status = apr_bucket_read(d->bred, &data, &len, APR_BLOCK_READ);
-        if (status == APR_SUCCESS) {
-            b = apr_bucket_heap_make(b, (char *)data + b->start, b->length, NULL);
-            if (b == NULL) {
-                return APR_ENOMEM;
-            }
-        }
-    }
-    return status;
-}*/
-
 const apr_bucket_type_t h2_bucket_type_beam = {
     "BEAM", 5, APR_BUCKET_DATA,
     beam_bucket_destroy,
@@ -169,47 +153,65 @@ const apr_bucket_type_t h2_bucket_type_beam = {
 /*******************************************************************************
  * h2_blist, a brigade without allocations
  ******************************************************************************/
-apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax, 
-                            const char *tag, const char *sep, 
-                            h2_blist *bl)
+
+static apr_array_header_t *beamers;
+
+static apr_status_t cleanup_beamers(void *dummy)
 {
-    apr_size_t off = 0;
-    const char *sp = "";
-    apr_bucket *b;
-    
-    if (bl) {
-        memset(buffer, 0, bmax--);
-        off += apr_snprintf(buffer+off, bmax-off, "%s(", tag);
-        for (b = H2_BLIST_FIRST(bl); 
-             bmax && (b != H2_BLIST_SENTINEL(bl));
-             b = APR_BUCKET_NEXT(b)) {
+    (void)dummy;
+    beamers = NULL;
+    return APR_SUCCESS;
+}
+
+void h2_register_bucket_beamer(h2_bucket_beamer *beamer)
+{
+    if (!beamers) {
+        apr_pool_cleanup_register(apr_hook_global_pool, NULL,
+                                  cleanup_beamers, apr_pool_cleanup_null);
+        beamers = apr_array_make(apr_hook_global_pool, 10, 
+                                 sizeof(h2_bucket_beamer*));
+    }
+    APR_ARRAY_PUSH(beamers, h2_bucket_beamer*) = beamer;
+}
+
+static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam, 
+                                  apr_bucket_brigade *dest,
+                                  const apr_bucket *src)
+{
+    apr_bucket *b = NULL;
+    int i;
+    if (beamers) {
+        for (i = 0; i < beamers->nelts && b == NULL; ++i) {
+            h2_bucket_beamer *beamer;
             
-            off += h2_util_bucket_print(buffer+off, bmax-off, b, sp);
-            sp = " ";
+            beamer = APR_ARRAY_IDX(beamers, i, h2_bucket_beamer*);
+            b = beamer(beam, dest, src);
         }
-        off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
-    }
-    else {
-        off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep);
     }
-    return off;
+    return b;
 }
 
 
-
 /*******************************************************************************
  * bucket beam that can transport buckets across threads
  ******************************************************************************/
 
+static void mutex_leave(void *ctx, apr_thread_mutex_t *lock)
+{
+    apr_thread_mutex_unlock(lock);
+}
+
+static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl)
+{
+    h2_bucket_beam *beam = ctx;
+    pbl->mutex = beam->lock;
+    pbl->leave = mutex_leave;
+    return apr_thread_mutex_lock(pbl->mutex);
+}
+
 static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
 {
-    if (beam->m_enter) {
-        return beam->m_enter(beam->m_ctx, pbl);
-    }
-    pbl->mutex = NULL;
-    pbl->leave = NULL;
-    return APR_SUCCESS;
+    return mutex_enter(beam, pbl);
 }
 
 static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
@@ -219,18 +221,65 @@ static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
     }
 }
 
-static apr_off_t calc_buffered(h2_bucket_beam *beam)
+static apr_off_t bucket_mem_used(apr_bucket *b)
+{
+    if (APR_BUCKET_IS_FILE(b)) {
+        return 0;
+    }
+    else {
+        /* should all have determinate length */
+        return b->length;
+    }
+}
+
+static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
+{
+    int rv = 0;
+    apr_off_t len = beam->received_bytes - beam->cons_bytes_reported;
+    h2_beam_io_callback *cb = beam->cons_io_cb;
+     
+    if (len > 0) {
+        if (cb) {
+            void *ctx = beam->cons_ctx;
+            
+            if (pbl) leave_yellow(beam, pbl);
+            cb(ctx, beam, len);
+            if (pbl) enter_yellow(beam, pbl);
+            rv = 1;
+        }
+        beam->cons_bytes_reported += len;
+    }
+    return rv;
+}
+
+static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl)
+{
+    apr_off_t len = beam->sent_bytes - beam->prod_bytes_reported;
+    if (force || len > 0) {
+        h2_beam_io_callback *cb = beam->prod_io_cb; 
+        if (cb) {
+            void *ctx = beam->prod_ctx;
+            
+            leave_yellow(beam, pbl);
+            cb(ctx, beam, len);
+            enter_yellow(beam, pbl);
+        }
+        beam->prod_bytes_reported += len;
+    }
+}
+
+static apr_size_t calc_buffered(h2_bucket_beam *beam)
 {
-    apr_off_t len = 0;
+    apr_size_t len = 0;
     apr_bucket *b;
-    for (b = H2_BLIST_FIRST(&beam->red); 
-         b != H2_BLIST_SENTINEL(&beam->red);
+    for (b = H2_BLIST_FIRST(&beam->send_list); 
+         b != H2_BLIST_SENTINEL(&beam->send_list);
          b = APR_BUCKET_NEXT(b)) {
         if (b->length == ((apr_size_t)-1)) {
             /* do not count */
         }
         else if (APR_BUCKET_IS_FILE(b)) {
-            /* if unread, has no real mem footprint. how to test? */
+            /* if unread, has no real mem footprint. */
         }
         else {
             len += b->length;
@@ -239,14 +288,14 @@ static apr_off_t calc_buffered(h2_bucket_beam *beam)
     return len;
 }
 
-static void r_purge_reds(h2_bucket_beam *beam)
+static void r_purge_sent(h2_bucket_beam *beam)
 {
-    apr_bucket *bred;
-    /* delete all red buckets in purge brigade, needs to be called
-     * from red thread only */
-    while (!H2_BLIST_EMPTY(&beam->purge)) {
-        bred = H2_BLIST_FIRST(&beam->purge);
-        apr_bucket_delete(bred);
+    apr_bucket *b;
+    /* delete all sender buckets in purge brigade, needs to be called
+     * from sender thread only */
+    while (!H2_BLIST_EMPTY(&beam->purge_list)) {
+        b = H2_BLIST_FIRST(&beam->purge_list);
+        apr_bucket_delete(b);
     }
 }
 
@@ -259,30 +308,80 @@ static apr_size_t calc_space_left(h2_bucket_beam *beam)
     return APR_SIZE_MAX;
 }
 
-static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock)
+static int buffer_is_empty(h2_bucket_beam *beam)
 {
-    if (beam->timeout > 0) {
-        return apr_thread_cond_timedwait(beam->m_cond, lock, beam->timeout);
+    return ((!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer))
+            && H2_BLIST_EMPTY(&beam->send_list));
+}
+
+static apr_status_t wait_empty(h2_bucket_beam *beam, apr_read_type_e block,  
+                               apr_thread_mutex_t *lock)
+{
+    apr_status_t rv = APR_SUCCESS;
+    
+    while (!buffer_is_empty(beam) && APR_SUCCESS == rv) {
+        if (APR_BLOCK_READ != block || !lock) {
+            rv = APR_EAGAIN;
+        }
+        else if (beam->timeout > 0) {
+            rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
+        }
+        else {
+            rv = apr_thread_cond_wait(beam->change, lock);
+        }
     }
-    else {
-        return apr_thread_cond_wait(beam->m_cond, lock);
+    return rv;
+}
+
+static apr_status_t wait_not_empty(h2_bucket_beam *beam, apr_read_type_e block,  
+                                   apr_thread_mutex_t *lock)
+{
+    apr_status_t rv = APR_SUCCESS;
+    
+    while (buffer_is_empty(beam) && APR_SUCCESS == rv) {
+        if (beam->aborted) {
+            rv = APR_ECONNABORTED;
+        }
+        else if (beam->closed) {
+            rv = APR_EOF;
+        }
+        else if (APR_BLOCK_READ != block || !lock) {
+            rv = APR_EAGAIN;
+        }
+        else if (beam->timeout > 0) {
+            rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
+        }
+        else {
+            rv = apr_thread_cond_wait(beam->change, lock);
+        }
     }
+    return rv;
 }
 
-static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
-                                 h2_beam_lock *pbl, apr_off_t *premain) 
+static apr_status_t wait_not_full(h2_bucket_beam *beam, apr_read_type_e block, 
+                                  apr_size_t *pspace_left, h2_beam_lock *bl)
 {
-    *premain = calc_space_left(beam);
-    while (!beam->aborted && *premain <= 0 
-           && (block == APR_BLOCK_READ) && pbl->mutex) {
-        apr_status_t status = wait_cond(beam, pbl->mutex);
-        if (APR_STATUS_IS_TIMEUP(status)) {
-            return status;
+    apr_status_t rv = APR_SUCCESS;
+    apr_size_t left;
+    
+    while (0 == (left = calc_space_left(beam)) && APR_SUCCESS == rv) {
+        if (beam->aborted) {
+            rv = APR_ECONNABORTED;
+        }
+        else if (block != APR_BLOCK_READ || !bl->mutex) {
+            rv = APR_EAGAIN;
+        }
+        else {
+            if (beam->timeout > 0) {
+                rv = apr_thread_cond_timedwait(beam->change, bl->mutex, beam->timeout);
+            }
+            else {
+                rv = apr_thread_cond_wait(beam->change, bl->mutex);
+            }
         }
-        r_purge_reds(beam);
-        *premain = calc_space_left(beam);
     }
-    return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
+    *pspace_left = left;
+    return rv;
 }
 
 static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
@@ -294,34 +393,34 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
         /* even when beam buckets are split, only the one where
          * refcount drops to 0 will call us */
         H2_BPROXY_REMOVE(proxy);
-        /* invoked from green thread, the last beam bucket for the red
-         * bucket bred is about to be destroyed.
+        /* invoked from receiver thread, the last beam bucket for the send
+         * bucket is about to be destroyed.
          * remove it from the hold, where it should be now */
-        if (proxy->bred) {
-            for (b = H2_BLIST_FIRST(&beam->hold); 
-                 b != H2_BLIST_SENTINEL(&beam->hold);
+        if (proxy->bsender) {
+            for (b = H2_BLIST_FIRST(&beam->hold_list); 
+                 b != H2_BLIST_SENTINEL(&beam->hold_list);
                  b = APR_BUCKET_NEXT(b)) {
-                 if (b == proxy->bred) {
+                 if (b == proxy->bsender) {
                     break;
                  }
             }
-            if (b != H2_BLIST_SENTINEL(&beam->hold)) {
+            if (b != H2_BLIST_SENTINEL(&beam->hold_list)) {
                 /* bucket is in hold as it should be, mark this one
                  * and all before it for purging. We might have placed meta
-                 * buckets without a green proxy into the hold before it 
+                 * buckets without a receiver proxy into the hold before it 
                  * and schedule them for purging now */
-                for (b = H2_BLIST_FIRST(&beam->hold); 
-                     b != H2_BLIST_SENTINEL(&beam->hold);
+                for (b = H2_BLIST_FIRST(&beam->hold_list); 
+                     b != H2_BLIST_SENTINEL(&beam->hold_list);
                      b = next) {
                     next = APR_BUCKET_NEXT(b);
-                    if (b == proxy->bred) {
+                    if (b == proxy->bsender) {
                         APR_BUCKET_REMOVE(b);
-                        H2_BLIST_INSERT_TAIL(&beam->purge, b);
+                        H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
                         break;
                     }
                     else if (APR_BUCKET_IS_METADATA(b)) {
                         APR_BUCKET_REMOVE(b);
-                        H2_BLIST_INSERT_TAIL(&beam->purge, b);
+                        H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
                     }
                     else {
                         /* another data bucket before this one in hold. this
@@ -330,37 +429,28 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
                     }
                 }
                 
-                proxy->bred = NULL;
+                proxy->bsender = NULL;
             }
             else {
                 /* it should be there unless we screwed up */
-                ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->red_pool, 
-                              APLOGNO() "h2_beam(%d-%s): emitted bucket not "
+                ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->send_pool, 
+                              APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not "
                               "in hold, n=%d", beam->id, beam->tag, 
                               (int)proxy->n);
-                AP_DEBUG_ASSERT(!proxy->bred);
+                ap_assert(!proxy->bsender);
             }
         }
         /* notify anyone waiting on space to become available */
         if (!bl.mutex) {
-            r_purge_reds(beam);
+            r_purge_sent(beam);
         }
-        else if (beam->m_cond) {
-            apr_thread_cond_broadcast(beam->m_cond);
+        else {
+            apr_thread_cond_broadcast(beam->change);
         }
         leave_yellow(beam, &bl);
     }
 }
 
-static void report_consumption(h2_bucket_beam *beam)
-{
-    if (beam->consumed_fn && (beam->received_bytes != beam->reported_bytes)) {
-        beam->consumed_fn(beam->consumed_ctx, beam, 
-                          beam->received_bytes - beam->reported_bytes);
-        beam->reported_bytes = beam->received_bytes;
-    }
-}
-
 static void h2_blist_cleanup(h2_blist *bl)
 {
     apr_bucket *e;
@@ -375,58 +465,197 @@ static apr_status_t beam_close(h2_bucket_beam *beam)
 {
     if (!beam->closed) {
         beam->closed = 1;
-        if (beam->m_cond) {
-            apr_thread_cond_broadcast(beam->m_cond);
-        }
+        apr_thread_cond_broadcast(beam->change);
+    }
+    return APR_SUCCESS;
+}
+
+int h2_beam_is_closed(h2_bucket_beam *beam)
+{
+    return beam->closed;
+}
+
+static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool, 
+                         apr_status_t (*cleanup)(void *))
+{
+    if (pool && pool != beam->pool) {
+        apr_pool_pre_cleanup_register(pool, beam, cleanup);
+        return 1;
     }
+    return 0;
+}
+
+static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool,
+                     apr_status_t (*cleanup)(void *)) {
+    if (pool && pool != beam->pool) {
+        apr_pool_cleanup_kill(pool, beam, cleanup);
+        return 1;
+    }
+    return 0;
+}
+
+static apr_status_t beam_recv_cleanup(void *data)
+{
+    h2_bucket_beam *beam = data;
+    /* receiver pool has gone away, clear references */
+    beam->recv_buffer = NULL;
+    beam->recv_pool = NULL;
     return APR_SUCCESS;
 }
 
-static apr_status_t beam_cleanup(void *data)
+static apr_status_t beam_send_cleanup(void *data)
 {
     h2_bucket_beam *beam = data;
+    /* sender is going away, clear up all references to its memory */
+    r_purge_sent(beam);
+    h2_blist_cleanup(&beam->send_list);
+    report_consumption(beam, NULL);
+    while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
+        h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
+        H2_BPROXY_REMOVE(proxy);
+        proxy->beam = NULL;
+        proxy->bsender = NULL;
+    }
+    h2_blist_cleanup(&beam->purge_list);
+    h2_blist_cleanup(&beam->hold_list);
+    beam->send_pool = NULL;
+    return APR_SUCCESS;
+}
+
+static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool) 
+{
+    if (beam->send_pool != pool) {
+        if (beam->send_pool && beam->send_pool != beam->pool) {
+            pool_kill(beam, beam->send_pool, beam_send_cleanup);
+            beam_send_cleanup(beam);
+        }
+        beam->send_pool = pool;
+        pool_register(beam, beam->send_pool, beam_send_cleanup);
+    }
+}
+
+static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl)
+{
+    if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
+        apr_bucket_brigade *bb = beam->recv_buffer;
+        apr_off_t bblen = 0;
+        
+        beam->recv_buffer = NULL;
+        apr_brigade_length(bb, 0, &bblen);
+        beam->received_bytes += bblen;
+        
+        /* need to do this unlocked since bucket destroy might 
+         * call this beam again. */
+        if (bl) leave_yellow(beam, bl);
+        apr_brigade_destroy(bb);
+        if (bl) enter_yellow(beam, bl);
+        
+        if (beam->cons_ev_cb) { 
+            beam->cons_ev_cb(beam->cons_ctx, beam);
+        }
+    }
+}
+
+static apr_status_t beam_cleanup(h2_bucket_beam *beam, int from_pool)
+{
+    apr_status_t status = APR_SUCCESS;
+    int safe_send = (beam->owner == H2_BEAM_OWNER_SEND);
+    int safe_recv = (beam->owner == H2_BEAM_OWNER_RECV);
     
-    beam_close(beam);
-    r_purge_reds(beam);
-    h2_blist_cleanup(&beam->red);
-    report_consumption(beam);
-    h2_blist_cleanup(&beam->purge);
-    h2_blist_cleanup(&beam->hold);
+    /* 
+     * Owner of the beam is going away, depending on which side it owns,
+     * cleanup strategies will differ.
+     *
+     * In general, receiver holds references to memory from sender. 
+     * Clean up receiver first, if safe, then cleanup sender, if safe.
+     */
+     
+     /* When called from pool destroy, io callbacks are disabled */
+     if (from_pool) {
+         beam->cons_io_cb = NULL;
+     }
+     
+    /* When modify send is not safe, this means we still have multi-thread
+     * protection and the owner is receiving the buckets. If the sending
+     * side has not gone away, this means we could have dangling buckets
+     * in our lists that never get destroyed. This should not happen. */
+    ap_assert(safe_send || !beam->send_pool);
+    if (!H2_BLIST_EMPTY(&beam->send_list)) {
+        ap_assert(beam->send_pool);
+    }
     
-    return APR_SUCCESS;
+    if (safe_recv) {
+        if (beam->recv_pool) {
+            pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
+            beam->recv_pool = NULL;
+        }
+        recv_buffer_cleanup(beam, NULL);
+    }
+    else {
+        beam->recv_buffer = NULL;
+        beam->recv_pool = NULL;
+    }
+    
+    if (safe_send && beam->send_pool) {
+        pool_kill(beam, beam->send_pool, beam_send_cleanup);
+        status = beam_send_cleanup(beam);
+    }
+    
+    if (safe_recv) {
+        ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies));
+        ap_assert(H2_BLIST_EMPTY(&beam->send_list));
+        ap_assert(H2_BLIST_EMPTY(&beam->hold_list));
+        ap_assert(H2_BLIST_EMPTY(&beam->purge_list));
+    }
+    return status;
+}
+
+static apr_status_t beam_pool_cleanup(void *data)
+{
+    return beam_cleanup(data, 1);
 }
 
 apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
 {
-    apr_pool_cleanup_kill(beam->red_pool, beam, beam_cleanup);
-    return beam_cleanup(beam);
+    apr_pool_cleanup_kill(beam->pool, beam, beam_pool_cleanup);
+    return beam_cleanup(beam, 0);
 }
 
-apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *red_pool, 
+apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, 
                             int id, const char *tag, 
-                            apr_size_t max_buf_size)
+                            h2_beam_owner_t owner,
+                            apr_size_t max_buf_size,
+                            apr_interval_time_t timeout)
 {
     h2_bucket_beam *beam;
-    apr_status_t status = APR_SUCCESS;
+    apr_status_t rv = APR_SUCCESS;
     
-    beam = apr_pcalloc(red_pool, sizeof(*beam));
+    beam = apr_pcalloc(pool, sizeof(*beam));
     if (!beam) {
         return APR_ENOMEM;
     }
 
     beam->id = id;
     beam->tag = tag;
-    H2_BLIST_INIT(&beam->red);
-    H2_BLIST_INIT(&beam->hold);
-    H2_BLIST_INIT(&beam->purge);
+    beam->pool = pool;
+    beam->owner = owner;
+    H2_BLIST_INIT(&beam->send_list);
+    H2_BLIST_INIT(&beam->hold_list);
+    H2_BLIST_INIT(&beam->purge_list);
     H2_BPROXY_LIST_INIT(&beam->proxies);
-    beam->red_pool = red_pool;
+    beam->tx_mem_limits = 1;
     beam->max_buf_size = max_buf_size;
-
-    apr_pool_pre_cleanup_register(red_pool, beam, beam_cleanup);
-    *pbeam = beam;
-    
-    return status;
+    beam->timeout = timeout;
+
+    rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool);
+    if (APR_SUCCESS == rv) {
+        rv = apr_thread_cond_create(&beam->change, pool);
+        if (APR_SUCCESS == rv) {
+            apr_pool_pre_cleanup_register(pool, beam, beam_pool_cleanup);
+            *pbeam = beam;
+        }
+    }
+    return rv;
 }
 
 void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
@@ -444,28 +673,13 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
     h2_beam_lock bl;
     apr_size_t buffer_size = 0;
     
-    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
         buffer_size = beam->max_buf_size;
         leave_yellow(beam, &bl);
     }
     return buffer_size;
 }
 
-void h2_beam_mutex_set(h2_bucket_beam *beam, 
-                       h2_beam_mutex_enter m_enter,
-                       apr_thread_cond_t *cond,
-                       void *m_ctx)
-{
-    h2_beam_lock bl;
-    
-    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        beam->m_enter = m_enter;
-        beam->m_ctx   = m_ctx;
-        beam->m_cond  = cond;
-        leave_yellow(beam, &bl);
-    }
-}
-
 void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
 {
     h2_beam_lock bl;
@@ -492,14 +706,14 @@ void h2_beam_abort(h2_bucket_beam *beam)
 {
     h2_beam_lock bl;
     
-    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        r_purge_reds(beam);
-        h2_blist_cleanup(&beam->red);
-        beam->aborted = 1;
-        report_consumption(beam);
-        if (beam->m_cond) {
-            apr_thread_cond_broadcast(beam->m_cond);
+    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+        if (!beam->aborted) {
+            beam->aborted = 1;
+            r_purge_sent(beam);
+            h2_blist_cleanup(&beam->send_list);
+            report_consumption(beam, &bl);
         }
+        apr_thread_cond_broadcast(beam->change);
         leave_yellow(beam, &bl);
     }
 }
@@ -508,136 +722,146 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam)
 {
     h2_beam_lock bl;
     
-    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        r_purge_reds(beam);
+    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+        r_purge_sent(beam);
         beam_close(beam);
-        report_consumption(beam);
+        report_consumption(beam, &bl);
         leave_yellow(beam, &bl);
     }
     return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
 }
 
-apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block)
+apr_status_t h2_beam_leave(h2_bucket_beam *beam)
+{
+    h2_beam_lock bl;
+    
+    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+        recv_buffer_cleanup(beam, &bl);
+        beam->aborted = 1;
+        beam_close(beam);
+        leave_yellow(beam, &bl);
+    }
+    return APR_SUCCESS;
+}
+
+apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block)
 {
     apr_status_t status;
     h2_beam_lock bl;
     
     if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) {
-        r_purge_reds(beam);
-        h2_blist_cleanup(&beam->red);
-        beam_close(beam);
-        report_consumption(beam);
-        
-        while (status == APR_SUCCESS 
-               && (!H2_BPROXY_LIST_EMPTY(&beam->proxies)
-                   || (beam->green && !APR_BRIGADE_EMPTY(beam->green)))) {
-            if (block == APR_NONBLOCK_READ || !bl.mutex) {
-                status = APR_EAGAIN;
-                break;
-            }
-            status = wait_cond(beam, bl.mutex);
-        }
+        status = wait_empty(beam, block, bl.mutex);
         leave_yellow(beam, &bl);
     }
     return status;
 }
 
+static void move_to_hold(h2_bucket_beam *beam, 
+                         apr_bucket_brigade *sender_bb)
+{
+    apr_bucket *b;
+    while (sender_bb && !APR_BRIGADE_EMPTY(sender_bb)) {
+        b = APR_BRIGADE_FIRST(sender_bb);
+        APR_BUCKET_REMOVE(b);
+        H2_BLIST_INSERT_TAIL(&beam->send_list, b);
+    }
+}
+
 static apr_status_t append_bucket(h2_bucket_beam *beam, 
-                                  apr_bucket *bred,
+                                  apr_bucket *b,
                                   apr_read_type_e block,
-                                  apr_pool_t *pool,
+                                  apr_size_t *pspace_left,
                                   h2_beam_lock *pbl)
 {
     const char *data;
     apr_size_t len;
-    apr_off_t space_left = 0;
     apr_status_t status;
+    int can_beam, check_len;
+    
+    if (beam->aborted) {
+        return APR_ECONNABORTED;
+    }
     
-    if (APR_BUCKET_IS_METADATA(bred)) {
-        if (APR_BUCKET_IS_EOS(bred)) {
+    if (APR_BUCKET_IS_METADATA(b)) {
+        if (APR_BUCKET_IS_EOS(b)) {
             beam->closed = 1;
         }
-        APR_BUCKET_REMOVE(bred);
-        H2_BLIST_INSERT_TAIL(&beam->red, bred);
+        APR_BUCKET_REMOVE(b);
+        H2_BLIST_INSERT_TAIL(&beam->send_list, b);
         return APR_SUCCESS;
     }
-    else if (APR_BUCKET_IS_FILE(bred)) {
-        /* file bucket lengths do not really count */
+    else if (APR_BUCKET_IS_FILE(b)) {
+        /* For file buckets the problem is their internal readpool that
+         * is used on the first read to allocate buffer/mmap.
+         * Since setting aside a file bucket will de-register the
+         * file cleanup function from the previous pool, we need to
+         * call that only from the sender thread.
+         *
+         * Currently, we do not handle file bucket with refcount > 1 as
+         * the beam is then not in complete control of the file's lifetime.
+         * Which results in the bug that a file get closed by the receiver
+         * while the sender or the beam still have buckets using it. 
+         * 
+         * Additionally, we allow callbacks to prevent beaming file
+         * handles across. The use case for this is to limit the number 
+         * of open file handles and rather use a less efficient beam
+         * transport. */
+        apr_bucket_file *bf = b->data;
+        apr_file_t *fd = bf->fd;
+        can_beam = (bf->refcount.refcount == 1);
+        if (can_beam && beam->can_beam_fn) {
+            can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
+        }
+        check_len = !can_beam;
     }
     else {
-        space_left = calc_space_left(beam);
-        if (space_left > 0 && bred->length == ((apr_size_t)-1)) {
+        if (b->length == ((apr_size_t)-1)) {
             const char *data;
-            status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
+            status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
             if (status != APR_SUCCESS) {
                 return status;
             }
         }
-        
-        if (space_left < bred->length) {
-            status = r_wait_space(beam, block, pbl, &space_left);
-            if (status != APR_SUCCESS) {
-                return status;
-            }
-            if (space_left <= 0) {
-                return APR_EAGAIN;
-            }
-        }
-        /* space available, maybe need bucket split */
+        check_len = 1;
     }
     
+    if (check_len) {
+        if (b->length > *pspace_left) {
+            apr_bucket_split(b, *pspace_left);
+        }
+        *pspace_left -= b->length;
+    }
 
-    /* The fundamental problem is that reading a red bucket from
-     * a green thread is a total NO GO, because the bucket might use
+    /* The fundamental problem is that reading a sender bucket from
+     * a receiver thread is a total NO GO, because the bucket might use
      * its pool/bucket_alloc from a foreign thread and that will
      * corrupt. */
     status = APR_ENOTIMPL;
-    if (beam->closed && bred->length > 0) {
-        status = APR_EOF;
-    }
-    else if (APR_BUCKET_IS_TRANSIENT(bred)) {
+    if (APR_BUCKET_IS_TRANSIENT(b)) {
         /* this takes care of transient buckets and converts them
          * into heap ones. Other bucket types might or might not be
          * affected by this. */
-        status = apr_bucket_setaside(bred, pool);
+        status = apr_bucket_setaside(b, beam->send_pool);
     }
-    else if (APR_BUCKET_IS_HEAP(bred)) {
-        /* For heap buckets read from a green thread is fine. The
+    else if (APR_BUCKET_IS_HEAP(b)) {
+        /* For heap buckets read from a receiver thread is fine. The
          * data will be there and live until the bucket itself is
          * destroyed. */
         status = APR_SUCCESS;
     }
-    else if (APR_BUCKET_IS_POOL(bred)) {
+    else if (APR_BUCKET_IS_POOL(b)) {
         /* pool buckets are bastards that register at pool cleanup
          * to morph themselves into heap buckets. That may happen anytime,
          * even after the bucket data pointer has been read. So at
-         * any time inside the green thread, the pool bucket memory
+         * any time inside the receiver thread, the pool bucket memory
          * may disappear. yikes. */
-        status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
+        status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
         if (status == APR_SUCCESS) {
-            apr_bucket_heap_make(bred, data, len, NULL);
+            apr_bucket_heap_make(b, data, len, NULL);
         }
     }
-    else if (APR_BUCKET_IS_FILE(bred)) {
-        /* For file buckets the problem is their internal readpool that
-         * is used on the first read to allocate buffer/mmap.
-         * Since setting aside a file bucket will de-register the
-         * file cleanup function from the previous pool, we need to
-         * call that from a red thread. 
-         * Additionally, we allow callbacks to prevent beaming file
-         * handles across. The use case for this is to limit the number 
-         * of open file handles and rather use a less efficient beam
-         * transport. */
-        apr_file_t *fd = ((apr_bucket_file *)bred->data)->fd;
-        int can_beam = 1;
-        if (beam->last_beamed != fd && beam->can_beam_fn) {
-            can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
-        }
-        if (can_beam) {
-            beam->last_beamed = fd;
-            status = apr_bucket_setaside(bred, pool);
-        }
-        /* else: enter ENOTIMPL case below */
+    else if (APR_BUCKET_IS_FILE(b) && can_beam) {
+        status = apr_bucket_setaside(b, beam->send_pool);
     }
     
     if (status == APR_ENOTIMPL) {
@@ -645,17 +869,11 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
          * but hope that after read, its data stays immutable for the
          * lifetime of the bucket. (see pool bucket handling above for
          * a counter example).
-         * We do the read while in a red thread, so that the bucket may
+         * We do the read while in the sender thread, so that the bucket may
          * use pools/allocators safely. */
-        if (space_left < APR_BUCKET_BUFF_SIZE) {
-            space_left = APR_BUCKET_BUFF_SIZE;
-        }
-        if (space_left < bred->length) {
-            apr_bucket_split(bred, space_left);
-        }
-        status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
+        status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
         if (status == APR_SUCCESS) {
-            status = apr_bucket_setaside(bred, pool);
+            status = apr_bucket_setaside(b, beam->send_pool);
         }
     }
     
@@ -663,42 +881,65 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
         return status;
     }
     
-    APR_BUCKET_REMOVE(bred);
-    H2_BLIST_INSERT_TAIL(&beam->red, bred);
-    beam->sent_bytes += bred->length;
-    
+    APR_BUCKET_REMOVE(b);
+    H2_BLIST_INSERT_TAIL(&beam->send_list, b);
+    beam->sent_bytes += b->length;
+
     return APR_SUCCESS;
 }
 
+void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p)
+{
+    h2_beam_lock bl;
+    /* Called from the sender thread to add buckets to the beam */
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+        r_purge_sent(beam);
+        beam_set_send_pool(beam, p);
+        leave_yellow(beam, &bl);
+    }
+}
+
 apr_status_t h2_beam_send(h2_bucket_beam *beam, 
-                          apr_bucket_brigade *red_brigade
+                          apr_bucket_brigade *sender_bb
                           apr_read_type_e block)
 {
-    apr_bucket *bred;
-    apr_status_t status = APR_SUCCESS;
+    apr_bucket *b;
+    apr_status_t rv = APR_SUCCESS;
+    apr_size_t space_left = 0;
     h2_beam_lock bl;
 
-    /* Called from the red thread to add buckets to the beam */
+    /* Called from the sender thread to add buckets to the beam */
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        r_purge_reds(beam);
+        ap_assert(beam->send_pool);
+        r_purge_sent(beam);
         
         if (beam->aborted) {
-            status = APR_ECONNABORTED;
+            move_to_hold(beam, sender_bb);
+            rv = APR_ECONNABORTED;
         }
-        else if (red_brigade) {
-            while (!APR_BRIGADE_EMPTY(red_brigade)
-                   && status == APR_SUCCESS) {
-                bred = APR_BRIGADE_FIRST(red_brigade);
-                status = append_bucket(beam, bred, block, beam->red_pool, &bl);
-            }
-            if (beam->m_cond) {
-                apr_thread_cond_broadcast(beam->m_cond);
+        else if (sender_bb) {
+            int force_report = !APR_BRIGADE_EMPTY(sender_bb);
+            
+            space_left = calc_space_left(beam);
+            while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
+                if (space_left <= 0) {
+                    report_prod_io(beam, force_report, &bl);
+                    rv = wait_not_full(beam, block, &space_left, &bl);
+                    if (APR_SUCCESS != rv) {
+                        break;
+                    }
+                }
+                b = APR_BRIGADE_FIRST(sender_bb);
+                rv = append_bucket(beam, b, block, &space_left, &bl);
             }
+            
+            report_prod_io(beam, force_report, &bl);
+            apr_thread_cond_broadcast(beam->change);
         }
-        report_consumption(beam);
+        report_consumption(beam, &bl);
         leave_yellow(beam, &bl);
     }
-    return status;
+    return rv;
 }
 
 apr_status_t h2_beam_receive(h2_bucket_beam *beam, 
@@ -707,60 +948,75 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
                              apr_off_t readbytes)
 {
     h2_beam_lock bl;
-    apr_bucket *bred, *bgreen, *ng;
+    apr_bucket *bsender, *brecv, *ng;
     int transferred = 0;
     apr_status_t status = APR_SUCCESS;
-    apr_off_t remain = readbytes;
+    apr_off_t remain;
+    int transferred_buckets = 0;
     
-    /* Called from the green thread to take buckets from the beam */
+    /* Called from the receiver thread to take buckets from the beam */
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+        if (readbytes <= 0) {
+            readbytes = APR_SIZE_MAX;
+        }
+        remain = readbytes;
+        
 transfer:
         if (beam->aborted) {
+            recv_buffer_cleanup(beam, &bl);
             status = APR_ECONNABORTED;
             goto leave;
         }
 
-        /* transfer enough buckets from our green brigade, if we have one */
-        while (beam->green
-               && !APR_BRIGADE_EMPTY(beam->green)
-               && (readbytes <= 0 || remain >= 0)) {
-            bgreen = APR_BRIGADE_FIRST(beam->green);
-            if (readbytes > 0 && bgreen->length > 0 && remain <= 0) {
+        /* transfer enough buckets from our receiver brigade, if we have one */
+        while (remain >= 0 
+               && beam->recv_buffer 
+               && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
+               
+            brecv = APR_BRIGADE_FIRST(beam->recv_buffer);
+            if (brecv->length > 0 && remain <= 0) {
                 break;
             }            
-            APR_BUCKET_REMOVE(bgreen);
-            APR_BRIGADE_INSERT_TAIL(bb, bgreen);
-            remain -= bgreen->length;
+            APR_BUCKET_REMOVE(brecv);
+            APR_BRIGADE_INSERT_TAIL(bb, brecv);
+            remain -= brecv->length;
             ++transferred;
         }
 
-        /* transfer from our red brigade, transforming red buckets to
-         * green ones until we have enough */
-        while (!H2_BLIST_EMPTY(&beam->red) && (readbytes <= 0 || remain >= 0)) {
-            bred = H2_BLIST_FIRST(&beam->red);
-            bgreen = NULL;
-            
-            if (readbytes > 0 && bred->length > 0 && remain <= 0) {
+        /* transfer from our sender brigade, transforming sender buckets to
+         * receiver ones until we have enough */
+        while (remain >= 0 && !H2_BLIST_EMPTY(&beam->send_list)) {
+               
+            brecv = NULL;
+            bsender = H2_BLIST_FIRST(&beam->send_list);            
+            if (bsender->length > 0 && remain <= 0) {
                 break;
             }
                         
-            if (APR_BUCKET_IS_METADATA(bred)) {
-                if (APR_BUCKET_IS_EOS(bred)) {
+            if (APR_BUCKET_IS_METADATA(bsender)) {
+                if (APR_BUCKET_IS_EOS(bsender)) {
+                    brecv = apr_bucket_eos_create(bb->bucket_alloc);
                     beam->close_sent = 1;
-                    bgreen = apr_bucket_eos_create(bb->bucket_alloc);
                 }
-                else if (APR_BUCKET_IS_FLUSH(bred)) {
-                    bgreen = apr_bucket_flush_create(bb->bucket_alloc);
+                else if (APR_BUCKET_IS_FLUSH(bsender)) {
+                    brecv = apr_bucket_flush_create(bb->bucket_alloc);
                 }
-                else {
-                    /* put red into hold, no green sent out */
+                else if (AP_BUCKET_IS_ERROR(bsender)) {
+                    ap_bucket_error *eb = (ap_bucket_error *)bsender;
+                    brecv = ap_bucket_error_create(eb->status, eb->data,
+                                                    bb->p, bb->bucket_alloc);
                 }
             }
-            else if (APR_BUCKET_IS_FILE(bred)) {
+            else if (bsender->length == 0) {
+                APR_BUCKET_REMOVE(bsender);
+                H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
+                continue;
+            }
+            else if (APR_BUCKET_IS_FILE(bsender)) {
                 /* This is set aside into the target brigade pool so that 
                  * any read operation messes with that pool and not 
-                 * the red one. */
-                apr_bucket_file *f = (apr_bucket_file *)bred->data;
+                 * the sender one. */
+                apr_bucket_file *f = (apr_bucket_file *)bsender->data;
                 apr_file_t *fd = f->fd;
                 int setaside = (f->readpool != bb->p);
                 
@@ -771,7 +1027,7 @@ transfer:
                     }
                     ++beam->files_beamed;
                 }
-                ng = apr_brigade_insert_file(bb, fd, bred->start, bred->length, 
+                ng = apr_brigade_insert_file(bb, fd, bsender->start, bsender->length, 
                                              bb->p);
 #if APR_HAS_MMAP
                 /* disable mmap handling as this leads to segfaults when
@@ -779,72 +1035,97 @@ transfer:
                  * been handed out. See also PR 59348 */
                 apr_bucket_file_enable_mmap(ng, 0);
 #endif
-                remain -= bred->length;
+                APR_BUCKET_REMOVE(bsender);
+                H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
+
+                remain -= bsender->length;
                 ++transferred;
+                ++transferred_buckets;
+                continue;
             }
             else {
-                /* create a "green" standin bucket. we took care about the
-                 * underlying red bucket and its data when we placed it into
-                 * the red brigade.
-                 * the beam bucket will notify us on destruction that bred is
+                /* create a "receiver" standin bucket. we took care about the
+                 * underlying sender bucket and its data when we placed it into
+                 * the sender brigade.
+                 * the beam bucket will notify us on destruction that bsender is
                  * no longer needed. */
-                bgreen = h2_beam_bucket_create(beam, bred, bb->bucket_alloc,
+                brecv = h2_beam_bucket_create(beam, bsender, bb->bucket_alloc,
                                                beam->buckets_sent++);
             }
             
-            /* Place the red bucket into our hold, to be destroyed when no
-             * green bucket references it any more. */
-            APR_BUCKET_REMOVE(bred);
-            H2_BLIST_INSERT_TAIL(&beam->hold, bred);
-            beam->received_bytes += bred->length;
-            if (bgreen) {
-                APR_BRIGADE_INSERT_TAIL(bb, bgreen);
-                remain -= bgreen->length;
+            /* Place the sender bucket into our hold, to be destroyed when no
+             * receiver bucket references it any more. */
+            APR_BUCKET_REMOVE(bsender);
+            H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
+            
+            beam->received_bytes += bsender->length;
+            ++transferred_buckets;
+            
+            if (brecv) {
+                APR_BRIGADE_INSERT_TAIL(bb, brecv);
+                remain -= brecv->length;
                 ++transferred;
             }
+            else {
+                /* let outside hook determine how bucket is beamed */
+                leave_yellow(beam, &bl);
+                brecv = h2_beam_bucket(beam, bb, bsender);
+                enter_yellow(beam, &bl);
+                
+                while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
+                    ++transferred;
+                    remain -= brecv->length;
+                    brecv = APR_BUCKET_NEXT(brecv);
+                }
+            }
         }
 
-        if (readbytes > 0 && remain < 0) {
-            /* too much, put some back */
+        if (remain < 0) {
+            /* too much, put some back into out recv_buffer */
             remain = readbytes;
-            for (bgreen = APR_BRIGADE_FIRST(bb);
-                 bgreen != APR_BRIGADE_SENTINEL(bb);
-                 bgreen = APR_BUCKET_NEXT(bgreen)) {
-                 remain -= bgreen->length;
-                 if (remain < 0) {
-                     apr_bucket_split(bgreen, bgreen->length+remain);
-                     beam->green = apr_brigade_split_ex(bb, 
-                                                        APR_BUCKET_NEXT(bgreen), 
-                                                        beam->green);
-                     break;
-                 }
+            for (brecv = APR_BRIGADE_FIRST(bb);
+                 brecv != APR_BRIGADE_SENTINEL(bb);
+                 brecv = APR_BUCKET_NEXT(brecv)) {
+                remain -= (beam->tx_mem_limits? bucket_mem_used(brecv) 
+                           : brecv->length);
+                if (remain < 0) {
+                    apr_bucket_split(brecv, brecv->length+remain);
+                    beam->recv_buffer = apr_brigade_split_ex(bb, 
+                                                             APR_BUCKET_NEXT(brecv), 
+                                                             beam->recv_buffer);
+                    break;
+                }
             }
         }
-                
-        if (transferred) {
-            status = APR_SUCCESS;
-        }
-        else if (beam->closed) {
+
+        if (beam->closed && buffer_is_empty(beam)) {
+            /* beam is closed and we have nothing more to receive */ 
             if (!beam->close_sent) {
                 apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
                 APR_BRIGADE_INSERT_TAIL(bb, b);
                 beam->close_sent = 1;
+                ++transferred;
                 status = APR_SUCCESS;
             }
-            else {
-                status = APR_EOF;
+        }
+        
+        if (transferred_buckets > 0) {
+           if (beam->cons_ev_cb) { 
+               beam->cons_ev_cb(beam->cons_ctx, beam);
             }
         }
-        else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
-            status = wait_cond(beam, bl.mutex);
+        
+        if (transferred) {
+            apr_thread_cond_broadcast(beam->change);
+            status = APR_SUCCESS;
+        }
+        else {
+            status = wait_not_empty(beam, block, bl.mutex);
             if (status != APR_SUCCESS) {
                 goto leave;
             }
             goto transfer;
         }
-        else {
-            status = APR_EAGAIN;
-        }
 leave:        
         leave_yellow(beam, &bl);
     }
@@ -852,13 +1133,25 @@ leave:
 }
 
 void h2_beam_on_consumed(h2_bucket_beam *beam, 
-                         h2_beam_consumed_callback *cb, void *ctx)
+                         h2_beam_ev_callback *ev_cb,
+                         h2_beam_io_callback *io_cb, void *ctx)
+{
+    h2_beam_lock bl;
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+        beam->cons_ev_cb = ev_cb;
+        beam->cons_io_cb = io_cb;
+        beam->cons_ctx = ctx;
+        leave_yellow(beam, &bl);
+    }
+}
+
+void h2_beam_on_produced(h2_bucket_beam *beam, 
+                         h2_beam_io_callback *io_cb, void *ctx)
 {
     h2_beam_lock bl;
-    
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        beam->consumed_fn = cb;
-        beam->consumed_ctx = ctx;
+        beam->prod_io_cb = io_cb;
+        beam->prod_ctx = ctx;
         leave_yellow(beam, &bl);
     }
 }
@@ -882,9 +1175,9 @@ apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
     apr_off_t l = 0;
     h2_beam_lock bl;
     
-    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        for (b = H2_BLIST_FIRST(&beam->red); 
-            b != H2_BLIST_SENTINEL(&beam->red);
+    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+        for (b = H2_BLIST_FIRST(&beam->send_list); 
+            b != H2_BLIST_SENTINEL(&beam->send_list);
             b = APR_BUCKET_NEXT(b)) {
             /* should all have determinate length */
             l += b->length;
@@ -900,17 +1193,11 @@ apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
     apr_off_t l = 0;
     h2_beam_lock bl;
     
-    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        for (b = H2_BLIST_FIRST(&beam->red); 
-            b != H2_BLIST_SENTINEL(&beam->red);
+    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+        for (b = H2_BLIST_FIRST(&beam->send_list); 
+            b != H2_BLIST_SENTINEL(&beam->send_list);
             b = APR_BUCKET_NEXT(b)) {
-            if (APR_BUCKET_IS_FILE(b)) {
-                /* do not count */
-            }
-            else {
-                /* should all have determinate length */
-                l += b->length;
-            }
+            l += bucket_mem_used(b);
         }
         leave_yellow(beam, &bl);
     }
@@ -922,17 +1209,24 @@ int h2_beam_empty(h2_bucket_beam *beam)
     int empty = 1;
     h2_beam_lock bl;
     
-    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        empty = (H2_BLIST_EMPTY(&beam->red
-                 && (!beam->green || APR_BRIGADE_EMPTY(beam->green)));
+    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+        empty = (H2_BLIST_EMPTY(&beam->send_list
+                 && (!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer)));
         leave_yellow(beam, &bl);
     }
     return empty;
 }
 
-int h2_beam_closed(h2_bucket_beam *beam)
+int h2_beam_holds_proxies(h2_bucket_beam *beam)
 {
-    return beam->closed;
+    int has_proxies = 1;
+    h2_beam_lock bl;
+    
+    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+        has_proxies = !H2_BPROXY_LIST_EMPTY(&beam->proxies);
+        leave_yellow(beam, &bl);
+    }
+    return has_proxies;
 }
 
 int h2_beam_was_received(h2_bucket_beam *beam)
@@ -940,7 +1234,7 @@ int h2_beam_was_received(h2_bucket_beam *beam)
     int happend = 0;
     h2_beam_lock bl;
     
-    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
         happend = (beam->received_bytes > 0);
         leave_yellow(beam, &bl);
     }
@@ -952,10 +1246,38 @@ apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam)
     apr_size_t n = 0;
     h2_beam_lock bl;
     
-    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
         n = beam->files_beamed;
         leave_yellow(beam, &bl);
     }
     return n;
 }
 
+int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
+{
+    return 0;
+}
+
+int h2_beam_report_consumption(h2_bucket_beam *beam)
+{
+    h2_beam_lock bl;
+    int rv = 0;
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+        rv = report_consumption(beam, &bl);
+        leave_yellow(beam, &bl);
+    }
+    return rv;
+}
+
+void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg)
+{
+    if (beam && APLOG_C_IS_LEVEL(c,level)) {
+        ap_log_cerror(APLOG_MARK, level, 0, c, 
+                      "beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s", 
+                      (c->master? c->master->id : c->id), beam->id, beam->tag, 
+                      beam->closed, beam->aborted, h2_beam_empty(beam), 
+                      (long)h2_beam_get_buffered(beam), msg);
+    }
+}
+
+