1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
19 * Licensed under the Apache License, Version 2.0 (the "License");
20 * you may not use this file except in compliance with the License.
21 * You may obtain a copy of the License at
23 * http://www.apache.org/licenses/LICENSE-2.0
25 * Unless required by applicable law or agreed to in writing, software
26 * distributed under the License is distributed on an "AS IS" BASIS,
27 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28 * See the License for the specific language governing permissions and
29 * limitations under the License.
33 #include <apr_atomic.h>
34 #include <apr_strings.h>
36 #include <apr_buckets.h>
37 #include <apr_thread_mutex.h>
38 #include <apr_thread_cond.h>
41 #include <http_protocol.h>
44 #include "h2_private.h"
46 #include "h2_bucket_beam.h"
48 static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy);
50 #define H2_BPROXY_NEXT(e) APR_RING_NEXT((e), link)
51 #define H2_BPROXY_PREV(e) APR_RING_PREV((e), link)
52 #define H2_BPROXY_REMOVE(e) APR_RING_REMOVE((e), link)
54 #define H2_BPROXY_LIST_INIT(b) APR_RING_INIT(&(b)->list, h2_beam_proxy, link);
55 #define H2_BPROXY_LIST_SENTINEL(b) APR_RING_SENTINEL(&(b)->list, h2_beam_proxy, link)
56 #define H2_BPROXY_LIST_EMPTY(b) APR_RING_EMPTY(&(b)->list, h2_beam_proxy, link)
57 #define H2_BPROXY_LIST_FIRST(b) APR_RING_FIRST(&(b)->list)
58 #define H2_BPROXY_LIST_LAST(b) APR_RING_LAST(&(b)->list)
59 #define H2_PROXY_BLIST_INSERT_HEAD(b, e) do { \
60 h2_beam_proxy *ap__b = (e); \
61 APR_RING_INSERT_HEAD(&(b)->list, ap__b, h2_beam_proxy, link); \
63 #define H2_BPROXY_LIST_INSERT_TAIL(b, e) do { \
64 h2_beam_proxy *ap__b = (e); \
65 APR_RING_INSERT_TAIL(&(b)->list, ap__b, h2_beam_proxy, link); \
67 #define H2_BPROXY_LIST_CONCAT(a, b) do { \
68 APR_RING_CONCAT(&(a)->list, &(b)->list, h2_beam_proxy, link); \
70 #define H2_BPROXY_LIST_PREPEND(a, b) do { \
71 APR_RING_PREPEND(&(a)->list, &(b)->list, h2_beam_proxy, link); \
75 /*******************************************************************************
76 * beam bucket with reference to beam and bucket it represents
77 ******************************************************************************/
79 const apr_bucket_type_t h2_bucket_type_beam;
81 #define H2_BUCKET_IS_BEAM(e) (e->type == &h2_bucket_type_beam)
83 struct h2_beam_proxy {
84 apr_bucket_refcount refcount;
85 APR_RING_ENTRY(h2_beam_proxy) link;
91 static const char Dummy = '\0';
93 static apr_status_t beam_bucket_read(apr_bucket *b, const char **str,
94 apr_size_t *len, apr_read_type_e block)
96 h2_beam_proxy *d = b->data;
99 apr_status_t status = apr_bucket_read(d->bsender, &data, len, block);
100 if (status == APR_SUCCESS) {
101 *str = data + b->start;
108 return APR_ECONNRESET;
111 static void beam_bucket_destroy(void *data)
113 h2_beam_proxy *d = data;
115 if (apr_bucket_shared_destroy(d)) {
116 /* When the beam gets destroyed before this bucket, it will
117 * NULLify its reference here. This is not protected by a mutex,
118 * so it will not help with race conditions.
119 * But it lets us shut down memory pool with circulare beam
122 h2_beam_emitted(d->beam, d);
128 static apr_bucket * h2_beam_bucket_make(apr_bucket *b,
129 h2_bucket_beam *beam,
130 apr_bucket *bsender, apr_size_t n)
134 d = apr_bucket_alloc(sizeof(*d), b->list);
135 H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, d);
137 d->bsender = bsender;
140 b = apr_bucket_shared_make(b, d, 0, bsender? bsender->length : 0);
141 b->type = &h2_bucket_type_beam;
146 static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
148 apr_bucket_alloc_t *list,
151 apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
154 b->free = apr_bucket_free;
156 return h2_beam_bucket_make(b, beam, bsender, n);
159 const apr_bucket_type_t h2_bucket_type_beam = {
160 "BEAM", 5, APR_BUCKET_DATA,
163 apr_bucket_setaside_noop,
164 apr_bucket_shared_split,
165 apr_bucket_shared_copy
168 /*******************************************************************************
169 * h2_blist, a brigade without allocations
170 ******************************************************************************/
172 static apr_array_header_t *beamers;
174 static apr_status_t cleanup_beamers(void *dummy)
181 void h2_register_bucket_beamer(h2_bucket_beamer *beamer)
184 apr_pool_cleanup_register(apr_hook_global_pool, NULL,
185 cleanup_beamers, apr_pool_cleanup_null);
186 beamers = apr_array_make(apr_hook_global_pool, 10,
187 sizeof(h2_bucket_beamer*));
189 APR_ARRAY_PUSH(beamers, h2_bucket_beamer*) = beamer;
192 static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam,
193 apr_bucket_brigade *dest,
194 const apr_bucket *src)
196 apr_bucket *b = NULL;
199 for (i = 0; i < beamers->nelts && b == NULL; ++i) {
200 h2_bucket_beamer *beamer;
202 beamer = APR_ARRAY_IDX(beamers, i, h2_bucket_beamer*);
203 b = beamer(beam, dest, src);
210 /*******************************************************************************
211 * bucket beam that can transport buckets across threads
212 ******************************************************************************/
214 static void mutex_leave(void *ctx, apr_thread_mutex_t *lock)
216 apr_thread_mutex_unlock(lock);
219 static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl)
221 h2_bucket_beam *beam = ctx;
222 pbl->mutex = beam->lock;
223 pbl->leave = mutex_leave;
224 return apr_thread_mutex_lock(pbl->mutex);
227 static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
229 return mutex_enter(beam, pbl);
232 static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
235 pbl->leave(pbl->leave_ctx, pbl->mutex);
239 static apr_off_t bucket_mem_used(apr_bucket *b)
241 if (APR_BUCKET_IS_FILE(b)) {
245 /* should all have determinate length */
250 static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
253 apr_off_t len = beam->received_bytes - beam->cons_bytes_reported;
254 h2_beam_io_callback *cb = beam->cons_io_cb;
258 void *ctx = beam->cons_ctx;
260 if (pbl) leave_yellow(beam, pbl);
262 if (pbl) enter_yellow(beam, pbl);
265 beam->cons_bytes_reported += len;
270 static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl)
272 apr_off_t len = beam->sent_bytes - beam->prod_bytes_reported;
273 if (force || len > 0) {
274 h2_beam_io_callback *cb = beam->prod_io_cb;
276 void *ctx = beam->prod_ctx;
278 leave_yellow(beam, pbl);
280 enter_yellow(beam, pbl);
282 beam->prod_bytes_reported += len;
286 static apr_size_t calc_buffered(h2_bucket_beam *beam)
290 for (b = H2_BLIST_FIRST(&beam->send_list);
291 b != H2_BLIST_SENTINEL(&beam->send_list);
292 b = APR_BUCKET_NEXT(b)) {
293 if (b->length == ((apr_size_t)-1)) {
296 else if (APR_BUCKET_IS_FILE(b)) {
297 /* if unread, has no real mem footprint. */
306 static void r_purge_sent(h2_bucket_beam *beam)
309 /* delete all sender buckets in purge brigade, needs to be called
310 * from sender thread only */
311 while (!H2_BLIST_EMPTY(&beam->purge_list)) {
312 b = H2_BLIST_FIRST(&beam->purge_list);
313 apr_bucket_delete(b);
317 static apr_size_t calc_space_left(h2_bucket_beam *beam)
319 if (beam->max_buf_size > 0) {
320 apr_off_t len = calc_buffered(beam);
321 return (beam->max_buf_size > len? (beam->max_buf_size - len) : 0);
326 static int buffer_is_empty(h2_bucket_beam *beam)
328 return ((!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer))
329 && H2_BLIST_EMPTY(&beam->send_list));
332 static apr_status_t wait_empty(h2_bucket_beam *beam, apr_read_type_e block,
333 apr_thread_mutex_t *lock)
335 apr_status_t rv = APR_SUCCESS;
337 while (!buffer_is_empty(beam) && APR_SUCCESS == rv) {
338 if (APR_BLOCK_READ != block || !lock) {
341 else if (beam->timeout > 0) {
342 rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
345 rv = apr_thread_cond_wait(beam->change, lock);
351 static apr_status_t wait_not_empty(h2_bucket_beam *beam, apr_read_type_e block,
352 apr_thread_mutex_t *lock)
354 apr_status_t rv = APR_SUCCESS;
356 while (buffer_is_empty(beam) && APR_SUCCESS == rv) {
358 rv = APR_ECONNABORTED;
360 else if (beam->closed) {
363 else if (APR_BLOCK_READ != block || !lock) {
366 else if (beam->timeout > 0) {
367 rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
370 rv = apr_thread_cond_wait(beam->change, lock);
376 static apr_status_t wait_not_full(h2_bucket_beam *beam, apr_read_type_e block,
377 apr_size_t *pspace_left, h2_beam_lock *bl)
379 apr_status_t rv = APR_SUCCESS;
382 while (0 == (left = calc_space_left(beam)) && APR_SUCCESS == rv) {
384 rv = APR_ECONNABORTED;
386 else if (block != APR_BLOCK_READ || !bl->mutex) {
390 if (beam->timeout > 0) {
391 rv = apr_thread_cond_timedwait(beam->change, bl->mutex, beam->timeout);
394 rv = apr_thread_cond_wait(beam->change, bl->mutex);
402 static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
405 apr_bucket *b, *next;
407 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
408 /* even when beam buckets are split, only the one where
409 * refcount drops to 0 will call us */
410 H2_BPROXY_REMOVE(proxy);
411 /* invoked from receiver thread, the last beam bucket for the send
412 * bucket is about to be destroyed.
413 * remove it from the hold, where it should be now */
414 if (proxy->bsender) {
415 for (b = H2_BLIST_FIRST(&beam->hold_list);
416 b != H2_BLIST_SENTINEL(&beam->hold_list);
417 b = APR_BUCKET_NEXT(b)) {
418 if (b == proxy->bsender) {
422 if (b != H2_BLIST_SENTINEL(&beam->hold_list)) {
423 /* bucket is in hold as it should be, mark this one
424 * and all before it for purging. We might have placed meta
425 * buckets without a receiver proxy into the hold before it
426 * and schedule them for purging now */
427 for (b = H2_BLIST_FIRST(&beam->hold_list);
428 b != H2_BLIST_SENTINEL(&beam->hold_list);
430 next = APR_BUCKET_NEXT(b);
431 if (b == proxy->bsender) {
432 APR_BUCKET_REMOVE(b);
433 H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
436 else if (APR_BUCKET_IS_METADATA(b)) {
437 APR_BUCKET_REMOVE(b);
438 H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
441 /* another data bucket before this one in hold. this
442 * is normal since DATA buckets need not be destroyed
447 proxy->bsender = NULL;
450 /* it should be there unless we screwed up */
451 ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->send_pool,
452 APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not "
453 "in hold, n=%d", beam->id, beam->tag,
455 ap_assert(!proxy->bsender);
458 /* notify anyone waiting on space to become available */
463 apr_thread_cond_broadcast(beam->change);
465 leave_yellow(beam, &bl);
469 static void h2_blist_cleanup(h2_blist *bl)
473 while (!H2_BLIST_EMPTY(bl)) {
474 e = H2_BLIST_FIRST(bl);
475 apr_bucket_delete(e);
479 static apr_status_t beam_close(h2_bucket_beam *beam)
483 apr_thread_cond_broadcast(beam->change);
488 int h2_beam_is_closed(h2_bucket_beam *beam)
493 static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool,
494 apr_status_t (*cleanup)(void *))
496 if (pool && pool != beam->pool) {
497 apr_pool_pre_cleanup_register(pool, beam, cleanup);
503 static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool,
504 apr_status_t (*cleanup)(void *)) {
505 if (pool && pool != beam->pool) {
506 apr_pool_cleanup_kill(pool, beam, cleanup);
512 static apr_status_t beam_recv_cleanup(void *data)
514 h2_bucket_beam *beam = data;
515 /* receiver pool has gone away, clear references */
516 beam->recv_buffer = NULL;
517 beam->recv_pool = NULL;
521 static apr_status_t beam_send_cleanup(void *data)
523 h2_bucket_beam *beam = data;
524 /* sender is going away, clear up all references to its memory */
526 h2_blist_cleanup(&beam->send_list);
527 report_consumption(beam, NULL);
528 while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
529 h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
530 H2_BPROXY_REMOVE(proxy);
532 proxy->bsender = NULL;
534 h2_blist_cleanup(&beam->purge_list);
535 h2_blist_cleanup(&beam->hold_list);
536 beam->send_pool = NULL;
540 static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool)
542 if (beam->send_pool != pool) {
543 if (beam->send_pool && beam->send_pool != beam->pool) {
544 pool_kill(beam, beam->send_pool, beam_send_cleanup);
545 beam_send_cleanup(beam);
547 beam->send_pool = pool;
548 pool_register(beam, beam->send_pool, beam_send_cleanup);
552 static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl)
554 if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
555 apr_bucket_brigade *bb = beam->recv_buffer;
558 beam->recv_buffer = NULL;
559 apr_brigade_length(bb, 0, &bblen);
560 beam->received_bytes += bblen;
562 /* need to do this unlocked since bucket destroy might
563 * call this beam again. */
564 if (bl) leave_yellow(beam, bl);
565 apr_brigade_destroy(bb);
566 if (bl) enter_yellow(beam, bl);
568 if (beam->cons_ev_cb) {
569 beam->cons_ev_cb(beam->cons_ctx, beam);
574 static apr_status_t beam_cleanup(void *data)
576 h2_bucket_beam *beam = data;
577 apr_status_t status = APR_SUCCESS;
578 int safe_send = (beam->owner == H2_BEAM_OWNER_SEND);
579 int safe_recv = (beam->owner == H2_BEAM_OWNER_RECV);
582 * Owner of the beam is going away, depending on which side it owns,
583 * cleanup strategies will differ.
585 * In general, receiver holds references to memory from sender.
586 * Clean up receiver first, if safe, then cleanup sender, if safe.
589 /* When modify send is not safe, this means we still have multi-thread
590 * protection and the owner is receiving the buckets. If the sending
591 * side has not gone away, this means we could have dangling buckets
592 * in our lists that never get destroyed. This should not happen. */
593 ap_assert(safe_send || !beam->send_pool);
594 if (!H2_BLIST_EMPTY(&beam->send_list)) {
595 ap_assert(beam->send_pool);
599 if (beam->recv_pool) {
600 pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
601 beam->recv_pool = NULL;
603 recv_buffer_cleanup(beam, NULL);
606 beam->recv_buffer = NULL;
607 beam->recv_pool = NULL;
610 if (safe_send && beam->send_pool) {
611 pool_kill(beam, beam->send_pool, beam_send_cleanup);
612 status = beam_send_cleanup(beam);
616 ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies));
617 ap_assert(H2_BLIST_EMPTY(&beam->send_list));
618 ap_assert(H2_BLIST_EMPTY(&beam->hold_list));
619 ap_assert(H2_BLIST_EMPTY(&beam->purge_list));
624 apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
626 apr_pool_cleanup_kill(beam->pool, beam, beam_cleanup);
627 return beam_cleanup(beam);
630 apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool,
631 int id, const char *tag,
632 h2_beam_owner_t owner,
633 apr_size_t max_buf_size,
634 apr_interval_time_t timeout)
636 h2_bucket_beam *beam;
637 apr_status_t rv = APR_SUCCESS;
639 beam = apr_pcalloc(pool, sizeof(*beam));
648 H2_BLIST_INIT(&beam->send_list);
649 H2_BLIST_INIT(&beam->hold_list);
650 H2_BLIST_INIT(&beam->purge_list);
651 H2_BPROXY_LIST_INIT(&beam->proxies);
652 beam->tx_mem_limits = 1;
653 beam->max_buf_size = max_buf_size;
654 beam->timeout = timeout;
656 rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool);
657 if (APR_SUCCESS == rv) {
658 rv = apr_thread_cond_create(&beam->change, pool);
659 if (APR_SUCCESS == rv) {
660 apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);
667 void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
671 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
672 beam->max_buf_size = buffer_size;
673 leave_yellow(beam, &bl);
677 apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
680 apr_size_t buffer_size = 0;
682 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
683 buffer_size = beam->max_buf_size;
684 leave_yellow(beam, &bl);
689 void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
693 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
694 beam->timeout = timeout;
695 leave_yellow(beam, &bl);
699 apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam)
702 apr_interval_time_t timeout = 0;
704 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
705 timeout = beam->timeout;
706 leave_yellow(beam, &bl);
711 void h2_beam_abort(h2_bucket_beam *beam)
715 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
716 if (!beam->aborted) {
719 h2_blist_cleanup(&beam->send_list);
720 report_consumption(beam, &bl);
722 apr_thread_cond_broadcast(beam->change);
723 leave_yellow(beam, &bl);
727 apr_status_t h2_beam_close(h2_bucket_beam *beam)
731 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
734 report_consumption(beam, &bl);
735 leave_yellow(beam, &bl);
737 return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
740 apr_status_t h2_beam_leave(h2_bucket_beam *beam)
744 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
745 recv_buffer_cleanup(beam, &bl);
748 leave_yellow(beam, &bl);
753 apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block)
758 if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) {
759 status = wait_empty(beam, block, bl.mutex);
760 leave_yellow(beam, &bl);
765 static void move_to_hold(h2_bucket_beam *beam,
766 apr_bucket_brigade *sender_bb)
769 while (sender_bb && !APR_BRIGADE_EMPTY(sender_bb)) {
770 b = APR_BRIGADE_FIRST(sender_bb);
771 APR_BUCKET_REMOVE(b);
772 H2_BLIST_INSERT_TAIL(&beam->send_list, b);
776 static apr_status_t append_bucket(h2_bucket_beam *beam,
778 apr_read_type_e block,
779 apr_size_t *pspace_left,
785 int can_beam, check_len;
788 return APR_ECONNABORTED;
791 if (APR_BUCKET_IS_METADATA(b)) {
792 if (APR_BUCKET_IS_EOS(b)) {
795 APR_BUCKET_REMOVE(b);
796 H2_BLIST_INSERT_TAIL(&beam->send_list, b);
799 else if (APR_BUCKET_IS_FILE(b)) {
800 /* For file buckets the problem is their internal readpool that
801 * is used on the first read to allocate buffer/mmap.
802 * Since setting aside a file bucket will de-register the
803 * file cleanup function from the previous pool, we need to
804 * call that only from the sender thread.
806 * Currently, we do not handle file bucket with refcount > 1 as
807 * the beam is then not in complete control of the file's lifetime.
808 * Which results in the bug that a file get closed by the receiver
809 * while the sender or the beam still have buckets using it.
811 * Additionally, we allow callbacks to prevent beaming file
812 * handles across. The use case for this is to limit the number
813 * of open file handles and rather use a less efficient beam
815 apr_bucket_file *bf = b->data;
816 apr_file_t *fd = bf->fd;
817 can_beam = (bf->refcount.refcount == 1);
818 if (can_beam && beam->can_beam_fn) {
819 can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
821 check_len = !can_beam;
824 if (b->length == ((apr_size_t)-1)) {
826 status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
827 if (status != APR_SUCCESS) {
835 if (b->length > *pspace_left) {
836 apr_bucket_split(b, *pspace_left);
838 *pspace_left -= b->length;
841 /* The fundamental problem is that reading a sender bucket from
842 * a receiver thread is a total NO GO, because the bucket might use
843 * its pool/bucket_alloc from a foreign thread and that will
845 status = APR_ENOTIMPL;
846 if (APR_BUCKET_IS_TRANSIENT(b)) {
847 /* this takes care of transient buckets and converts them
848 * into heap ones. Other bucket types might or might not be
849 * affected by this. */
850 status = apr_bucket_setaside(b, beam->send_pool);
852 else if (APR_BUCKET_IS_HEAP(b)) {
853 /* For heap buckets read from a receiver thread is fine. The
854 * data will be there and live until the bucket itself is
856 status = APR_SUCCESS;
858 else if (APR_BUCKET_IS_POOL(b)) {
859 /* pool buckets are bastards that register at pool cleanup
860 * to morph themselves into heap buckets. That may happen anytime,
861 * even after the bucket data pointer has been read. So at
862 * any time inside the receiver thread, the pool bucket memory
863 * may disappear. yikes. */
864 status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
865 if (status == APR_SUCCESS) {
866 apr_bucket_heap_make(b, data, len, NULL);
869 else if (APR_BUCKET_IS_FILE(b) && can_beam) {
870 status = apr_bucket_setaside(b, beam->send_pool);
873 if (status == APR_ENOTIMPL) {
874 /* we have no knowledge about the internals of this bucket,
875 * but hope that after read, its data stays immutable for the
876 * lifetime of the bucket. (see pool bucket handling above for
877 * a counter example).
878 * We do the read while in the sender thread, so that the bucket may
879 * use pools/allocators safely. */
880 status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
881 if (status == APR_SUCCESS) {
882 status = apr_bucket_setaside(b, beam->send_pool);
886 if (status != APR_SUCCESS && status != APR_ENOTIMPL) {
890 APR_BUCKET_REMOVE(b);
891 H2_BLIST_INSERT_TAIL(&beam->send_list, b);
892 beam->sent_bytes += b->length;
897 void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p)
900 /* Called from the sender thread to add buckets to the beam */
901 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
903 beam_set_send_pool(beam, p);
904 leave_yellow(beam, &bl);
908 apr_status_t h2_beam_send(h2_bucket_beam *beam,
909 apr_bucket_brigade *sender_bb,
910 apr_read_type_e block)
913 apr_status_t rv = APR_SUCCESS;
914 apr_size_t space_left = 0;
917 /* Called from the sender thread to add buckets to the beam */
918 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
919 ap_assert(beam->send_pool);
923 move_to_hold(beam, sender_bb);
924 rv = APR_ECONNABORTED;
926 else if (sender_bb) {
927 int force_report = !APR_BRIGADE_EMPTY(sender_bb);
929 space_left = calc_space_left(beam);
930 while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
931 if (space_left <= 0) {
932 report_prod_io(beam, force_report, &bl);
933 rv = wait_not_full(beam, block, &space_left, &bl);
934 if (APR_SUCCESS != rv) {
938 b = APR_BRIGADE_FIRST(sender_bb);
939 rv = append_bucket(beam, b, block, &space_left, &bl);
942 report_prod_io(beam, force_report, &bl);
943 apr_thread_cond_broadcast(beam->change);
945 report_consumption(beam, &bl);
946 leave_yellow(beam, &bl);
951 apr_status_t h2_beam_receive(h2_bucket_beam *beam,
952 apr_bucket_brigade *bb,
953 apr_read_type_e block,
957 apr_bucket *bsender, *brecv, *ng;
959 apr_status_t status = APR_SUCCESS;
961 int transferred_buckets = 0;
963 /* Called from the receiver thread to take buckets from the beam */
964 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
965 if (readbytes <= 0) {
966 readbytes = APR_SIZE_MAX;
972 recv_buffer_cleanup(beam, &bl);
973 status = APR_ECONNABORTED;
977 /* transfer enough buckets from our receiver brigade, if we have one */
980 && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
982 brecv = APR_BRIGADE_FIRST(beam->recv_buffer);
983 if (brecv->length > 0 && remain <= 0) {
986 APR_BUCKET_REMOVE(brecv);
987 APR_BRIGADE_INSERT_TAIL(bb, brecv);
988 remain -= brecv->length;
992 /* transfer from our sender brigade, transforming sender buckets to
993 * receiver ones until we have enough */
994 while (remain >= 0 && !H2_BLIST_EMPTY(&beam->send_list)) {
997 bsender = H2_BLIST_FIRST(&beam->send_list);
998 if (bsender->length > 0 && remain <= 0) {
1002 if (APR_BUCKET_IS_METADATA(bsender)) {
1003 if (APR_BUCKET_IS_EOS(bsender)) {
1004 brecv = apr_bucket_eos_create(bb->bucket_alloc);
1005 beam->close_sent = 1;
1007 else if (APR_BUCKET_IS_FLUSH(bsender)) {
1008 brecv = apr_bucket_flush_create(bb->bucket_alloc);
1010 else if (AP_BUCKET_IS_ERROR(bsender)) {
1011 ap_bucket_error *eb = (ap_bucket_error *)bsender;
1012 brecv = ap_bucket_error_create(eb->status, eb->data,
1013 bb->p, bb->bucket_alloc);
1016 else if (bsender->length == 0) {
1017 APR_BUCKET_REMOVE(bsender);
1018 H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
1021 else if (APR_BUCKET_IS_FILE(bsender)) {
1022 /* This is set aside into the target brigade pool so that
1023 * any read operation messes with that pool and not
1024 * the sender one. */
1025 apr_bucket_file *f = (apr_bucket_file *)bsender->data;
1026 apr_file_t *fd = f->fd;
1027 int setaside = (f->readpool != bb->p);
1030 status = apr_file_setaside(&fd, fd, bb->p);
1031 if (status != APR_SUCCESS) {
1034 ++beam->files_beamed;
1036 ng = apr_brigade_insert_file(bb, fd, bsender->start, bsender->length,
1039 /* disable mmap handling as this leads to segfaults when
1040 * the underlying file is changed while memory pointer has
1041 * been handed out. See also PR 59348 */
1042 apr_bucket_file_enable_mmap(ng, 0);
1044 APR_BUCKET_REMOVE(bsender);
1045 H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
1047 remain -= bsender->length;
1049 ++transferred_buckets;
1053 /* create a "receiver" standin bucket. we took care about the
1054 * underlying sender bucket and its data when we placed it into
1055 * the sender brigade.
1056 * the beam bucket will notify us on destruction that bsender is
1057 * no longer needed. */
1058 brecv = h2_beam_bucket_create(beam, bsender, bb->bucket_alloc,
1059 beam->buckets_sent++);
1062 /* Place the sender bucket into our hold, to be destroyed when no
1063 * receiver bucket references it any more. */
1064 APR_BUCKET_REMOVE(bsender);
1065 H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
1067 beam->received_bytes += bsender->length;
1068 ++transferred_buckets;
1071 APR_BRIGADE_INSERT_TAIL(bb, brecv);
1072 remain -= brecv->length;
1076 /* let outside hook determine how bucket is beamed */
1077 leave_yellow(beam, &bl);
1078 brecv = h2_beam_bucket(beam, bb, bsender);
1079 enter_yellow(beam, &bl);
1081 while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
1083 remain -= brecv->length;
1084 brecv = APR_BUCKET_NEXT(brecv);
1090 /* too much, put some back into out recv_buffer */
1092 for (brecv = APR_BRIGADE_FIRST(bb);
1093 brecv != APR_BRIGADE_SENTINEL(bb);
1094 brecv = APR_BUCKET_NEXT(brecv)) {
1095 remain -= (beam->tx_mem_limits? bucket_mem_used(brecv)
1098 apr_bucket_split(brecv, brecv->length+remain);
1099 beam->recv_buffer = apr_brigade_split_ex(bb,
1100 APR_BUCKET_NEXT(brecv),
1107 if (beam->closed && buffer_is_empty(beam)) {
1108 /* beam is closed and we have nothing more to receive */
1109 if (!beam->close_sent) {
1110 apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
1111 APR_BRIGADE_INSERT_TAIL(bb, b);
1112 beam->close_sent = 1;
1114 status = APR_SUCCESS;
1118 if (transferred_buckets > 0) {
1119 if (beam->cons_ev_cb) {
1120 beam->cons_ev_cb(beam->cons_ctx, beam);
1125 apr_thread_cond_broadcast(beam->change);
1126 status = APR_SUCCESS;
1129 status = wait_not_empty(beam, block, bl.mutex);
1130 if (status != APR_SUCCESS) {
1136 leave_yellow(beam, &bl);
1141 void h2_beam_on_consumed(h2_bucket_beam *beam,
1142 h2_beam_ev_callback *ev_cb,
1143 h2_beam_io_callback *io_cb, void *ctx)
1146 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
1147 beam->cons_ev_cb = ev_cb;
1148 beam->cons_io_cb = io_cb;
1149 beam->cons_ctx = ctx;
1150 leave_yellow(beam, &bl);
1154 void h2_beam_on_produced(h2_bucket_beam *beam,
1155 h2_beam_io_callback *io_cb, void *ctx)
1158 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
1159 beam->prod_io_cb = io_cb;
1160 beam->prod_ctx = ctx;
1161 leave_yellow(beam, &bl);
1165 void h2_beam_on_file_beam(h2_bucket_beam *beam,
1166 h2_beam_can_beam_callback *cb, void *ctx)
1170 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
1171 beam->can_beam_fn = cb;
1172 beam->can_beam_ctx = ctx;
1173 leave_yellow(beam, &bl);
1178 apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
1184 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1185 for (b = H2_BLIST_FIRST(&beam->send_list);
1186 b != H2_BLIST_SENTINEL(&beam->send_list);
1187 b = APR_BUCKET_NEXT(b)) {
1188 /* should all have determinate length */
1191 leave_yellow(beam, &bl);
1196 apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
1202 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1203 for (b = H2_BLIST_FIRST(&beam->send_list);
1204 b != H2_BLIST_SENTINEL(&beam->send_list);
1205 b = APR_BUCKET_NEXT(b)) {
1206 l += bucket_mem_used(b);
1208 leave_yellow(beam, &bl);
1213 int h2_beam_empty(h2_bucket_beam *beam)
1218 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1219 empty = (H2_BLIST_EMPTY(&beam->send_list)
1220 && (!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer)));
1221 leave_yellow(beam, &bl);
1226 int h2_beam_holds_proxies(h2_bucket_beam *beam)
1228 int has_proxies = 1;
1231 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1232 has_proxies = !H2_BPROXY_LIST_EMPTY(&beam->proxies);
1233 leave_yellow(beam, &bl);
1238 int h2_beam_was_received(h2_bucket_beam *beam)
1243 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1244 happend = (beam->received_bytes > 0);
1245 leave_yellow(beam, &bl);
1250 apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam)
1255 if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1256 n = beam->files_beamed;
1257 leave_yellow(beam, &bl);
1262 int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
1267 int h2_beam_report_consumption(h2_bucket_beam *beam)
1271 if (enter_yellow(beam, &bl) == APR_SUCCESS) {
1272 rv = report_consumption(beam, &bl);
1273 leave_yellow(beam, &bl);
1278 void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg)
1280 if (beam && APLOG_C_IS_LEVEL(c,level)) {
1281 ap_log_cerror(APLOG_MARK, level, 0, c,
1282 "beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s",
1283 (c->master? c->master->id : c->id), beam->id, beam->tag,
1284 beam->closed, beam->aborted, h2_beam_empty(beam),
1285 (long)h2_beam_get_buffered(beam), msg);