]> granicus.if.org Git - apache/blob - modules/http2/h2_bucket_beam.c
33c791579e2217b6df831fbdf6cc35a3c524eb8b
[apache] / modules / http2 / h2_bucket_beam.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16  
17 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
18  *
19  * Licensed under the Apache License, Version 2.0 (the "License");
20  * you may not use this file except in compliance with the License.
21  * You may obtain a copy of the License at
22  *
23  * http://www.apache.org/licenses/LICENSE-2.0
24  
25  * Unless required by applicable law or agreed to in writing, software
26  * distributed under the License is distributed on an "AS IS" BASIS,
27  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28  * See the License for the specific language governing permissions and
29  * limitations under the License.
30  */
31
32 #include <apr_lib.h>
33 #include <apr_atomic.h>
34 #include <apr_strings.h>
35 #include <apr_time.h>
36 #include <apr_buckets.h>
37 #include <apr_thread_mutex.h>
38 #include <apr_thread_cond.h>
39
40 #include <httpd.h>
41 #include <http_protocol.h>
42 #include <http_log.h>
43
44 #include "h2_private.h"
45 #include "h2_util.h"
46 #include "h2_bucket_beam.h"
47
48 static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy);
49
50 #define H2_BPROXY_NEXT(e)             APR_RING_NEXT((e), link)
51 #define H2_BPROXY_PREV(e)             APR_RING_PREV((e), link)
52 #define H2_BPROXY_REMOVE(e)           APR_RING_REMOVE((e), link)
53
54 #define H2_BPROXY_LIST_INIT(b)        APR_RING_INIT(&(b)->list, h2_beam_proxy, link);
55 #define H2_BPROXY_LIST_SENTINEL(b)    APR_RING_SENTINEL(&(b)->list, h2_beam_proxy, link)
56 #define H2_BPROXY_LIST_EMPTY(b)       APR_RING_EMPTY(&(b)->list, h2_beam_proxy, link)
57 #define H2_BPROXY_LIST_FIRST(b)       APR_RING_FIRST(&(b)->list)
58 #define H2_BPROXY_LIST_LAST(b)        APR_RING_LAST(&(b)->list)
59 #define H2_PROXY_BLIST_INSERT_HEAD(b, e) do {                           \
60         h2_beam_proxy *ap__b = (e);                                        \
61         APR_RING_INSERT_HEAD(&(b)->list, ap__b, h2_beam_proxy, link);   \
62     } while (0)
63 #define H2_BPROXY_LIST_INSERT_TAIL(b, e) do {                           \
64         h2_beam_proxy *ap__b = (e);                                     \
65         APR_RING_INSERT_TAIL(&(b)->list, ap__b, h2_beam_proxy, link);   \
66     } while (0)
67 #define H2_BPROXY_LIST_CONCAT(a, b) do {                                        \
68         APR_RING_CONCAT(&(a)->list, &(b)->list, h2_beam_proxy, link);   \
69     } while (0)
70 #define H2_BPROXY_LIST_PREPEND(a, b) do {                                       \
71         APR_RING_PREPEND(&(a)->list, &(b)->list, h2_beam_proxy, link);  \
72     } while (0)
73
74
75 /*******************************************************************************
76  * beam bucket with reference to beam and bucket it represents
77  ******************************************************************************/
78
79 const apr_bucket_type_t h2_bucket_type_beam;
80
81 #define H2_BUCKET_IS_BEAM(e)     (e->type == &h2_bucket_type_beam)
82
83 struct h2_beam_proxy {
84     apr_bucket_refcount refcount;
85     APR_RING_ENTRY(h2_beam_proxy) link;
86     h2_bucket_beam *beam;
87     apr_bucket *bsender;
88     apr_size_t n;
89 };
90
91 static const char Dummy = '\0';
92
93 static apr_status_t beam_bucket_read(apr_bucket *b, const char **str, 
94                                      apr_size_t *len, apr_read_type_e block)
95 {
96     h2_beam_proxy *d = b->data;
97     if (d->bsender) {
98         const char *data;
99         apr_status_t status = apr_bucket_read(d->bsender, &data, len, block);
100         if (status == APR_SUCCESS) {
101             *str = data + b->start;
102             *len = b->length;
103         }
104         return status;
105     }
106     *str = &Dummy;
107     *len = 0;
108     return APR_ECONNRESET;
109 }
110
111 static void beam_bucket_destroy(void *data)
112 {
113     h2_beam_proxy *d = data;
114
115     if (apr_bucket_shared_destroy(d)) {
116         /* When the beam gets destroyed before this bucket, it will
117          * NULLify its reference here. This is not protected by a mutex,
118          * so it will not help with race conditions.
119          * But it lets us shut down memory pool with circulare beam
120          * references. */
121         if (d->beam) {
122             h2_beam_emitted(d->beam, d);
123         }
124         apr_bucket_free(d);
125     }
126 }
127
128 static apr_bucket * h2_beam_bucket_make(apr_bucket *b, 
129                                         h2_bucket_beam *beam,
130                                         apr_bucket *bsender, apr_size_t n)
131 {
132     h2_beam_proxy *d;
133
134     d = apr_bucket_alloc(sizeof(*d), b->list);
135     H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, d);
136     d->beam = beam;
137     d->bsender = bsender;
138     d->n = n;
139     
140     b = apr_bucket_shared_make(b, d, 0, bsender? bsender->length : 0);
141     b->type = &h2_bucket_type_beam;
142
143     return b;
144 }
145
146 static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
147                                          apr_bucket *bsender,
148                                          apr_bucket_alloc_t *list,
149                                          apr_size_t n)
150 {
151     apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
152
153     APR_BUCKET_INIT(b);
154     b->free = apr_bucket_free;
155     b->list = list;
156     return h2_beam_bucket_make(b, beam, bsender, n);
157 }
158
159 const apr_bucket_type_t h2_bucket_type_beam = {
160     "BEAM", 5, APR_BUCKET_DATA,
161     beam_bucket_destroy,
162     beam_bucket_read,
163     apr_bucket_setaside_noop,
164     apr_bucket_shared_split,
165     apr_bucket_shared_copy
166 };
167
168 /*******************************************************************************
169  * h2_blist, a brigade without allocations
170  ******************************************************************************/
171
172 static apr_array_header_t *beamers;
173
174 static apr_status_t cleanup_beamers(void *dummy)
175 {
176     (void)dummy;
177     beamers = NULL;
178     return APR_SUCCESS;
179 }
180
181 void h2_register_bucket_beamer(h2_bucket_beamer *beamer)
182 {
183     if (!beamers) {
184         apr_pool_cleanup_register(apr_hook_global_pool, NULL,
185                                   cleanup_beamers, apr_pool_cleanup_null);
186         beamers = apr_array_make(apr_hook_global_pool, 10, 
187                                  sizeof(h2_bucket_beamer*));
188     }
189     APR_ARRAY_PUSH(beamers, h2_bucket_beamer*) = beamer;
190 }
191
192 static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam, 
193                                   apr_bucket_brigade *dest,
194                                   const apr_bucket *src)
195 {
196     apr_bucket *b = NULL;
197     int i;
198     if (beamers) {
199         for (i = 0; i < beamers->nelts && b == NULL; ++i) {
200             h2_bucket_beamer *beamer;
201             
202             beamer = APR_ARRAY_IDX(beamers, i, h2_bucket_beamer*);
203             b = beamer(beam, dest, src);
204         }
205     }
206     return b;
207 }
208
209
210 /*******************************************************************************
211  * bucket beam that can transport buckets across threads
212  ******************************************************************************/
213
214 static void mutex_leave(void *ctx, apr_thread_mutex_t *lock)
215 {
216     apr_thread_mutex_unlock(lock);
217 }
218
219 static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl)
220 {
221     h2_bucket_beam *beam = ctx;
222     pbl->mutex = beam->lock;
223     pbl->leave = mutex_leave;
224     return apr_thread_mutex_lock(pbl->mutex);
225 }
226
227 static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
228 {
229     return mutex_enter(beam, pbl);
230 }
231
232 static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
233 {
234     if (pbl->leave) {
235         pbl->leave(pbl->leave_ctx, pbl->mutex);
236     }
237 }
238
239 static apr_off_t bucket_mem_used(apr_bucket *b)
240 {
241     if (APR_BUCKET_IS_FILE(b)) {
242         return 0;
243     }
244     else {
245         /* should all have determinate length */
246         return b->length;
247     }
248 }
249
250 static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
251 {
252     int rv = 0;
253     apr_off_t len = beam->received_bytes - beam->cons_bytes_reported;
254     h2_beam_io_callback *cb = beam->cons_io_cb;
255      
256     if (len > 0) {
257         if (cb) {
258             void *ctx = beam->cons_ctx;
259             
260             if (pbl) leave_yellow(beam, pbl);
261             cb(ctx, beam, len);
262             if (pbl) enter_yellow(beam, pbl);
263             rv = 1;
264         }
265         beam->cons_bytes_reported += len;
266     }
267     return rv;
268 }
269
270 static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl)
271 {
272     apr_off_t len = beam->sent_bytes - beam->prod_bytes_reported;
273     if (force || len > 0) {
274         h2_beam_io_callback *cb = beam->prod_io_cb; 
275         if (cb) {
276             void *ctx = beam->prod_ctx;
277             
278             leave_yellow(beam, pbl);
279             cb(ctx, beam, len);
280             enter_yellow(beam, pbl);
281         }
282         beam->prod_bytes_reported += len;
283     }
284 }
285
286 static apr_size_t calc_buffered(h2_bucket_beam *beam)
287 {
288     apr_size_t len = 0;
289     apr_bucket *b;
290     for (b = H2_BLIST_FIRST(&beam->send_list); 
291          b != H2_BLIST_SENTINEL(&beam->send_list);
292          b = APR_BUCKET_NEXT(b)) {
293         if (b->length == ((apr_size_t)-1)) {
294             /* do not count */
295         }
296         else if (APR_BUCKET_IS_FILE(b)) {
297             /* if unread, has no real mem footprint. */
298         }
299         else {
300             len += b->length;
301         }
302     }
303     return len;
304 }
305
306 static void r_purge_sent(h2_bucket_beam *beam)
307 {
308     apr_bucket *b;
309     /* delete all sender buckets in purge brigade, needs to be called
310      * from sender thread only */
311     while (!H2_BLIST_EMPTY(&beam->purge_list)) {
312         b = H2_BLIST_FIRST(&beam->purge_list);
313         apr_bucket_delete(b);
314     }
315 }
316
317 static apr_size_t calc_space_left(h2_bucket_beam *beam)
318 {
319     if (beam->max_buf_size > 0) {
320         apr_off_t len = calc_buffered(beam);
321         return (beam->max_buf_size > len? (beam->max_buf_size - len) : 0);
322     }
323     return APR_SIZE_MAX;
324 }
325
326 static int buffer_is_empty(h2_bucket_beam *beam)
327 {
328     return ((!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer))
329             && H2_BLIST_EMPTY(&beam->send_list));
330 }
331
332 static apr_status_t wait_empty(h2_bucket_beam *beam, apr_read_type_e block,  
333                                apr_thread_mutex_t *lock)
334 {
335     apr_status_t rv = APR_SUCCESS;
336     
337     while (!buffer_is_empty(beam) && APR_SUCCESS == rv) {
338         if (APR_BLOCK_READ != block || !lock) {
339             rv = APR_EAGAIN;
340         }
341         else if (beam->timeout > 0) {
342             rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
343         }
344         else {
345             rv = apr_thread_cond_wait(beam->change, lock);
346         }
347     }
348     return rv;
349 }
350
351 static apr_status_t wait_not_empty(h2_bucket_beam *beam, apr_read_type_e block,  
352                                    apr_thread_mutex_t *lock)
353 {
354     apr_status_t rv = APR_SUCCESS;
355     
356     while (buffer_is_empty(beam) && APR_SUCCESS == rv) {
357         if (beam->aborted) {
358             rv = APR_ECONNABORTED;
359         }
360         else if (beam->closed) {
361             rv = APR_EOF;
362         }
363         else if (APR_BLOCK_READ != block || !lock) {
364             rv = APR_EAGAIN;
365         }
366         else if (beam->timeout > 0) {
367             rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
368         }
369         else {
370             rv = apr_thread_cond_wait(beam->change, lock);
371         }
372     }
373     return rv;
374 }
375
376 static apr_status_t wait_not_full(h2_bucket_beam *beam, apr_read_type_e block, 
377                                   apr_size_t *pspace_left, h2_beam_lock *bl)
378 {
379     apr_status_t rv = APR_SUCCESS;
380     apr_size_t left;
381     
382     while (0 == (left = calc_space_left(beam)) && APR_SUCCESS == rv) {
383         if (beam->aborted) {
384             rv = APR_ECONNABORTED;
385         }
386         else if (block != APR_BLOCK_READ || !bl->mutex) {
387             rv = APR_EAGAIN;
388         }
389         else {
390             if (beam->timeout > 0) {
391                 rv = apr_thread_cond_timedwait(beam->change, bl->mutex, beam->timeout);
392             }
393             else {
394                 rv = apr_thread_cond_wait(beam->change, bl->mutex);
395             }
396         }
397     }
398     *pspace_left = left;
399     return rv;
400 }
401
402 static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
403 {
404     h2_beam_lock bl;
405     apr_bucket *b, *next;
406
407     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
408         /* even when beam buckets are split, only the one where
409          * refcount drops to 0 will call us */
410         H2_BPROXY_REMOVE(proxy);
411         /* invoked from receiver thread, the last beam bucket for the send
412          * bucket is about to be destroyed.
413          * remove it from the hold, where it should be now */
414         if (proxy->bsender) {
415             for (b = H2_BLIST_FIRST(&beam->hold_list); 
416                  b != H2_BLIST_SENTINEL(&beam->hold_list);
417                  b = APR_BUCKET_NEXT(b)) {
418                  if (b == proxy->bsender) {
419                     break;
420                  }
421             }
422             if (b != H2_BLIST_SENTINEL(&beam->hold_list)) {
423                 /* bucket is in hold as it should be, mark this one
424                  * and all before it for purging. We might have placed meta
425                  * buckets without a receiver proxy into the hold before it 
426                  * and schedule them for purging now */
427                 for (b = H2_BLIST_FIRST(&beam->hold_list); 
428                      b != H2_BLIST_SENTINEL(&beam->hold_list);
429                      b = next) {
430                     next = APR_BUCKET_NEXT(b);
431                     if (b == proxy->bsender) {
432                         APR_BUCKET_REMOVE(b);
433                         H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
434                         break;
435                     }
436                     else if (APR_BUCKET_IS_METADATA(b)) {
437                         APR_BUCKET_REMOVE(b);
438                         H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
439                     }
440                     else {
441                         /* another data bucket before this one in hold. this
442                          * is normal since DATA buckets need not be destroyed
443                          * in order */
444                     }
445                 }
446                 
447                 proxy->bsender = NULL;
448             }
449             else {
450                 /* it should be there unless we screwed up */
451                 ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->send_pool, 
452                               APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not "
453                               "in hold, n=%d", beam->id, beam->tag, 
454                               (int)proxy->n);
455                 ap_assert(!proxy->bsender);
456             }
457         }
458         /* notify anyone waiting on space to become available */
459         if (!bl.mutex) {
460             r_purge_sent(beam);
461         }
462         else {
463             apr_thread_cond_broadcast(beam->change);
464         }
465         leave_yellow(beam, &bl);
466     }
467 }
468
469 static void h2_blist_cleanup(h2_blist *bl)
470 {
471     apr_bucket *e;
472
473     while (!H2_BLIST_EMPTY(bl)) {
474         e = H2_BLIST_FIRST(bl);
475         apr_bucket_delete(e);
476     }
477 }
478
479 static apr_status_t beam_close(h2_bucket_beam *beam)
480 {
481     if (!beam->closed) {
482         beam->closed = 1;
483         apr_thread_cond_broadcast(beam->change);
484     }
485     return APR_SUCCESS;
486 }
487
488 int h2_beam_is_closed(h2_bucket_beam *beam)
489 {
490     return beam->closed;
491 }
492
493 static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool, 
494                          apr_status_t (*cleanup)(void *))
495 {
496     if (pool && pool != beam->pool) {
497         apr_pool_pre_cleanup_register(pool, beam, cleanup);
498         return 1;
499     }
500     return 0;
501 }
502
503 static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool,
504                      apr_status_t (*cleanup)(void *)) {
505     if (pool && pool != beam->pool) {
506         apr_pool_cleanup_kill(pool, beam, cleanup);
507         return 1;
508     }
509     return 0;
510 }
511
512 static apr_status_t beam_recv_cleanup(void *data)
513 {
514     h2_bucket_beam *beam = data;
515     /* receiver pool has gone away, clear references */
516     beam->recv_buffer = NULL;
517     beam->recv_pool = NULL;
518     return APR_SUCCESS;
519 }
520
521 static apr_status_t beam_send_cleanup(void *data)
522 {
523     h2_bucket_beam *beam = data;
524     /* sender is going away, clear up all references to its memory */
525     r_purge_sent(beam);
526     h2_blist_cleanup(&beam->send_list);
527     report_consumption(beam, NULL);
528     while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
529         h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
530         H2_BPROXY_REMOVE(proxy);
531         proxy->beam = NULL;
532         proxy->bsender = NULL;
533     }
534     h2_blist_cleanup(&beam->purge_list);
535     h2_blist_cleanup(&beam->hold_list);
536     beam->send_pool = NULL;
537     return APR_SUCCESS;
538 }
539
540 static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool) 
541 {
542     if (beam->send_pool != pool) {
543         if (beam->send_pool && beam->send_pool != beam->pool) {
544             pool_kill(beam, beam->send_pool, beam_send_cleanup);
545             beam_send_cleanup(beam);
546         }
547         beam->send_pool = pool;
548         pool_register(beam, beam->send_pool, beam_send_cleanup);
549     }
550 }
551
552 static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl)
553 {
554     if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
555         apr_bucket_brigade *bb = beam->recv_buffer;
556         apr_off_t bblen = 0;
557         
558         beam->recv_buffer = NULL;
559         apr_brigade_length(bb, 0, &bblen);
560         beam->received_bytes += bblen;
561         
562         /* need to do this unlocked since bucket destroy might 
563          * call this beam again. */
564         if (bl) leave_yellow(beam, bl);
565         apr_brigade_destroy(bb);
566         if (bl) enter_yellow(beam, bl);
567         
568         if (beam->cons_ev_cb) { 
569             beam->cons_ev_cb(beam->cons_ctx, beam);
570         }
571     }
572 }
573
574 static apr_status_t beam_cleanup(void *data)
575 {
576     h2_bucket_beam *beam = data;
577     apr_status_t status = APR_SUCCESS;
578     int safe_send = (beam->owner == H2_BEAM_OWNER_SEND);
579     int safe_recv = (beam->owner == H2_BEAM_OWNER_RECV);
580     
581     /* 
582      * Owner of the beam is going away, depending on which side it owns,
583      * cleanup strategies will differ.
584      *
585      * In general, receiver holds references to memory from sender. 
586      * Clean up receiver first, if safe, then cleanup sender, if safe.
587      */
588      
589     /* When modify send is not safe, this means we still have multi-thread
590      * protection and the owner is receiving the buckets. If the sending
591      * side has not gone away, this means we could have dangling buckets
592      * in our lists that never get destroyed. This should not happen. */
593     ap_assert(safe_send || !beam->send_pool);
594     if (!H2_BLIST_EMPTY(&beam->send_list)) {
595         ap_assert(beam->send_pool);
596     }
597     
598     if (safe_recv) {
599         if (beam->recv_pool) {
600             pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
601             beam->recv_pool = NULL;
602         }
603         recv_buffer_cleanup(beam, NULL);
604     }
605     else {
606         beam->recv_buffer = NULL;
607         beam->recv_pool = NULL;
608     }
609     
610     if (safe_send && beam->send_pool) {
611         pool_kill(beam, beam->send_pool, beam_send_cleanup);
612         status = beam_send_cleanup(beam);
613     }
614     
615     if (safe_recv) {
616         ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies));
617         ap_assert(H2_BLIST_EMPTY(&beam->send_list));
618         ap_assert(H2_BLIST_EMPTY(&beam->hold_list));
619         ap_assert(H2_BLIST_EMPTY(&beam->purge_list));
620     }
621     return status;
622 }
623
624 apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
625 {
626     apr_pool_cleanup_kill(beam->pool, beam, beam_cleanup);
627     return beam_cleanup(beam);
628 }
629
630 apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, 
631                             int id, const char *tag, 
632                             h2_beam_owner_t owner,
633                             apr_size_t max_buf_size,
634                             apr_interval_time_t timeout)
635 {
636     h2_bucket_beam *beam;
637     apr_status_t rv = APR_SUCCESS;
638     
639     beam = apr_pcalloc(pool, sizeof(*beam));
640     if (!beam) {
641         return APR_ENOMEM;
642     }
643
644     beam->id = id;
645     beam->tag = tag;
646     beam->pool = pool;
647     beam->owner = owner;
648     H2_BLIST_INIT(&beam->send_list);
649     H2_BLIST_INIT(&beam->hold_list);
650     H2_BLIST_INIT(&beam->purge_list);
651     H2_BPROXY_LIST_INIT(&beam->proxies);
652     beam->tx_mem_limits = 1;
653     beam->max_buf_size = max_buf_size;
654     beam->timeout = timeout;
655
656     rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool);
657     if (APR_SUCCESS == rv) {
658         rv = apr_thread_cond_create(&beam->change, pool);
659         if (APR_SUCCESS == rv) {
660             apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);
661             *pbeam = beam;
662         }
663     }
664     return rv;
665 }
666
667 void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
668 {
669     h2_beam_lock bl;
670     
671     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
672         beam->max_buf_size = buffer_size;
673         leave_yellow(beam, &bl);
674     }
675 }
676
677 apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
678 {
679     h2_beam_lock bl;
680     apr_size_t buffer_size = 0;
681     
682     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
683         buffer_size = beam->max_buf_size;
684         leave_yellow(beam, &bl);
685     }
686     return buffer_size;
687 }
688
689 void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
690 {
691     h2_beam_lock bl;
692     
693     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
694         beam->timeout = timeout;
695         leave_yellow(beam, &bl);
696     }
697 }
698
699 apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam)
700 {
701     h2_beam_lock bl;
702     apr_interval_time_t timeout = 0;
703     
704     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
705         timeout = beam->timeout;
706         leave_yellow(beam, &bl);
707     }
708     return timeout;
709 }
710
711 void h2_beam_abort(h2_bucket_beam *beam)
712 {
713     h2_beam_lock bl;
714     
715     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
716         if (!beam->aborted) {
717             beam->aborted = 1;
718             r_purge_sent(beam);
719             h2_blist_cleanup(&beam->send_list);
720             report_consumption(beam, &bl);
721         }
722         apr_thread_cond_broadcast(beam->change);
723         leave_yellow(beam, &bl);
724     }
725 }
726
727 apr_status_t h2_beam_close(h2_bucket_beam *beam)
728 {
729     h2_beam_lock bl;
730     
731     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
732         r_purge_sent(beam);
733         beam_close(beam);
734         report_consumption(beam, &bl);
735         leave_yellow(beam, &bl);
736     }
737     return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
738 }
739
740 apr_status_t h2_beam_leave(h2_bucket_beam *beam)
741 {
742     h2_beam_lock bl;
743     
744     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
745         recv_buffer_cleanup(beam, &bl);
746         beam->aborted = 1;
747         beam_close(beam);
748         leave_yellow(beam, &bl);
749     }
750     return APR_SUCCESS;
751 }
752
753 apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block)
754 {
755     apr_status_t status;
756     h2_beam_lock bl;
757     
758     if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) {
759         status = wait_empty(beam, block, bl.mutex);
760         leave_yellow(beam, &bl);
761     }
762     return status;
763 }
764
765 static void move_to_hold(h2_bucket_beam *beam, 
766                          apr_bucket_brigade *sender_bb)
767 {
768     apr_bucket *b;
769     while (sender_bb && !APR_BRIGADE_EMPTY(sender_bb)) {
770         b = APR_BRIGADE_FIRST(sender_bb);
771         APR_BUCKET_REMOVE(b);
772         H2_BLIST_INSERT_TAIL(&beam->send_list, b);
773     }
774 }
775
776 static apr_status_t append_bucket(h2_bucket_beam *beam, 
777                                   apr_bucket *b,
778                                   apr_read_type_e block,
779                                   apr_size_t *pspace_left,
780                                   h2_beam_lock *pbl)
781 {
782     const char *data;
783     apr_size_t len;
784     apr_status_t status;
785     int can_beam, check_len;
786     
787     if (beam->aborted) {
788         return APR_ECONNABORTED;
789     }
790     
791     if (APR_BUCKET_IS_METADATA(b)) {
792         if (APR_BUCKET_IS_EOS(b)) {
793             beam->closed = 1;
794         }
795         APR_BUCKET_REMOVE(b);
796         H2_BLIST_INSERT_TAIL(&beam->send_list, b);
797         return APR_SUCCESS;
798     }
799     else if (APR_BUCKET_IS_FILE(b)) {
800         /* For file buckets the problem is their internal readpool that
801          * is used on the first read to allocate buffer/mmap.
802          * Since setting aside a file bucket will de-register the
803          * file cleanup function from the previous pool, we need to
804          * call that only from the sender thread.
805          *
806          * Currently, we do not handle file bucket with refcount > 1 as
807          * the beam is then not in complete control of the file's lifetime.
808          * Which results in the bug that a file get closed by the receiver
809          * while the sender or the beam still have buckets using it. 
810          * 
811          * Additionally, we allow callbacks to prevent beaming file
812          * handles across. The use case for this is to limit the number 
813          * of open file handles and rather use a less efficient beam
814          * transport. */
815         apr_bucket_file *bf = b->data;
816         apr_file_t *fd = bf->fd;
817         can_beam = (bf->refcount.refcount == 1);
818         if (can_beam && beam->can_beam_fn) {
819             can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
820         }
821         check_len = !can_beam;
822     }
823     else {
824         if (b->length == ((apr_size_t)-1)) {
825             const char *data;
826             status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
827             if (status != APR_SUCCESS) {
828                 return status;
829             }
830         }
831         check_len = 1;
832     }
833     
834     if (check_len) {
835         if (b->length > *pspace_left) {
836             apr_bucket_split(b, *pspace_left);
837         }
838         *pspace_left -= b->length;
839     }
840
841     /* The fundamental problem is that reading a sender bucket from
842      * a receiver thread is a total NO GO, because the bucket might use
843      * its pool/bucket_alloc from a foreign thread and that will
844      * corrupt. */
845     status = APR_ENOTIMPL;
846     if (APR_BUCKET_IS_TRANSIENT(b)) {
847         /* this takes care of transient buckets and converts them
848          * into heap ones. Other bucket types might or might not be
849          * affected by this. */
850         status = apr_bucket_setaside(b, beam->send_pool);
851     }
852     else if (APR_BUCKET_IS_HEAP(b)) {
853         /* For heap buckets read from a receiver thread is fine. The
854          * data will be there and live until the bucket itself is
855          * destroyed. */
856         status = APR_SUCCESS;
857     }
858     else if (APR_BUCKET_IS_POOL(b)) {
859         /* pool buckets are bastards that register at pool cleanup
860          * to morph themselves into heap buckets. That may happen anytime,
861          * even after the bucket data pointer has been read. So at
862          * any time inside the receiver thread, the pool bucket memory
863          * may disappear. yikes. */
864         status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
865         if (status == APR_SUCCESS) {
866             apr_bucket_heap_make(b, data, len, NULL);
867         }
868     }
869     else if (APR_BUCKET_IS_FILE(b) && can_beam) {
870         status = apr_bucket_setaside(b, beam->send_pool);
871     }
872     
873     if (status == APR_ENOTIMPL) {
874         /* we have no knowledge about the internals of this bucket,
875          * but hope that after read, its data stays immutable for the
876          * lifetime of the bucket. (see pool bucket handling above for
877          * a counter example).
878          * We do the read while in the sender thread, so that the bucket may
879          * use pools/allocators safely. */
880         status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
881         if (status == APR_SUCCESS) {
882             status = apr_bucket_setaside(b, beam->send_pool);
883         }
884     }
885     
886     if (status != APR_SUCCESS && status != APR_ENOTIMPL) {
887         return status;
888     }
889     
890     APR_BUCKET_REMOVE(b);
891     H2_BLIST_INSERT_TAIL(&beam->send_list, b);
892     beam->sent_bytes += b->length;
893
894     return APR_SUCCESS;
895 }
896
897 void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p)
898 {
899     h2_beam_lock bl;
900     /* Called from the sender thread to add buckets to the beam */
901     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
902         r_purge_sent(beam);
903         beam_set_send_pool(beam, p);
904         leave_yellow(beam, &bl);
905     }
906 }
907
908 apr_status_t h2_beam_send(h2_bucket_beam *beam, 
909                           apr_bucket_brigade *sender_bb, 
910                           apr_read_type_e block)
911 {
912     apr_bucket *b;
913     apr_status_t rv = APR_SUCCESS;
914     apr_size_t space_left = 0;
915     h2_beam_lock bl;
916
917     /* Called from the sender thread to add buckets to the beam */
918     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
919         ap_assert(beam->send_pool);
920         r_purge_sent(beam);
921         
922         if (beam->aborted) {
923             move_to_hold(beam, sender_bb);
924             rv = APR_ECONNABORTED;
925         }
926         else if (sender_bb) {
927             int force_report = !APR_BRIGADE_EMPTY(sender_bb);
928             
929             space_left = calc_space_left(beam);
930             while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
931                 if (space_left <= 0) {
932                     report_prod_io(beam, force_report, &bl);
933                     rv = wait_not_full(beam, block, &space_left, &bl);
934                     if (APR_SUCCESS != rv) {
935                         break;
936                     }
937                 }
938                 b = APR_BRIGADE_FIRST(sender_bb);
939                 rv = append_bucket(beam, b, block, &space_left, &bl);
940             }
941             
942             report_prod_io(beam, force_report, &bl);
943             apr_thread_cond_broadcast(beam->change);
944         }
945         report_consumption(beam, &bl);
946         leave_yellow(beam, &bl);
947     }
948     return rv;
949 }
950
951 apr_status_t h2_beam_receive(h2_bucket_beam *beam, 
952                              apr_bucket_brigade *bb, 
953                              apr_read_type_e block,
954                              apr_off_t readbytes)
955 {
956     h2_beam_lock bl;
957     apr_bucket *bsender, *brecv, *ng;
958     int transferred = 0;
959     apr_status_t status = APR_SUCCESS;
960     apr_off_t remain;
961     int transferred_buckets = 0;
962     
963     /* Called from the receiver thread to take buckets from the beam */
964     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
965         if (readbytes <= 0) {
966             readbytes = APR_SIZE_MAX;
967         }
968         remain = readbytes;
969         
970 transfer:
971         if (beam->aborted) {
972             recv_buffer_cleanup(beam, &bl);
973             status = APR_ECONNABORTED;
974             goto leave;
975         }
976
977         /* transfer enough buckets from our receiver brigade, if we have one */
978         while (remain >= 0 
979                && beam->recv_buffer 
980                && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
981                
982             brecv = APR_BRIGADE_FIRST(beam->recv_buffer);
983             if (brecv->length > 0 && remain <= 0) {
984                 break;
985             }            
986             APR_BUCKET_REMOVE(brecv);
987             APR_BRIGADE_INSERT_TAIL(bb, brecv);
988             remain -= brecv->length;
989             ++transferred;
990         }
991
992         /* transfer from our sender brigade, transforming sender buckets to
993          * receiver ones until we have enough */
994         while (remain >= 0 && !H2_BLIST_EMPTY(&beam->send_list)) {
995                
996             brecv = NULL;
997             bsender = H2_BLIST_FIRST(&beam->send_list);            
998             if (bsender->length > 0 && remain <= 0) {
999                 break;
1000             }
1001                         
1002             if (APR_BUCKET_IS_METADATA(bsender)) {
1003                 if (APR_BUCKET_IS_EOS(bsender)) {
1004                     brecv = apr_bucket_eos_create(bb->bucket_alloc);
1005                     beam->close_sent = 1;
1006                 }
1007                 else if (APR_BUCKET_IS_FLUSH(bsender)) {
1008                     brecv = apr_bucket_flush_create(bb->bucket_alloc);
1009                 }
1010                 else if (AP_BUCKET_IS_ERROR(bsender)) {
1011                     ap_bucket_error *eb = (ap_bucket_error *)bsender;
1012                     brecv = ap_bucket_error_create(eb->status, eb->data,
1013                                                     bb->p, bb->bucket_alloc);
1014                 }
1015             }
1016             else if (bsender->length == 0) {
1017                 APR_BUCKET_REMOVE(bsender);
1018                 H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
1019                 continue;
1020             }
1021             else if (APR_BUCKET_IS_FILE(bsender)) {
1022                 /* This is set aside into the target brigade pool so that 
1023                  * any read operation messes with that pool and not 
1024                  * the sender one. */
1025                 apr_bucket_file *f = (apr_bucket_file *)bsender->data;
1026                 apr_file_t *fd = f->fd;
1027                 int setaside = (f->readpool != bb->p);
1028                 
1029                 if (setaside) {
1030                     status = apr_file_setaside(&fd, fd, bb->p);
1031                     if (status != APR_SUCCESS) {
1032                         goto leave;
1033                     }
1034                     ++beam->files_beamed;
1035                 }
1036                 ng = apr_brigade_insert_file(bb, fd, bsender->start, bsender->length, 
1037                                              bb->p);
1038 #if APR_HAS_MMAP
1039                 /* disable mmap handling as this leads to segfaults when
1040                  * the underlying file is changed while memory pointer has
1041                  * been handed out. See also PR 59348 */
1042                 apr_bucket_file_enable_mmap(ng, 0);
1043 #endif
1044                 APR_BUCKET_REMOVE(bsender);
1045                 H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
1046
1047                 remain -= bsender->length;
1048                 ++transferred;
1049                 ++transferred_buckets;
1050                 continue;
1051             }
1052             else {
1053                 /* create a "receiver" standin bucket. we took care about the
1054                  * underlying sender bucket and its data when we placed it into
1055                  * the sender brigade.
1056                  * the beam bucket will notify us on destruction that bsender is
1057                  * no longer needed. */
1058                 brecv = h2_beam_bucket_create(beam, bsender, bb->bucket_alloc,
1059                                                beam->buckets_sent++);
1060             }
1061             
1062             /* Place the sender bucket into our hold, to be destroyed when no
1063              * receiver bucket references it any more. */
1064             APR_BUCKET_REMOVE(bsender);
1065             H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
1066             
1067             beam->received_bytes += bsender->length;
1068             ++transferred_buckets;
1069             
1070             if (brecv) {
1071                 APR_BRIGADE_INSERT_TAIL(bb, brecv);
1072                 remain -= brecv->length;
1073                 ++transferred;
1074             }
1075             else {
1076                 /* let outside hook determine how bucket is beamed */
1077                 leave_yellow(beam, &bl);
1078                 brecv = h2_beam_bucket(beam, bb, bsender);
1079                 enter_yellow(beam, &bl);
1080                 
1081                 while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
1082                     ++transferred;
1083                     remain -= brecv->length;
1084                     brecv = APR_BUCKET_NEXT(brecv);
1085                 }
1086             }
1087         }
1088
1089         if (remain < 0) {
1090             /* too much, put some back into out recv_buffer */
1091             remain = readbytes;
1092             for (brecv = APR_BRIGADE_FIRST(bb);
1093                  brecv != APR_BRIGADE_SENTINEL(bb);
1094                  brecv = APR_BUCKET_NEXT(brecv)) {
1095                 remain -= (beam->tx_mem_limits? bucket_mem_used(brecv) 
1096                            : brecv->length);
1097                 if (remain < 0) {
1098                     apr_bucket_split(brecv, brecv->length+remain);
1099                     beam->recv_buffer = apr_brigade_split_ex(bb, 
1100                                                              APR_BUCKET_NEXT(brecv), 
1101                                                              beam->recv_buffer);
1102                     break;
1103                 }
1104             }
1105         }
1106
1107         if (beam->closed && buffer_is_empty(beam)) {
1108             /* beam is closed and we have nothing more to receive */ 
1109             if (!beam->close_sent) {
1110                 apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
1111                 APR_BRIGADE_INSERT_TAIL(bb, b);
1112                 beam->close_sent = 1;
1113                 ++transferred;
1114                 status = APR_SUCCESS;
1115             }
1116         }
1117         
1118         if (transferred_buckets > 0) {
1119            if (beam->cons_ev_cb) { 
1120                beam->cons_ev_cb(beam->cons_ctx, beam);
1121             }
1122         }
1123         
1124         if (transferred) {
1125             apr_thread_cond_broadcast(beam->change);
1126             status = APR_SUCCESS;
1127         }
1128         else {
1129             status = wait_not_empty(beam, block, bl.mutex);
1130             if (status != APR_SUCCESS) {
1131                 goto leave;
1132             }
1133             goto transfer;
1134         }
1135 leave:        
1136         leave_yellow(beam, &bl);
1137     }
1138     return status;
1139 }
1140
1141 void h2_beam_on_consumed(h2_bucket_beam *beam, 
1142                          h2_beam_ev_callback *ev_cb,
1143                          h2_beam_io_callback *io_cb, void *ctx)
1144 {
1145     h2_beam_lock bl;
1146     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
1147         beam->cons_ev_cb = ev_cb;
1148         beam->cons_io_cb = io_cb;
1149         beam->cons_ctx = ctx;
1150         leave_yellow(beam, &bl);
1151     }
1152 }
1153
1154 void h2_beam_on_produced(h2_bucket_beam *beam, 
1155                          h2_beam_io_callback *io_cb, void *ctx)
1156 {
1157     h2_beam_lock bl;
1158     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
1159         beam->prod_io_cb = io_cb;
1160         beam->prod_ctx = ctx;
1161         leave_yellow(beam, &bl);
1162     }
1163 }
1164
1165 void h2_beam_on_file_beam(h2_bucket_beam *beam, 
1166                           h2_beam_can_beam_callback *cb, void *ctx)
1167 {
1168     h2_beam_lock bl;
1169     
1170     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
1171         beam->can_beam_fn = cb;
1172         beam->can_beam_ctx = ctx;
1173         leave_yellow(beam, &bl);
1174     }
1175 }
1176
1177
1178 apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
1179 {
1180     apr_bucket *b;
1181     apr_off_t l = 0;
1182     h2_beam_lock bl;
1183     
1184     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1185         for (b = H2_BLIST_FIRST(&beam->send_list); 
1186             b != H2_BLIST_SENTINEL(&beam->send_list);
1187             b = APR_BUCKET_NEXT(b)) {
1188             /* should all have determinate length */
1189             l += b->length;
1190         }
1191         leave_yellow(beam, &bl);
1192     }
1193     return l;
1194 }
1195
1196 apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
1197 {
1198     apr_bucket *b;
1199     apr_off_t l = 0;
1200     h2_beam_lock bl;
1201     
1202     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1203         for (b = H2_BLIST_FIRST(&beam->send_list); 
1204             b != H2_BLIST_SENTINEL(&beam->send_list);
1205             b = APR_BUCKET_NEXT(b)) {
1206             l += bucket_mem_used(b);
1207         }
1208         leave_yellow(beam, &bl);
1209     }
1210     return l;
1211 }
1212
1213 int h2_beam_empty(h2_bucket_beam *beam)
1214 {
1215     int empty = 1;
1216     h2_beam_lock bl;
1217     
1218     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1219         empty = (H2_BLIST_EMPTY(&beam->send_list) 
1220                  && (!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer)));
1221         leave_yellow(beam, &bl);
1222     }
1223     return empty;
1224 }
1225
1226 int h2_beam_holds_proxies(h2_bucket_beam *beam)
1227 {
1228     int has_proxies = 1;
1229     h2_beam_lock bl;
1230     
1231     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1232         has_proxies = !H2_BPROXY_LIST_EMPTY(&beam->proxies);
1233         leave_yellow(beam, &bl);
1234     }
1235     return has_proxies;
1236 }
1237
1238 int h2_beam_was_received(h2_bucket_beam *beam)
1239 {
1240     int happend = 0;
1241     h2_beam_lock bl;
1242     
1243     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1244         happend = (beam->received_bytes > 0);
1245         leave_yellow(beam, &bl);
1246     }
1247     return happend;
1248 }
1249
1250 apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam)
1251 {
1252     apr_size_t n = 0;
1253     h2_beam_lock bl;
1254     
1255     if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
1256         n = beam->files_beamed;
1257         leave_yellow(beam, &bl);
1258     }
1259     return n;
1260 }
1261
1262 int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
1263 {
1264     return 0;
1265 }
1266
1267 int h2_beam_report_consumption(h2_bucket_beam *beam)
1268 {
1269     h2_beam_lock bl;
1270     int rv = 0;
1271     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
1272         rv = report_consumption(beam, &bl);
1273         leave_yellow(beam, &bl);
1274     }
1275     return rv;
1276 }
1277
1278 void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg)
1279 {
1280     if (beam && APLOG_C_IS_LEVEL(c,level)) {
1281         ap_log_cerror(APLOG_MARK, level, 0, c, 
1282                       "beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s", 
1283                       (c->master? c->master->id : c->id), beam->id, beam->tag, 
1284                       beam->closed, beam->aborted, h2_beam_empty(beam), 
1285                       (long)h2_beam_get_buffered(beam), msg);
1286     }
1287 }
1288
1289