1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
17 #include <apr_strings.h>
19 #include <apr_buckets.h>
20 #include <apr_thread_mutex.h>
21 #include <apr_thread_cond.h>
26 #include "h2_private.h"
28 #include "h2_bucket_beam.h"
30 static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy);
32 #define H2_BPROXY_NEXT(e) APR_RING_NEXT((e), link)
33 #define H2_BPROXY_PREV(e) APR_RING_PREV((e), link)
34 #define H2_BPROXY_REMOVE(e) APR_RING_REMOVE((e), link)
36 #define H2_BPROXY_LIST_INIT(b) APR_RING_INIT(&(b)->list, h2_beam_proxy, link);
37 #define H2_BPROXY_LIST_SENTINEL(b) APR_RING_SENTINEL(&(b)->list, h2_beam_proxy, link)
38 #define H2_BPROXY_LIST_EMPTY(b) APR_RING_EMPTY(&(b)->list, h2_beam_proxy, link)
39 #define H2_BPROXY_LIST_FIRST(b) APR_RING_FIRST(&(b)->list)
40 #define H2_BPROXY_LIST_LAST(b) APR_RING_LAST(&(b)->list)
41 #define H2_PROXY_BLIST_INSERT_HEAD(b, e) do { \
42 h2_beam_proxy *ap__b = (e); \
43 APR_RING_INSERT_HEAD(&(b)->list, ap__b, h2_beam_proxy, link); \
45 #define H2_BPROXY_LIST_INSERT_TAIL(b, e) do { \
46 h2_beam_proxy *ap__b = (e); \
47 APR_RING_INSERT_TAIL(&(b)->list, ap__b, h2_beam_proxy, link); \
49 #define H2_BPROXY_LIST_CONCAT(a, b) do { \
50 APR_RING_CONCAT(&(a)->list, &(b)->list, h2_beam_proxy, link); \
52 #define H2_BPROXY_LIST_PREPEND(a, b) do { \
53 APR_RING_PREPEND(&(a)->list, &(b)->list, h2_beam_proxy, link); \
57 /*******************************************************************************
58 * beam bucket with reference to beam and bucket it represents
59 ******************************************************************************/
61 const apr_bucket_type_t h2_bucket_type_beam;
63 #define H2_BUCKET_IS_BEAM(e) (e->type == &h2_bucket_type_beam)
65 struct h2_beam_proxy {
66 apr_bucket_refcount refcount;
67 APR_RING_ENTRY(h2_beam_proxy) link;
73 static const char Dummy = '\0';
75 static apr_status_t beam_bucket_read(apr_bucket *b, const char **str,
76 apr_size_t *len, apr_read_type_e block)
78 h2_beam_proxy *d = b->data;
81 apr_status_t status = apr_bucket_read(d->bred, &data, len, block);
82 if (status == APR_SUCCESS) {
83 *str = data + b->start;
90 return APR_ECONNRESET;
93 static void beam_bucket_destroy(void *data)
95 h2_beam_proxy *d = data;
97 if (apr_bucket_shared_destroy(d)) {
98 /* When the beam gets destroyed before this bucket, it will
99 * NULLify its reference here. This is not protected by a mutex,
100 * so it will not help with race conditions.
101 * But it lets us shut down memory pool with circulare beam
104 h2_beam_emitted(d->beam, d);
110 static apr_bucket * h2_beam_bucket_make(apr_bucket *b,
111 h2_bucket_beam *beam,
112 apr_bucket *bred, apr_size_t n)
116 d = apr_bucket_alloc(sizeof(*d), b->list);
117 H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, d);
122 b = apr_bucket_shared_make(b, d, 0, bred? bred->length : 0);
123 b->type = &h2_bucket_type_beam;
128 static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
130 apr_bucket_alloc_t *list,
133 apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
136 b->free = apr_bucket_free;
138 return h2_beam_bucket_make(b, beam, bred, n);
141 /*static apr_status_t beam_bucket_setaside(apr_bucket *b, apr_pool_t *pool)
143 apr_status_t status = APR_SUCCESS;
144 h2_beam_proxy *d = b->data;
149 status = apr_bucket_read(d->bred, &data, &len, APR_BLOCK_READ);
150 if (status == APR_SUCCESS) {
151 b = apr_bucket_heap_make(b, (char *)data + b->start, b->length, NULL);
160 const apr_bucket_type_t h2_bucket_type_beam = {
161 "BEAM", 5, APR_BUCKET_DATA,
164 apr_bucket_setaside_noop,
165 apr_bucket_shared_split,
166 apr_bucket_shared_copy
169 /*******************************************************************************
170 * h2_blist, a brigade without allocations
171 ******************************************************************************/
173 apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
174 const char *tag, const char *sep,
182 memset(buffer, 0, bmax--);
183 off += apr_snprintf(buffer+off, bmax-off, "%s(", tag);
184 for (b = H2_BLIST_FIRST(bl);
185 bmax && (b != H2_BLIST_SENTINEL(bl));
186 b = APR_BUCKET_NEXT(b)) {
188 off += h2_util_bucket_print(buffer+off, bmax-off, b, sp);
191 off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
194 off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep);
201 /*******************************************************************************
202 * bucket beam that can transport buckets across threads
203 ******************************************************************************/
205 static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
207 h2_beam_mutex_enter *enter = beam->m_enter;
209 void *ctx = beam->m_ctx;
211 return enter(ctx, pbl);
219 static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
222 pbl->leave(pbl->leave_ctx, pbl->mutex);
226 static apr_off_t calc_buffered(h2_bucket_beam *beam)
230 for (b = H2_BLIST_FIRST(&beam->red);
231 b != H2_BLIST_SENTINEL(&beam->red);
232 b = APR_BUCKET_NEXT(b)) {
233 if (b->length == ((apr_size_t)-1)) {
236 else if (APR_BUCKET_IS_FILE(b)) {
237 /* if unread, has no real mem footprint. how to test? */
246 static void r_purge_reds(h2_bucket_beam *beam)
249 /* delete all red buckets in purge brigade, needs to be called
250 * from red thread only */
251 while (!H2_BLIST_EMPTY(&beam->purge)) {
252 bred = H2_BLIST_FIRST(&beam->purge);
253 apr_bucket_delete(bred);
257 static apr_size_t calc_space_left(h2_bucket_beam *beam)
259 if (beam->max_buf_size > 0) {
260 apr_off_t len = calc_buffered(beam);
261 return (beam->max_buf_size > len? (beam->max_buf_size - len) : 0);
266 static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock)
268 if (beam->timeout > 0) {
269 return apr_thread_cond_timedwait(beam->m_cond, lock, beam->timeout);
272 return apr_thread_cond_wait(beam->m_cond, lock);
276 static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
277 h2_beam_lock *pbl, apr_off_t *premain)
279 *premain = calc_space_left(beam);
280 while (!beam->aborted && *premain <= 0
281 && (block == APR_BLOCK_READ) && pbl->mutex) {
282 apr_status_t status = wait_cond(beam, pbl->mutex);
283 if (APR_STATUS_IS_TIMEUP(status)) {
287 *premain = calc_space_left(beam);
289 return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
292 static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
295 apr_bucket *b, *next;
297 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
298 /* even when beam buckets are split, only the one where
299 * refcount drops to 0 will call us */
300 H2_BPROXY_REMOVE(proxy);
301 /* invoked from green thread, the last beam bucket for the red
302 * bucket bred is about to be destroyed.
303 * remove it from the hold, where it should be now */
305 for (b = H2_BLIST_FIRST(&beam->hold);
306 b != H2_BLIST_SENTINEL(&beam->hold);
307 b = APR_BUCKET_NEXT(b)) {
308 if (b == proxy->bred) {
312 if (b != H2_BLIST_SENTINEL(&beam->hold)) {
313 /* bucket is in hold as it should be, mark this one
314 * and all before it for purging. We might have placed meta
315 * buckets without a green proxy into the hold before it
316 * and schedule them for purging now */
317 for (b = H2_BLIST_FIRST(&beam->hold);
318 b != H2_BLIST_SENTINEL(&beam->hold);
320 next = APR_BUCKET_NEXT(b);
321 if (b == proxy->bred) {
322 APR_BUCKET_REMOVE(b);
323 H2_BLIST_INSERT_TAIL(&beam->purge, b);
326 else if (APR_BUCKET_IS_METADATA(b)) {
327 APR_BUCKET_REMOVE(b);
328 H2_BLIST_INSERT_TAIL(&beam->purge, b);
331 /* another data bucket before this one in hold. this
332 * is normal since DATA buckets need not be destroyed
340 /* it should be there unless we screwed up */
341 ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->red_pool,
342 APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not "
343 "in hold, n=%d", beam->id, beam->tag,
345 AP_DEBUG_ASSERT(!proxy->bred);
348 /* notify anyone waiting on space to become available */
352 else if (beam->m_cond) {
353 apr_thread_cond_broadcast(beam->m_cond);
355 leave_yellow(beam, &bl);
359 static void report_consumption(h2_bucket_beam *beam, int force)
361 if (force || beam->received_bytes != beam->reported_consumed_bytes) {
362 if (beam->consumed_fn) {
363 beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
364 - beam->reported_consumed_bytes);
366 beam->reported_consumed_bytes = beam->received_bytes;
370 static void report_production(h2_bucket_beam *beam, int force)
372 if (force || beam->sent_bytes != beam->reported_produced_bytes) {
373 if (beam->produced_fn) {
374 beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
375 - beam->reported_produced_bytes);
377 beam->reported_produced_bytes = beam->sent_bytes;
381 static void h2_blist_cleanup(h2_blist *bl)
385 while (!H2_BLIST_EMPTY(bl)) {
386 e = H2_BLIST_FIRST(bl);
387 apr_bucket_delete(e);
391 static apr_status_t beam_close(h2_bucket_beam *beam)
396 apr_thread_cond_broadcast(beam->m_cond);
402 static apr_status_t beam_cleanup(void *data)
404 h2_bucket_beam *beam = data;
408 h2_blist_cleanup(&beam->red);
409 report_consumption(beam, 0);
410 while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
411 h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
412 H2_BPROXY_REMOVE(proxy);
416 h2_blist_cleanup(&beam->purge);
417 h2_blist_cleanup(&beam->hold);
422 apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
424 apr_pool_cleanup_kill(beam->red_pool, beam, beam_cleanup);
425 return beam_cleanup(beam);
428 apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *red_pool,
429 int id, const char *tag,
430 apr_size_t max_buf_size)
432 h2_bucket_beam *beam;
433 apr_status_t status = APR_SUCCESS;
435 beam = apr_pcalloc(red_pool, sizeof(*beam));
442 H2_BLIST_INIT(&beam->red);
443 H2_BLIST_INIT(&beam->hold);
444 H2_BLIST_INIT(&beam->purge);
445 H2_BPROXY_LIST_INIT(&beam->proxies);
446 beam->red_pool = red_pool;
447 beam->max_buf_size = max_buf_size;
449 apr_pool_pre_cleanup_register(red_pool, beam, beam_cleanup);
455 void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
459 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
460 beam->max_buf_size = buffer_size;
461 leave_yellow(beam, &bl);
465 apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
468 apr_size_t buffer_size = 0;
470 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
471 buffer_size = beam->max_buf_size;
472 leave_yellow(beam, &bl);
477 void h2_beam_mutex_set(h2_bucket_beam *beam,
478 h2_beam_mutex_enter m_enter,
479 apr_thread_cond_t *cond,
484 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
485 beam->m_enter = m_enter;
488 leave_yellow(beam, &bl);
492 void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
496 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
497 beam->timeout = timeout;
498 leave_yellow(beam, &bl);
502 apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam)
505 apr_interval_time_t timeout = 0;
507 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
508 timeout = beam->timeout;
509 leave_yellow(beam, &bl);
514 void h2_beam_abort(h2_bucket_beam *beam)
518 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
520 h2_blist_cleanup(&beam->red);
522 report_consumption(beam, 0);
524 apr_thread_cond_broadcast(beam->m_cond);
526 leave_yellow(beam, &bl);
530 apr_status_t h2_beam_close(h2_bucket_beam *beam)
534 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
537 report_consumption(beam, 0);
538 leave_yellow(beam, &bl);
540 return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
543 apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block,
549 if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) {
552 h2_blist_cleanup(&beam->red);
553 if (!bl.mutex && beam->green) {
554 /* not protected, may process green in red call */
555 apr_brigade_destroy(beam->green);
561 while (status == APR_SUCCESS
562 && (!H2_BPROXY_LIST_EMPTY(&beam->proxies)
563 || (beam->green && !APR_BRIGADE_EMPTY(beam->green)))) {
564 if (block == APR_NONBLOCK_READ || !bl.mutex) {
569 apr_thread_cond_broadcast(beam->m_cond);
571 status = wait_cond(beam, bl.mutex);
573 leave_yellow(beam, &bl);
578 static apr_status_t append_bucket(h2_bucket_beam *beam,
580 apr_read_type_e block,
586 apr_off_t space_left = 0;
589 if (APR_BUCKET_IS_METADATA(bred)) {
590 if (APR_BUCKET_IS_EOS(bred)) {
593 APR_BUCKET_REMOVE(bred);
594 H2_BLIST_INSERT_TAIL(&beam->red, bred);
597 else if (APR_BUCKET_IS_FILE(bred)) {
598 /* file bucket lengths do not really count */
601 space_left = calc_space_left(beam);
602 if (space_left > 0 && bred->length == ((apr_size_t)-1)) {
604 status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
605 if (status != APR_SUCCESS) {
610 if (space_left < bred->length) {
611 status = r_wait_space(beam, block, pbl, &space_left);
612 if (status != APR_SUCCESS) {
615 if (space_left <= 0) {
619 /* space available, maybe need bucket split */
623 /* The fundamental problem is that reading a red bucket from
624 * a green thread is a total NO GO, because the bucket might use
625 * its pool/bucket_alloc from a foreign thread and that will
627 status = APR_ENOTIMPL;
628 if (beam->closed && bred->length > 0) {
631 else if (APR_BUCKET_IS_TRANSIENT(bred)) {
632 /* this takes care of transient buckets and converts them
633 * into heap ones. Other bucket types might or might not be
634 * affected by this. */
635 status = apr_bucket_setaside(bred, pool);
637 else if (APR_BUCKET_IS_HEAP(bred)) {
638 /* For heap buckets read from a green thread is fine. The
639 * data will be there and live until the bucket itself is
641 status = APR_SUCCESS;
643 else if (APR_BUCKET_IS_POOL(bred)) {
644 /* pool buckets are bastards that register at pool cleanup
645 * to morph themselves into heap buckets. That may happen anytime,
646 * even after the bucket data pointer has been read. So at
647 * any time inside the green thread, the pool bucket memory
648 * may disappear. yikes. */
649 status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
650 if (status == APR_SUCCESS) {
651 apr_bucket_heap_make(bred, data, len, NULL);
654 else if (APR_BUCKET_IS_FILE(bred)) {
655 /* For file buckets the problem is their internal readpool that
656 * is used on the first read to allocate buffer/mmap.
657 * Since setting aside a file bucket will de-register the
658 * file cleanup function from the previous pool, we need to
659 * call that from a red thread.
660 * Additionally, we allow callbacks to prevent beaming file
661 * handles across. The use case for this is to limit the number
662 * of open file handles and rather use a less efficient beam
664 apr_file_t *fd = ((apr_bucket_file *)bred->data)->fd;
666 if (beam->last_beamed != fd && beam->can_beam_fn) {
667 can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
670 beam->last_beamed = fd;
671 status = apr_bucket_setaside(bred, pool);
673 /* else: enter ENOTIMPL case below */
676 if (status == APR_ENOTIMPL) {
677 /* we have no knowledge about the internals of this bucket,
678 * but hope that after read, its data stays immutable for the
679 * lifetime of the bucket. (see pool bucket handling above for
680 * a counter example).
681 * We do the read while in a red thread, so that the bucket may
682 * use pools/allocators safely. */
683 if (space_left < APR_BUCKET_BUFF_SIZE) {
684 space_left = APR_BUCKET_BUFF_SIZE;
686 if (space_left < bred->length) {
687 apr_bucket_split(bred, space_left);
689 status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
690 if (status == APR_SUCCESS) {
691 status = apr_bucket_setaside(bred, pool);
695 if (status != APR_SUCCESS && status != APR_ENOTIMPL) {
699 APR_BUCKET_REMOVE(bred);
700 H2_BLIST_INSERT_TAIL(&beam->red, bred);
701 beam->sent_bytes += bred->length;
706 apr_status_t h2_beam_send(h2_bucket_beam *beam,
707 apr_bucket_brigade *red_brigade,
708 apr_read_type_e block)
711 apr_status_t status = APR_SUCCESS;
714 /* Called from the red thread to add buckets to the beam */
715 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
719 status = APR_ECONNABORTED;
721 else if (red_brigade) {
722 int force_report = !APR_BRIGADE_EMPTY(red_brigade);
723 while (!APR_BRIGADE_EMPTY(red_brigade)
724 && status == APR_SUCCESS) {
725 bred = APR_BRIGADE_FIRST(red_brigade);
726 status = append_bucket(beam, bred, block, beam->red_pool, &bl);
728 report_production(beam, force_report);
730 apr_thread_cond_broadcast(beam->m_cond);
733 report_consumption(beam, 0);
734 leave_yellow(beam, &bl);
739 apr_status_t h2_beam_receive(h2_bucket_beam *beam,
740 apr_bucket_brigade *bb,
741 apr_read_type_e block,
745 apr_bucket *bred, *bgreen, *ng;
747 apr_status_t status = APR_SUCCESS;
748 apr_off_t remain = readbytes;
750 /* Called from the green thread to take buckets from the beam */
751 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
754 if (beam->green && !APR_BRIGADE_EMPTY(beam->green)) {
755 apr_brigade_cleanup(beam->green);
757 status = APR_ECONNABORTED;
761 /* transfer enough buckets from our green brigade, if we have one */
763 && !APR_BRIGADE_EMPTY(beam->green)
764 && (readbytes <= 0 || remain >= 0)) {
765 bgreen = APR_BRIGADE_FIRST(beam->green);
766 if (readbytes > 0 && bgreen->length > 0 && remain <= 0) {
769 APR_BUCKET_REMOVE(bgreen);
770 APR_BRIGADE_INSERT_TAIL(bb, bgreen);
771 remain -= bgreen->length;
775 /* transfer from our red brigade, transforming red buckets to
776 * green ones until we have enough */
777 while (!H2_BLIST_EMPTY(&beam->red) && (readbytes <= 0 || remain >= 0)) {
778 bred = H2_BLIST_FIRST(&beam->red);
781 if (readbytes > 0 && bred->length > 0 && remain <= 0) {
785 if (APR_BUCKET_IS_METADATA(bred)) {
786 if (APR_BUCKET_IS_EOS(bred)) {
787 bgreen = apr_bucket_eos_create(bb->bucket_alloc);
788 beam->close_sent = 1;
790 else if (APR_BUCKET_IS_FLUSH(bred)) {
791 bgreen = apr_bucket_flush_create(bb->bucket_alloc);
794 /* put red into hold, no green sent out */
797 else if (APR_BUCKET_IS_FILE(bred)) {
798 /* This is set aside into the target brigade pool so that
799 * any read operation messes with that pool and not
801 apr_bucket_file *f = (apr_bucket_file *)bred->data;
802 apr_file_t *fd = f->fd;
803 int setaside = (f->readpool != bb->p);
806 status = apr_file_setaside(&fd, fd, bb->p);
807 if (status != APR_SUCCESS) {
810 ++beam->files_beamed;
812 ng = apr_brigade_insert_file(bb, fd, bred->start, bred->length,
815 /* disable mmap handling as this leads to segfaults when
816 * the underlying file is changed while memory pointer has
817 * been handed out. See also PR 59348 */
818 apr_bucket_file_enable_mmap(ng, 0);
820 remain -= bred->length;
822 APR_BUCKET_REMOVE(bred);
823 H2_BLIST_INSERT_TAIL(&beam->hold, bred);
828 /* create a "green" standin bucket. we took care about the
829 * underlying red bucket and its data when we placed it into
831 * the beam bucket will notify us on destruction that bred is
832 * no longer needed. */
833 bgreen = h2_beam_bucket_create(beam, bred, bb->bucket_alloc,
834 beam->buckets_sent++);
837 /* Place the red bucket into our hold, to be destroyed when no
838 * green bucket references it any more. */
839 APR_BUCKET_REMOVE(bred);
840 H2_BLIST_INSERT_TAIL(&beam->hold, bred);
841 beam->received_bytes += bred->length;
843 APR_BRIGADE_INSERT_TAIL(bb, bgreen);
844 remain -= bgreen->length;
849 if (readbytes > 0 && remain < 0) {
850 /* too much, put some back */
852 for (bgreen = APR_BRIGADE_FIRST(bb);
853 bgreen != APR_BRIGADE_SENTINEL(bb);
854 bgreen = APR_BUCKET_NEXT(bgreen)) {
855 remain -= bgreen->length;
857 apr_bucket_split(bgreen, bgreen->length+remain);
858 beam->green = apr_brigade_split_ex(bb,
859 APR_BUCKET_NEXT(bgreen),
867 && (!beam->green || APR_BRIGADE_EMPTY(beam->green))
868 && H2_BLIST_EMPTY(&beam->red)) {
869 /* beam is closed and we have nothing more to receive */
870 if (!beam->close_sent) {
871 apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
872 APR_BRIGADE_INSERT_TAIL(bb, b);
873 beam->close_sent = 1;
875 status = APR_SUCCESS;
880 status = APR_SUCCESS;
882 else if (beam->closed) {
885 else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
886 status = wait_cond(beam, bl.mutex);
887 if (status != APR_SUCCESS) {
896 leave_yellow(beam, &bl);
901 void h2_beam_on_consumed(h2_bucket_beam *beam,
902 h2_beam_io_callback *cb, void *ctx)
906 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
907 beam->consumed_fn = cb;
908 beam->consumed_ctx = ctx;
909 leave_yellow(beam, &bl);
913 void h2_beam_on_produced(h2_bucket_beam *beam,
914 h2_beam_io_callback *cb, void *ctx)
918 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
919 beam->produced_fn = cb;
920 beam->produced_ctx = ctx;
921 leave_yellow(beam, &bl);
925 void h2_beam_on_file_beam(h2_bucket_beam *beam,
926 h2_beam_can_beam_callback *cb, void *ctx)
930 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
931 beam->can_beam_fn = cb;
932 beam->can_beam_ctx = ctx;
933 leave_yellow(beam, &bl);
938 apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
944 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
945 for (b = H2_BLIST_FIRST(&beam->red);
946 b != H2_BLIST_SENTINEL(&beam->red);
947 b = APR_BUCKET_NEXT(b)) {
948 /* should all have determinate length */
951 leave_yellow(beam, &bl);
956 apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
962 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
963 for (b = H2_BLIST_FIRST(&beam->red);
964 b != H2_BLIST_SENTINEL(&beam->red);
965 b = APR_BUCKET_NEXT(b)) {
966 if (APR_BUCKET_IS_FILE(b)) {
970 /* should all have determinate length */
974 leave_yellow(beam, &bl);
979 int h2_beam_empty(h2_bucket_beam *beam)
984 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
985 empty = (H2_BLIST_EMPTY(&beam->red)
986 && (!beam->green || APR_BRIGADE_EMPTY(beam->green)));
987 leave_yellow(beam, &bl);
992 int h2_beam_holds_proxies(h2_bucket_beam *beam)
997 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
998 has_proxies = !H2_BPROXY_LIST_EMPTY(&beam->proxies);
999 leave_yellow(beam, &bl);
1004 int h2_beam_closed(h2_bucket_beam *beam)
1006 return beam->closed;
1009 int h2_beam_was_received(h2_bucket_beam *beam)
1014 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
1015 happend = (beam->received_bytes > 0);
1016 leave_yellow(beam, &bl);
1021 apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam)
1026 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
1027 n = beam->files_beamed;
1028 leave_yellow(beam, &bl);
1033 int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file)