-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
-
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#include <apr_lib.h>
+#include <apr_atomic.h>
#include <apr_strings.h>
#include <apr_time.h>
#include <apr_buckets.h>
#include <apr_thread_cond.h>
#include <httpd.h>
+#include <http_protocol.h>
#include <http_log.h>
#include "h2_private.h"
apr_bucket_refcount refcount;
APR_RING_ENTRY(h2_beam_proxy) link;
h2_bucket_beam *beam;
- apr_bucket *bred;
+ apr_bucket *bsender;
apr_size_t n;
};
apr_size_t *len, apr_read_type_e block)
{
h2_beam_proxy *d = b->data;
- if (d->bred) {
+ if (d->bsender) {
const char *data;
- apr_status_t status = apr_bucket_read(d->bred, &data, len, block);
+ apr_status_t status = apr_bucket_read(d->bsender, &data, len, block);
if (status == APR_SUCCESS) {
*str = data + b->start;
*len = b->length;
static apr_bucket * h2_beam_bucket_make(apr_bucket *b,
h2_bucket_beam *beam,
- apr_bucket *bred, apr_size_t n)
+ apr_bucket *bsender, apr_size_t n)
{
h2_beam_proxy *d;
d = apr_bucket_alloc(sizeof(*d), b->list);
H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, d);
d->beam = beam;
- d->bred = bred;
+ d->bsender = bsender;
d->n = n;
- b = apr_bucket_shared_make(b, d, 0, bred? bred->length : 0);
+ b = apr_bucket_shared_make(b, d, 0, bsender? bsender->length : 0);
b->type = &h2_bucket_type_beam;
return b;
}
static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
- apr_bucket *bred,
+ apr_bucket *bsender,
apr_bucket_alloc_t *list,
apr_size_t n)
{
APR_BUCKET_INIT(b);
b->free = apr_bucket_free;
b->list = list;
- return h2_beam_bucket_make(b, beam, bred, n);
+ return h2_beam_bucket_make(b, beam, bsender, n);
}
-/*static apr_status_t beam_bucket_setaside(apr_bucket *b, apr_pool_t *pool)
-{
- apr_status_t status = APR_SUCCESS;
- h2_beam_proxy *d = b->data;
- if (d->bred) {
- const char *data;
- apr_size_t len;
-
- status = apr_bucket_read(d->bred, &data, &len, APR_BLOCK_READ);
- if (status == APR_SUCCESS) {
- b = apr_bucket_heap_make(b, (char *)data + b->start, b->length, NULL);
- if (b == NULL) {
- return APR_ENOMEM;
- }
- }
- }
- return status;
-}*/
-
const apr_bucket_type_t h2_bucket_type_beam = {
"BEAM", 5, APR_BUCKET_DATA,
beam_bucket_destroy,
/*******************************************************************************
* h2_blist, a brigade without allocations
******************************************************************************/
-
-apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
- const char *tag, const char *sep,
- h2_blist *bl)
+
+static apr_array_header_t *beamers;
+
+static apr_status_t cleanup_beamers(void *dummy)
{
- apr_size_t off = 0;
- const char *sp = "";
- apr_bucket *b;
-
- if (bl) {
- memset(buffer, 0, bmax--);
- off += apr_snprintf(buffer+off, bmax-off, "%s(", tag);
- for (b = H2_BLIST_FIRST(bl);
- bmax && (b != H2_BLIST_SENTINEL(bl));
- b = APR_BUCKET_NEXT(b)) {
+ (void)dummy;
+ beamers = NULL;
+ return APR_SUCCESS;
+}
+
+void h2_register_bucket_beamer(h2_bucket_beamer *beamer)
+{
+ if (!beamers) {
+ apr_pool_cleanup_register(apr_hook_global_pool, NULL,
+ cleanup_beamers, apr_pool_cleanup_null);
+ beamers = apr_array_make(apr_hook_global_pool, 10,
+ sizeof(h2_bucket_beamer*));
+ }
+ APR_ARRAY_PUSH(beamers, h2_bucket_beamer*) = beamer;
+}
+
+static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam,
+ apr_bucket_brigade *dest,
+ const apr_bucket *src)
+{
+ apr_bucket *b = NULL;
+ int i;
+ if (beamers) {
+ for (i = 0; i < beamers->nelts && b == NULL; ++i) {
+ h2_bucket_beamer *beamer;
- off += h2_util_bucket_print(buffer+off, bmax-off, b, sp);
- sp = " ";
+ beamer = APR_ARRAY_IDX(beamers, i, h2_bucket_beamer*);
+ b = beamer(beam, dest, src);
}
- off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
- }
- else {
- off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep);
}
- return off;
+ return b;
}
-
/*******************************************************************************
* bucket beam that can transport buckets across threads
******************************************************************************/
+static void mutex_leave(void *ctx, apr_thread_mutex_t *lock)
+{
+ apr_thread_mutex_unlock(lock);
+}
+
+static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl)
+{
+ h2_bucket_beam *beam = ctx;
+ pbl->mutex = beam->lock;
+ pbl->leave = mutex_leave;
+ return apr_thread_mutex_lock(pbl->mutex);
+}
+
static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
- if (beam->m_enter) {
- return beam->m_enter(beam->m_ctx, pbl);
- }
- pbl->mutex = NULL;
- pbl->leave = NULL;
- return APR_SUCCESS;
+ return mutex_enter(beam, pbl);
}
static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
}
}
-static apr_off_t calc_buffered(h2_bucket_beam *beam)
+static apr_off_t bucket_mem_used(apr_bucket *b)
+{
+ if (APR_BUCKET_IS_FILE(b)) {
+ return 0;
+ }
+ else {
+ /* should all have determinate length */
+ return b->length;
+ }
+}
+
+static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
+{
+ int rv = 0;
+ apr_off_t len = beam->received_bytes - beam->cons_bytes_reported;
+ h2_beam_io_callback *cb = beam->cons_io_cb;
+
+ if (len > 0) {
+ if (cb) {
+ void *ctx = beam->cons_ctx;
+
+ if (pbl) leave_yellow(beam, pbl);
+ cb(ctx, beam, len);
+ if (pbl) enter_yellow(beam, pbl);
+ rv = 1;
+ }
+ beam->cons_bytes_reported += len;
+ }
+ return rv;
+}
+
+static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl)
+{
+ apr_off_t len = beam->sent_bytes - beam->prod_bytes_reported;
+ if (force || len > 0) {
+ h2_beam_io_callback *cb = beam->prod_io_cb;
+ if (cb) {
+ void *ctx = beam->prod_ctx;
+
+ leave_yellow(beam, pbl);
+ cb(ctx, beam, len);
+ enter_yellow(beam, pbl);
+ }
+ beam->prod_bytes_reported += len;
+ }
+}
+
+static apr_size_t calc_buffered(h2_bucket_beam *beam)
{
- apr_off_t len = 0;
+ apr_size_t len = 0;
apr_bucket *b;
- for (b = H2_BLIST_FIRST(&beam->red);
- b != H2_BLIST_SENTINEL(&beam->red);
+ for (b = H2_BLIST_FIRST(&beam->send_list);
+ b != H2_BLIST_SENTINEL(&beam->send_list);
b = APR_BUCKET_NEXT(b)) {
if (b->length == ((apr_size_t)-1)) {
/* do not count */
}
else if (APR_BUCKET_IS_FILE(b)) {
- /* if unread, has no real mem footprint. how to test? */
+ /* if unread, has no real mem footprint. */
}
else {
len += b->length;
return len;
}
-static void r_purge_reds(h2_bucket_beam *beam)
+static void r_purge_sent(h2_bucket_beam *beam)
{
- apr_bucket *bred;
- /* delete all red buckets in purge brigade, needs to be called
- * from red thread only */
- while (!H2_BLIST_EMPTY(&beam->purge)) {
- bred = H2_BLIST_FIRST(&beam->purge);
- apr_bucket_delete(bred);
+ apr_bucket *b;
+    /* delete all sender buckets in the purge list; needs to be called
+     * from the sender thread only */
+ while (!H2_BLIST_EMPTY(&beam->purge_list)) {
+ b = H2_BLIST_FIRST(&beam->purge_list);
+ apr_bucket_delete(b);
}
}
return APR_SIZE_MAX;
}
-static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock)
+static int buffer_is_empty(h2_bucket_beam *beam)
{
- if (beam->timeout > 0) {
- return apr_thread_cond_timedwait(beam->m_cond, lock, beam->timeout);
+ return ((!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer))
+ && H2_BLIST_EMPTY(&beam->send_list));
+}
+
+static apr_status_t wait_empty(h2_bucket_beam *beam, apr_read_type_e block,
+ apr_thread_mutex_t *lock)
+{
+ apr_status_t rv = APR_SUCCESS;
+
+ while (!buffer_is_empty(beam) && APR_SUCCESS == rv) {
+ if (APR_BLOCK_READ != block || !lock) {
+ rv = APR_EAGAIN;
+ }
+ else if (beam->timeout > 0) {
+ rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
+ }
+ else {
+ rv = apr_thread_cond_wait(beam->change, lock);
+ }
}
- else {
- return apr_thread_cond_wait(beam->m_cond, lock);
+ return rv;
+}
+
+static apr_status_t wait_not_empty(h2_bucket_beam *beam, apr_read_type_e block,
+ apr_thread_mutex_t *lock)
+{
+ apr_status_t rv = APR_SUCCESS;
+
+ while (buffer_is_empty(beam) && APR_SUCCESS == rv) {
+ if (beam->aborted) {
+ rv = APR_ECONNABORTED;
+ }
+ else if (beam->closed) {
+ rv = APR_EOF;
+ }
+ else if (APR_BLOCK_READ != block || !lock) {
+ rv = APR_EAGAIN;
+ }
+ else if (beam->timeout > 0) {
+ rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
+ }
+ else {
+ rv = apr_thread_cond_wait(beam->change, lock);
+ }
}
+ return rv;
}
-static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
- h2_beam_lock *pbl, apr_off_t *premain)
+static apr_status_t wait_not_full(h2_bucket_beam *beam, apr_read_type_e block,
+ apr_size_t *pspace_left, h2_beam_lock *bl)
{
- *premain = calc_space_left(beam);
- while (!beam->aborted && *premain <= 0
- && (block == APR_BLOCK_READ) && pbl->mutex) {
- apr_status_t status = wait_cond(beam, pbl->mutex);
- if (APR_STATUS_IS_TIMEUP(status)) {
- return status;
+ apr_status_t rv = APR_SUCCESS;
+ apr_size_t left;
+
+ while (0 == (left = calc_space_left(beam)) && APR_SUCCESS == rv) {
+ if (beam->aborted) {
+ rv = APR_ECONNABORTED;
+ }
+ else if (block != APR_BLOCK_READ || !bl->mutex) {
+ rv = APR_EAGAIN;
+ }
+ else {
+ if (beam->timeout > 0) {
+ rv = apr_thread_cond_timedwait(beam->change, bl->mutex, beam->timeout);
+ }
+ else {
+ rv = apr_thread_cond_wait(beam->change, bl->mutex);
+ }
}
- r_purge_reds(beam);
- *premain = calc_space_left(beam);
}
- return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
+ *pspace_left = left;
+ return rv;
}
static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
/* even when beam buckets are split, only the one where
* refcount drops to 0 will call us */
H2_BPROXY_REMOVE(proxy);
- /* invoked from green thread, the last beam bucket for the red
- * bucket bred is about to be destroyed.
+ /* invoked from receiver thread, the last beam bucket for the send
+ * bucket is about to be destroyed.
* remove it from the hold, where it should be now */
- if (proxy->bred) {
- for (b = H2_BLIST_FIRST(&beam->hold);
- b != H2_BLIST_SENTINEL(&beam->hold);
+ if (proxy->bsender) {
+ for (b = H2_BLIST_FIRST(&beam->hold_list);
+ b != H2_BLIST_SENTINEL(&beam->hold_list);
b = APR_BUCKET_NEXT(b)) {
- if (b == proxy->bred) {
+ if (b == proxy->bsender) {
break;
}
}
- if (b != H2_BLIST_SENTINEL(&beam->hold)) {
+ if (b != H2_BLIST_SENTINEL(&beam->hold_list)) {
/* bucket is in hold as it should be, mark this one
* and all before it for purging. We might have placed meta
- * buckets without a green proxy into the hold before it
+ * buckets without a receiver proxy into the hold before it
* and schedule them for purging now */
- for (b = H2_BLIST_FIRST(&beam->hold);
- b != H2_BLIST_SENTINEL(&beam->hold);
+ for (b = H2_BLIST_FIRST(&beam->hold_list);
+ b != H2_BLIST_SENTINEL(&beam->hold_list);
b = next) {
next = APR_BUCKET_NEXT(b);
- if (b == proxy->bred) {
+ if (b == proxy->bsender) {
APR_BUCKET_REMOVE(b);
- H2_BLIST_INSERT_TAIL(&beam->purge, b);
+ H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
break;
}
else if (APR_BUCKET_IS_METADATA(b)) {
APR_BUCKET_REMOVE(b);
- H2_BLIST_INSERT_TAIL(&beam->purge, b);
+ H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
}
else {
/* another data bucket before this one in hold. this
}
}
- proxy->bred = NULL;
+ proxy->bsender = NULL;
}
else {
/* it should be there unless we screwed up */
- ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->red_pool,
- APLOGNO() "h2_beam(%d-%s): emitted bucket not "
+ ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->send_pool,
+ APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not "
"in hold, n=%d", beam->id, beam->tag,
(int)proxy->n);
- AP_DEBUG_ASSERT(!proxy->bred);
+ ap_assert(!proxy->bsender);
}
}
/* notify anyone waiting on space to become available */
if (!bl.mutex) {
- r_purge_reds(beam);
+ r_purge_sent(beam);
}
- else if (beam->m_cond) {
- apr_thread_cond_broadcast(beam->m_cond);
+ else {
+ apr_thread_cond_broadcast(beam->change);
}
leave_yellow(beam, &bl);
}
}
-static void report_consumption(h2_bucket_beam *beam)
-{
- if (beam->consumed_fn && (beam->received_bytes != beam->reported_bytes)) {
- beam->consumed_fn(beam->consumed_ctx, beam,
- beam->received_bytes - beam->reported_bytes);
- beam->reported_bytes = beam->received_bytes;
- }
-}
-
static void h2_blist_cleanup(h2_blist *bl)
{
apr_bucket *e;
{
if (!beam->closed) {
beam->closed = 1;
- if (beam->m_cond) {
- apr_thread_cond_broadcast(beam->m_cond);
- }
+ apr_thread_cond_broadcast(beam->change);
+ }
+ return APR_SUCCESS;
+}
+
+int h2_beam_is_closed(h2_bucket_beam *beam)
+{
+ return beam->closed;
+}
+
+static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool,
+ apr_status_t (*cleanup)(void *))
+{
+ if (pool && pool != beam->pool) {
+ apr_pool_pre_cleanup_register(pool, beam, cleanup);
+ return 1;
}
+ return 0;
+}
+
+static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool,
+ apr_status_t (*cleanup)(void *)) {
+ if (pool && pool != beam->pool) {
+ apr_pool_cleanup_kill(pool, beam, cleanup);
+ return 1;
+ }
+ return 0;
+}
+
+static apr_status_t beam_recv_cleanup(void *data)
+{
+ h2_bucket_beam *beam = data;
+ /* receiver pool has gone away, clear references */
+ beam->recv_buffer = NULL;
+ beam->recv_pool = NULL;
return APR_SUCCESS;
}
-static apr_status_t beam_cleanup(void *data)
+static apr_status_t beam_send_cleanup(void *data)
{
h2_bucket_beam *beam = data;
+ /* sender is going away, clear up all references to its memory */
+ r_purge_sent(beam);
+ h2_blist_cleanup(&beam->send_list);
+ report_consumption(beam, NULL);
+ while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
+ h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
+ H2_BPROXY_REMOVE(proxy);
+ proxy->beam = NULL;
+ proxy->bsender = NULL;
+ }
+ h2_blist_cleanup(&beam->purge_list);
+ h2_blist_cleanup(&beam->hold_list);
+ beam->send_pool = NULL;
+ return APR_SUCCESS;
+}
+
+static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool)
+{
+ if (beam->send_pool != pool) {
+ if (beam->send_pool && beam->send_pool != beam->pool) {
+ pool_kill(beam, beam->send_pool, beam_send_cleanup);
+ beam_send_cleanup(beam);
+ }
+ beam->send_pool = pool;
+ pool_register(beam, beam->send_pool, beam_send_cleanup);
+ }
+}
+
+static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl)
+{
+ if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
+ apr_bucket_brigade *bb = beam->recv_buffer;
+ apr_off_t bblen = 0;
+
+ beam->recv_buffer = NULL;
+ apr_brigade_length(bb, 0, &bblen);
+ beam->received_bytes += bblen;
+
+ /* need to do this unlocked since bucket destroy might
+ * call this beam again. */
+ if (bl) leave_yellow(beam, bl);
+ apr_brigade_destroy(bb);
+ if (bl) enter_yellow(beam, bl);
+
+ if (beam->cons_ev_cb) {
+ beam->cons_ev_cb(beam->cons_ctx, beam);
+ }
+ }
+}
+
+static apr_status_t beam_cleanup(h2_bucket_beam *beam, int from_pool)
+{
+ apr_status_t status = APR_SUCCESS;
+ int safe_send = (beam->owner == H2_BEAM_OWNER_SEND);
+ int safe_recv = (beam->owner == H2_BEAM_OWNER_RECV);
- beam_close(beam);
- r_purge_reds(beam);
- h2_blist_cleanup(&beam->red);
- report_consumption(beam);
- h2_blist_cleanup(&beam->purge);
- h2_blist_cleanup(&beam->hold);
+ /*
+ * Owner of the beam is going away, depending on which side it owns,
+ * cleanup strategies will differ.
+ *
+ * In general, receiver holds references to memory from sender.
+ * Clean up receiver first, if safe, then cleanup sender, if safe.
+ */
+
+ /* When called from pool destroy, io callbacks are disabled */
+ if (from_pool) {
+ beam->cons_io_cb = NULL;
+ }
+
+    /* When modifying the send side is not safe, this means we still have
+     * multi-thread protection and the owner is receiving the buckets. If
+     * the sending side has not gone away, we could have dangling buckets
+     * in our lists that never get destroyed. This should not happen. */
+ ap_assert(safe_send || !beam->send_pool);
+ if (!H2_BLIST_EMPTY(&beam->send_list)) {
+ ap_assert(beam->send_pool);
+ }
- return APR_SUCCESS;
+ if (safe_recv) {
+ if (beam->recv_pool) {
+ pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
+ beam->recv_pool = NULL;
+ }
+ recv_buffer_cleanup(beam, NULL);
+ }
+ else {
+ beam->recv_buffer = NULL;
+ beam->recv_pool = NULL;
+ }
+
+ if (safe_send && beam->send_pool) {
+ pool_kill(beam, beam->send_pool, beam_send_cleanup);
+ status = beam_send_cleanup(beam);
+ }
+
+ if (safe_recv) {
+ ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies));
+ ap_assert(H2_BLIST_EMPTY(&beam->send_list));
+ ap_assert(H2_BLIST_EMPTY(&beam->hold_list));
+ ap_assert(H2_BLIST_EMPTY(&beam->purge_list));
+ }
+ return status;
+}
+
+static apr_status_t beam_pool_cleanup(void *data)
+{
+ return beam_cleanup(data, 1);
}
apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
{
- apr_pool_cleanup_kill(beam->red_pool, beam, beam_cleanup);
- return beam_cleanup(beam);
+ apr_pool_cleanup_kill(beam->pool, beam, beam_pool_cleanup);
+ return beam_cleanup(beam, 0);
}
-apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *red_pool,
+apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool,
int id, const char *tag,
- apr_size_t max_buf_size)
+ h2_beam_owner_t owner,
+ apr_size_t max_buf_size,
+ apr_interval_time_t timeout)
{
h2_bucket_beam *beam;
- apr_status_t status = APR_SUCCESS;
+ apr_status_t rv = APR_SUCCESS;
- beam = apr_pcalloc(red_pool, sizeof(*beam));
+ beam = apr_pcalloc(pool, sizeof(*beam));
if (!beam) {
return APR_ENOMEM;
}
beam->id = id;
beam->tag = tag;
- H2_BLIST_INIT(&beam->red);
- H2_BLIST_INIT(&beam->hold);
- H2_BLIST_INIT(&beam->purge);
+ beam->pool = pool;
+ beam->owner = owner;
+ H2_BLIST_INIT(&beam->send_list);
+ H2_BLIST_INIT(&beam->hold_list);
+ H2_BLIST_INIT(&beam->purge_list);
H2_BPROXY_LIST_INIT(&beam->proxies);
- beam->red_pool = red_pool;
+ beam->tx_mem_limits = 1;
beam->max_buf_size = max_buf_size;
-
- apr_pool_pre_cleanup_register(red_pool, beam, beam_cleanup);
- *pbeam = beam;
-
- return status;
+ beam->timeout = timeout;
+
+ rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool);
+ if (APR_SUCCESS == rv) {
+ rv = apr_thread_cond_create(&beam->change, pool);
+ if (APR_SUCCESS == rv) {
+ apr_pool_pre_cleanup_register(pool, beam, beam_pool_cleanup);
+ *pbeam = beam;
+ }
+ }
+ return rv;
}
void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
h2_beam_lock bl;
apr_size_t buffer_size = 0;
- if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+ if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
buffer_size = beam->max_buf_size;
leave_yellow(beam, &bl);
}
return buffer_size;
}
-void h2_beam_mutex_set(h2_bucket_beam *beam,
- h2_beam_mutex_enter m_enter,
- apr_thread_cond_t *cond,
- void *m_ctx)
-{
- h2_beam_lock bl;
-
- if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- beam->m_enter = m_enter;
- beam->m_ctx = m_ctx;
- beam->m_cond = cond;
- leave_yellow(beam, &bl);
- }
-}
-
void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
{
h2_beam_lock bl;
{
h2_beam_lock bl;
- if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- r_purge_reds(beam);
- h2_blist_cleanup(&beam->red);
- beam->aborted = 1;
- report_consumption(beam);
- if (beam->m_cond) {
- apr_thread_cond_broadcast(beam->m_cond);
+ if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+ if (!beam->aborted) {
+ beam->aborted = 1;
+ r_purge_sent(beam);
+ h2_blist_cleanup(&beam->send_list);
+ report_consumption(beam, &bl);
}
+ apr_thread_cond_broadcast(beam->change);
leave_yellow(beam, &bl);
}
}
{
h2_beam_lock bl;
- if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- r_purge_reds(beam);
+ if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+ r_purge_sent(beam);
beam_close(beam);
- report_consumption(beam);
+ report_consumption(beam, &bl);
leave_yellow(beam, &bl);
}
return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
}
-apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block)
+apr_status_t h2_beam_leave(h2_bucket_beam *beam)
+{
+ h2_beam_lock bl;
+
+ if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+ recv_buffer_cleanup(beam, &bl);
+ beam->aborted = 1;
+ beam_close(beam);
+ leave_yellow(beam, &bl);
+ }
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block)
{
apr_status_t status;
h2_beam_lock bl;
if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) {
- r_purge_reds(beam);
- h2_blist_cleanup(&beam->red);
- beam_close(beam);
- report_consumption(beam);
-
- while (status == APR_SUCCESS
- && (!H2_BPROXY_LIST_EMPTY(&beam->proxies)
- || (beam->green && !APR_BRIGADE_EMPTY(beam->green)))) {
- if (block == APR_NONBLOCK_READ || !bl.mutex) {
- status = APR_EAGAIN;
- break;
- }
- status = wait_cond(beam, bl.mutex);
- }
+ status = wait_empty(beam, block, bl.mutex);
leave_yellow(beam, &bl);
}
return status;
}
+static void move_to_hold(h2_bucket_beam *beam,
+ apr_bucket_brigade *sender_bb)
+{
+ apr_bucket *b;
+ while (sender_bb && !APR_BRIGADE_EMPTY(sender_bb)) {
+ b = APR_BRIGADE_FIRST(sender_bb);
+ APR_BUCKET_REMOVE(b);
+ H2_BLIST_INSERT_TAIL(&beam->send_list, b);
+ }
+}
+
static apr_status_t append_bucket(h2_bucket_beam *beam,
- apr_bucket *bred,
+ apr_bucket *b,
apr_read_type_e block,
- apr_pool_t *pool,
+ apr_size_t *pspace_left,
h2_beam_lock *pbl)
{
const char *data;
apr_size_t len;
- apr_off_t space_left = 0;
apr_status_t status;
+ int can_beam, check_len;
+
+ if (beam->aborted) {
+ return APR_ECONNABORTED;
+ }
- if (APR_BUCKET_IS_METADATA(bred)) {
- if (APR_BUCKET_IS_EOS(bred)) {
+ if (APR_BUCKET_IS_METADATA(b)) {
+ if (APR_BUCKET_IS_EOS(b)) {
beam->closed = 1;
}
- APR_BUCKET_REMOVE(bred);
- H2_BLIST_INSERT_TAIL(&beam->red, bred);
+ APR_BUCKET_REMOVE(b);
+ H2_BLIST_INSERT_TAIL(&beam->send_list, b);
return APR_SUCCESS;
}
- else if (APR_BUCKET_IS_FILE(bred)) {
- /* file bucket lengths do not really count */
+ else if (APR_BUCKET_IS_FILE(b)) {
+ /* For file buckets the problem is their internal readpool that
+ * is used on the first read to allocate buffer/mmap.
+ * Since setting aside a file bucket will de-register the
+ * file cleanup function from the previous pool, we need to
+ * call that only from the sender thread.
+ *
+ * Currently, we do not handle file bucket with refcount > 1 as
+ * the beam is then not in complete control of the file's lifetime.
+     * Which results in the bug that a file gets closed by the receiver
+     * while the sender or the beam still has buckets using it.
+ *
+ * Additionally, we allow callbacks to prevent beaming file
+ * handles across. The use case for this is to limit the number
+ * of open file handles and rather use a less efficient beam
+ * transport. */
+ apr_bucket_file *bf = b->data;
+ apr_file_t *fd = bf->fd;
+ can_beam = (bf->refcount.refcount == 1);
+ if (can_beam && beam->can_beam_fn) {
+ can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
+ }
+ check_len = !can_beam;
}
else {
- space_left = calc_space_left(beam);
- if (space_left > 0 && bred->length == ((apr_size_t)-1)) {
+ if (b->length == ((apr_size_t)-1)) {
const char *data;
- status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
+ status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
if (status != APR_SUCCESS) {
return status;
}
}
-
- if (space_left < bred->length) {
- status = r_wait_space(beam, block, pbl, &space_left);
- if (status != APR_SUCCESS) {
- return status;
- }
- if (space_left <= 0) {
- return APR_EAGAIN;
- }
- }
- /* space available, maybe need bucket split */
+ check_len = 1;
}
+ if (check_len) {
+ if (b->length > *pspace_left) {
+ apr_bucket_split(b, *pspace_left);
+ }
+ *pspace_left -= b->length;
+ }
- /* The fundamental problem is that reading a red bucket from
- * a green thread is a total NO GO, because the bucket might use
+ /* The fundamental problem is that reading a sender bucket from
+ * a receiver thread is a total NO GO, because the bucket might use
* its pool/bucket_alloc from a foreign thread and that will
* corrupt. */
status = APR_ENOTIMPL;
- if (beam->closed && bred->length > 0) {
- status = APR_EOF;
- }
- else if (APR_BUCKET_IS_TRANSIENT(bred)) {
+ if (APR_BUCKET_IS_TRANSIENT(b)) {
/* this takes care of transient buckets and converts them
* into heap ones. Other bucket types might or might not be
* affected by this. */
- status = apr_bucket_setaside(bred, pool);
+ status = apr_bucket_setaside(b, beam->send_pool);
}
- else if (APR_BUCKET_IS_HEAP(bred)) {
- /* For heap buckets read from a green thread is fine. The
+ else if (APR_BUCKET_IS_HEAP(b)) {
+ /* For heap buckets read from a receiver thread is fine. The
* data will be there and live until the bucket itself is
* destroyed. */
status = APR_SUCCESS;
}
- else if (APR_BUCKET_IS_POOL(bred)) {
+ else if (APR_BUCKET_IS_POOL(b)) {
/* pool buckets are bastards that register at pool cleanup
* to morph themselves into heap buckets. That may happen anytime,
* even after the bucket data pointer has been read. So at
- * any time inside the green thread, the pool bucket memory
+ * any time inside the receiver thread, the pool bucket memory
* may disappear. yikes. */
- status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
+ status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
if (status == APR_SUCCESS) {
- apr_bucket_heap_make(bred, data, len, NULL);
+ apr_bucket_heap_make(b, data, len, NULL);
}
}
- else if (APR_BUCKET_IS_FILE(bred)) {
- /* For file buckets the problem is their internal readpool that
- * is used on the first read to allocate buffer/mmap.
- * Since setting aside a file bucket will de-register the
- * file cleanup function from the previous pool, we need to
- * call that from a red thread.
- * Additionally, we allow callbacks to prevent beaming file
- * handles across. The use case for this is to limit the number
- * of open file handles and rather use a less efficient beam
- * transport. */
- apr_file_t *fd = ((apr_bucket_file *)bred->data)->fd;
- int can_beam = 1;
- if (beam->last_beamed != fd && beam->can_beam_fn) {
- can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
- }
- if (can_beam) {
- beam->last_beamed = fd;
- status = apr_bucket_setaside(bred, pool);
- }
- /* else: enter ENOTIMPL case below */
+ else if (APR_BUCKET_IS_FILE(b) && can_beam) {
+ status = apr_bucket_setaside(b, beam->send_pool);
}
if (status == APR_ENOTIMPL) {
* but hope that after read, its data stays immutable for the
* lifetime of the bucket. (see pool bucket handling above for
* a counter example).
- * We do the read while in a red thread, so that the bucket may
+ * We do the read while in the sender thread, so that the bucket may
* use pools/allocators safely. */
- if (space_left < APR_BUCKET_BUFF_SIZE) {
- space_left = APR_BUCKET_BUFF_SIZE;
- }
- if (space_left < bred->length) {
- apr_bucket_split(bred, space_left);
- }
- status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
+ status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
if (status == APR_SUCCESS) {
- status = apr_bucket_setaside(bred, pool);
+ status = apr_bucket_setaside(b, beam->send_pool);
}
}
return status;
}
- APR_BUCKET_REMOVE(bred);
- H2_BLIST_INSERT_TAIL(&beam->red, bred);
- beam->sent_bytes += bred->length;
-
+ APR_BUCKET_REMOVE(b);
+ H2_BLIST_INSERT_TAIL(&beam->send_list, b);
+ beam->sent_bytes += b->length;
+
return APR_SUCCESS;
}
+void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p)
+{
+ h2_beam_lock bl;
+ /* Called from the sender thread to add buckets to the beam */
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+ r_purge_sent(beam);
+ beam_set_send_pool(beam, p);
+ leave_yellow(beam, &bl);
+ }
+}
+
apr_status_t h2_beam_send(h2_bucket_beam *beam,
- apr_bucket_brigade *red_brigade,
+ apr_bucket_brigade *sender_bb,
apr_read_type_e block)
{
- apr_bucket *bred;
- apr_status_t status = APR_SUCCESS;
+ apr_bucket *b;
+ apr_status_t rv = APR_SUCCESS;
+ apr_size_t space_left = 0;
h2_beam_lock bl;
- /* Called from the red thread to add buckets to the beam */
+ /* Called from the sender thread to add buckets to the beam */
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- r_purge_reds(beam);
+ ap_assert(beam->send_pool);
+ r_purge_sent(beam);
if (beam->aborted) {
- status = APR_ECONNABORTED;
+ move_to_hold(beam, sender_bb);
+ rv = APR_ECONNABORTED;
}
- else if (red_brigade) {
- while (!APR_BRIGADE_EMPTY(red_brigade)
- && status == APR_SUCCESS) {
- bred = APR_BRIGADE_FIRST(red_brigade);
- status = append_bucket(beam, bred, block, beam->red_pool, &bl);
- }
- if (beam->m_cond) {
- apr_thread_cond_broadcast(beam->m_cond);
+ else if (sender_bb) {
+ int force_report = !APR_BRIGADE_EMPTY(sender_bb);
+
+ space_left = calc_space_left(beam);
+ while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
+ if (space_left <= 0) {
+ report_prod_io(beam, force_report, &bl);
+ rv = wait_not_full(beam, block, &space_left, &bl);
+ if (APR_SUCCESS != rv) {
+ break;
+ }
+ }
+ b = APR_BRIGADE_FIRST(sender_bb);
+ rv = append_bucket(beam, b, block, &space_left, &bl);
}
+
+ report_prod_io(beam, force_report, &bl);
+ apr_thread_cond_broadcast(beam->change);
}
- report_consumption(beam);
+ report_consumption(beam, &bl);
leave_yellow(beam, &bl);
}
- return status;
+ return rv;
}
apr_status_t h2_beam_receive(h2_bucket_beam *beam,
apr_off_t readbytes)
{
h2_beam_lock bl;
- apr_bucket *bred, *bgreen, *ng;
+ apr_bucket *bsender, *brecv, *ng;
int transferred = 0;
apr_status_t status = APR_SUCCESS;
- apr_off_t remain = readbytes;
+ apr_off_t remain;
+ int transferred_buckets = 0;
- /* Called from the green thread to take buckets from the beam */
+ /* Called from the receiver thread to take buckets from the beam */
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+ if (readbytes <= 0) {
+ readbytes = APR_SIZE_MAX;
+ }
+ remain = readbytes;
+
transfer:
if (beam->aborted) {
+ recv_buffer_cleanup(beam, &bl);
status = APR_ECONNABORTED;
goto leave;
}
- /* transfer enough buckets from our green brigade, if we have one */
- while (beam->green
- && !APR_BRIGADE_EMPTY(beam->green)
- && (readbytes <= 0 || remain >= 0)) {
- bgreen = APR_BRIGADE_FIRST(beam->green);
- if (readbytes > 0 && bgreen->length > 0 && remain <= 0) {
+ /* transfer enough buckets from our receiver brigade, if we have one */
+ while (remain >= 0
+ && beam->recv_buffer
+ && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
+
+ brecv = APR_BRIGADE_FIRST(beam->recv_buffer);
+ if (brecv->length > 0 && remain <= 0) {
break;
}
- APR_BUCKET_REMOVE(bgreen);
- APR_BRIGADE_INSERT_TAIL(bb, bgreen);
- remain -= bgreen->length;
+ APR_BUCKET_REMOVE(brecv);
+ APR_BRIGADE_INSERT_TAIL(bb, brecv);
+ remain -= brecv->length;
++transferred;
}
- /* transfer from our red brigade, transforming red buckets to
- * green ones until we have enough */
- while (!H2_BLIST_EMPTY(&beam->red) && (readbytes <= 0 || remain >= 0)) {
- bred = H2_BLIST_FIRST(&beam->red);
- bgreen = NULL;
-
- if (readbytes > 0 && bred->length > 0 && remain <= 0) {
+ /* transfer from our sender brigade, transforming sender buckets to
+ * receiver ones until we have enough */
+ while (remain >= 0 && !H2_BLIST_EMPTY(&beam->send_list)) {
+
+ brecv = NULL;
+ bsender = H2_BLIST_FIRST(&beam->send_list);
+ if (bsender->length > 0 && remain <= 0) {
break;
}
- if (APR_BUCKET_IS_METADATA(bred)) {
- if (APR_BUCKET_IS_EOS(bred)) {
+ if (APR_BUCKET_IS_METADATA(bsender)) {
+ if (APR_BUCKET_IS_EOS(bsender)) {
+ brecv = apr_bucket_eos_create(bb->bucket_alloc);
beam->close_sent = 1;
- bgreen = apr_bucket_eos_create(bb->bucket_alloc);
}
- else if (APR_BUCKET_IS_FLUSH(bred)) {
- bgreen = apr_bucket_flush_create(bb->bucket_alloc);
+ else if (APR_BUCKET_IS_FLUSH(bsender)) {
+ brecv = apr_bucket_flush_create(bb->bucket_alloc);
}
- else {
- /* put red into hold, no green sent out */
+ else if (AP_BUCKET_IS_ERROR(bsender)) {
+ ap_bucket_error *eb = (ap_bucket_error *)bsender;
+ brecv = ap_bucket_error_create(eb->status, eb->data,
+ bb->p, bb->bucket_alloc);
}
}
- else if (APR_BUCKET_IS_FILE(bred)) {
+ else if (bsender->length == 0) {
+ APR_BUCKET_REMOVE(bsender);
+ H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
+ continue;
+ }
+ else if (APR_BUCKET_IS_FILE(bsender)) {
/* This is set aside into the target brigade pool so that
* any read operation messes with that pool and not
- * the red one. */
- apr_bucket_file *f = (apr_bucket_file *)bred->data;
+ * the sender one. */
+ apr_bucket_file *f = (apr_bucket_file *)bsender->data;
apr_file_t *fd = f->fd;
int setaside = (f->readpool != bb->p);
}
++beam->files_beamed;
}
- ng = apr_brigade_insert_file(bb, fd, bred->start, bred->length,
+ ng = apr_brigade_insert_file(bb, fd, bsender->start, bsender->length,
bb->p);
#if APR_HAS_MMAP
/* disable mmap handling as this leads to segfaults when
* been handed out. See also PR 59348 */
apr_bucket_file_enable_mmap(ng, 0);
#endif
- remain -= bred->length;
+ APR_BUCKET_REMOVE(bsender);
+ H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
+
+ remain -= bsender->length;
++transferred;
+ ++transferred_buckets;
+ continue;
}
else {
- /* create a "green" standin bucket. we took care about the
- * underlying red bucket and its data when we placed it into
- * the red brigade.
- * the beam bucket will notify us on destruction that bred is
+ /* create a "receiver" standin bucket. we took care about the
+ * underlying sender bucket and its data when we placed it into
+ * the sender brigade.
+ * the beam bucket will notify us on destruction that bsender is
* no longer needed. */
- bgreen = h2_beam_bucket_create(beam, bred, bb->bucket_alloc,
+ brecv = h2_beam_bucket_create(beam, bsender, bb->bucket_alloc,
beam->buckets_sent++);
}
- /* Place the red bucket into our hold, to be destroyed when no
- * green bucket references it any more. */
- APR_BUCKET_REMOVE(bred);
- H2_BLIST_INSERT_TAIL(&beam->hold, bred);
- beam->received_bytes += bred->length;
- if (bgreen) {
- APR_BRIGADE_INSERT_TAIL(bb, bgreen);
- remain -= bgreen->length;
+ /* Place the sender bucket into our hold, to be destroyed when no
+ * receiver bucket references it any more. */
+ APR_BUCKET_REMOVE(bsender);
+ H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
+
+ beam->received_bytes += bsender->length;
+ ++transferred_buckets;
+
+ if (brecv) {
+ APR_BRIGADE_INSERT_TAIL(bb, brecv);
+ remain -= brecv->length;
++transferred;
}
+ else {
+ /* let outside hook determine how bucket is beamed */
+ leave_yellow(beam, &bl);
+ brecv = h2_beam_bucket(beam, bb, bsender);
+ enter_yellow(beam, &bl);
+
+ while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
+ ++transferred;
+ remain -= brecv->length;
+ brecv = APR_BUCKET_NEXT(brecv);
+ }
+ }
}
- if (readbytes > 0 && remain < 0) {
- /* too much, put some back */
+ if (remain < 0) {
+            /* too much, put some back into our recv_buffer */
remain = readbytes;
- for (bgreen = APR_BRIGADE_FIRST(bb);
- bgreen != APR_BRIGADE_SENTINEL(bb);
- bgreen = APR_BUCKET_NEXT(bgreen)) {
- remain -= bgreen->length;
- if (remain < 0) {
- apr_bucket_split(bgreen, bgreen->length+remain);
- beam->green = apr_brigade_split_ex(bb,
- APR_BUCKET_NEXT(bgreen),
- beam->green);
- break;
- }
+ for (brecv = APR_BRIGADE_FIRST(bb);
+ brecv != APR_BRIGADE_SENTINEL(bb);
+ brecv = APR_BUCKET_NEXT(brecv)) {
+ remain -= (beam->tx_mem_limits? bucket_mem_used(brecv)
+ : brecv->length);
+ if (remain < 0) {
+ apr_bucket_split(brecv, brecv->length+remain);
+ beam->recv_buffer = apr_brigade_split_ex(bb,
+ APR_BUCKET_NEXT(brecv),
+ beam->recv_buffer);
+ break;
+ }
}
}
-
- if (transferred) {
- status = APR_SUCCESS;
- }
- else if (beam->closed) {
+
+ if (beam->closed && buffer_is_empty(beam)) {
+ /* beam is closed and we have nothing more to receive */
if (!beam->close_sent) {
apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(bb, b);
beam->close_sent = 1;
+ ++transferred;
status = APR_SUCCESS;
}
- else {
- status = APR_EOF;
+ }
+
+ if (transferred_buckets > 0) {
+ if (beam->cons_ev_cb) {
+ beam->cons_ev_cb(beam->cons_ctx, beam);
}
}
- else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
- status = wait_cond(beam, bl.mutex);
+
+ if (transferred) {
+ apr_thread_cond_broadcast(beam->change);
+ status = APR_SUCCESS;
+ }
+ else {
+ status = wait_not_empty(beam, block, bl.mutex);
if (status != APR_SUCCESS) {
goto leave;
}
goto transfer;
}
- else {
- status = APR_EAGAIN;
- }
leave:
leave_yellow(beam, &bl);
}
}
void h2_beam_on_consumed(h2_bucket_beam *beam,
- h2_beam_consumed_callback *cb, void *ctx)
+ h2_beam_ev_callback *ev_cb,
+ h2_beam_io_callback *io_cb, void *ctx)
+{
+ h2_beam_lock bl;
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+ beam->cons_ev_cb = ev_cb;
+ beam->cons_io_cb = io_cb;
+ beam->cons_ctx = ctx;
+ leave_yellow(beam, &bl);
+ }
+}
+
+void h2_beam_on_produced(h2_bucket_beam *beam,
+ h2_beam_io_callback *io_cb, void *ctx)
{
h2_beam_lock bl;
-
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- beam->consumed_fn = cb;
- beam->consumed_ctx = ctx;
+ beam->prod_io_cb = io_cb;
+ beam->prod_ctx = ctx;
leave_yellow(beam, &bl);
}
}
apr_off_t l = 0;
h2_beam_lock bl;
- if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- for (b = H2_BLIST_FIRST(&beam->red);
- b != H2_BLIST_SENTINEL(&beam->red);
+ if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+ for (b = H2_BLIST_FIRST(&beam->send_list);
+ b != H2_BLIST_SENTINEL(&beam->send_list);
b = APR_BUCKET_NEXT(b)) {
/* should all have determinate length */
l += b->length;
apr_off_t l = 0;
h2_beam_lock bl;
- if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- for (b = H2_BLIST_FIRST(&beam->red);
- b != H2_BLIST_SENTINEL(&beam->red);
+ if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+ for (b = H2_BLIST_FIRST(&beam->send_list);
+ b != H2_BLIST_SENTINEL(&beam->send_list);
b = APR_BUCKET_NEXT(b)) {
- if (APR_BUCKET_IS_FILE(b)) {
- /* do not count */
- }
- else {
- /* should all have determinate length */
- l += b->length;
- }
+ l += bucket_mem_used(b);
}
leave_yellow(beam, &bl);
}
int empty = 1;
h2_beam_lock bl;
- if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- empty = (H2_BLIST_EMPTY(&beam->red)
- && (!beam->green || APR_BRIGADE_EMPTY(beam->green)));
+ if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+ empty = (H2_BLIST_EMPTY(&beam->send_list)
+ && (!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer)));
leave_yellow(beam, &bl);
}
return empty;
}
-int h2_beam_closed(h2_bucket_beam *beam)
+int h2_beam_holds_proxies(h2_bucket_beam *beam)
{
- return beam->closed;
+ int has_proxies = 1;
+ h2_beam_lock bl;
+
+ if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
+ has_proxies = !H2_BPROXY_LIST_EMPTY(&beam->proxies);
+ leave_yellow(beam, &bl);
+ }
+ return has_proxies;
}
int h2_beam_was_received(h2_bucket_beam *beam)
int happend = 0;
h2_beam_lock bl;
- if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+ if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
happend = (beam->received_bytes > 0);
leave_yellow(beam, &bl);
}
apr_size_t n = 0;
h2_beam_lock bl;
- if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+ if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
n = beam->files_beamed;
leave_yellow(beam, &bl);
}
return n;
}
+int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
+{
+ return 0;
+}
+
+int h2_beam_report_consumption(h2_bucket_beam *beam)
+{
+ h2_beam_lock bl;
+ int rv = 0;
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+ rv = report_consumption(beam, &bl);
+ leave_yellow(beam, &bl);
+ }
+ return rv;
+}
+
+void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg)
+{
+ if (beam && APLOG_C_IS_LEVEL(c,level)) {
+ ap_log_cerror(APLOG_MARK, level, 0, c,
+ "beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s",
+ (c->master? c->master->id : c->id), beam->id, beam->tag,
+ beam->closed, beam->aborted, h2_beam_empty(beam),
+ (long)h2_beam_get_buffered(beam), msg);
+ }
+}
+
+