1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <apr_lib.h>
17 #include <apr_strings.h>
18 #include <apr_time.h>
19 #include <apr_buckets.h>
20 #include <apr_thread_mutex.h>
21 #include <apr_thread_cond.h>
22
23 #include <httpd.h>
24 #include <http_log.h>
25
26 #include "h2_private.h"
27 #include "h2_util.h"
28 #include "h2_bucket_beam.h"
29
30 static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy);
31
32 #define H2_BPROXY_NEXT(e)             APR_RING_NEXT((e), link)
33 #define H2_BPROXY_PREV(e)             APR_RING_PREV((e), link)
34 #define H2_BPROXY_REMOVE(e)           APR_RING_REMOVE((e), link)
35
36 #define H2_BPROXY_LIST_INIT(b)        APR_RING_INIT(&(b)->list, h2_beam_proxy, link);
37 #define H2_BPROXY_LIST_SENTINEL(b)    APR_RING_SENTINEL(&(b)->list, h2_beam_proxy, link)
38 #define H2_BPROXY_LIST_EMPTY(b)       APR_RING_EMPTY(&(b)->list, h2_beam_proxy, link)
39 #define H2_BPROXY_LIST_FIRST(b)       APR_RING_FIRST(&(b)->list)
40 #define H2_BPROXY_LIST_LAST(b)        APR_RING_LAST(&(b)->list)
41 #define H2_BPROXY_LIST_INSERT_HEAD(b, e) do {                           \
42         h2_beam_proxy *ap__b = (e);                                        \
43         APR_RING_INSERT_HEAD(&(b)->list, ap__b, h2_beam_proxy, link);   \
44     } while (0)
45 #define H2_BPROXY_LIST_INSERT_TAIL(b, e) do {                           \
46         h2_beam_proxy *ap__b = (e);                                     \
47         APR_RING_INSERT_TAIL(&(b)->list, ap__b, h2_beam_proxy, link);   \
48     } while (0)
49 #define H2_BPROXY_LIST_CONCAT(a, b) do {                                        \
50         APR_RING_CONCAT(&(a)->list, &(b)->list, h2_beam_proxy, link);   \
51     } while (0)
52 #define H2_BPROXY_LIST_PREPEND(a, b) do {                                       \
53         APR_RING_PREPEND(&(a)->list, &(b)->list, h2_beam_proxy, link);  \
54     } while (0)
55
56
57 /*******************************************************************************
58  * beam bucket with reference to beam and bucket it represents
59  ******************************************************************************/
60
61 const apr_bucket_type_t h2_bucket_type_beam;
62
63 #define H2_BUCKET_IS_BEAM(e)     (e->type == &h2_bucket_type_beam)
64
65 struct h2_beam_proxy {
66     apr_bucket_refcount refcount;
67     APR_RING_ENTRY(h2_beam_proxy) link;
68     h2_bucket_beam *beam;
69     apr_bucket *bred;
70     apr_size_t n;
71 };
72
73 static const char Dummy = '\0';
74
75 static apr_status_t beam_bucket_read(apr_bucket *b, const char **str, 
76                                      apr_size_t *len, apr_read_type_e block)
77 {
78     h2_beam_proxy *d = b->data;
79     if (d->bred) {
80         const char *data;
81         apr_status_t status = apr_bucket_read(d->bred, &data, len, block);
82         if (status == APR_SUCCESS) {
83             *str = data + b->start;
84             *len = b->length;
85         }
86         return status;
87     }
88     *str = &Dummy;
89     *len = 0;
90     return APR_ECONNRESET;
91 }
92
93 static void beam_bucket_destroy(void *data)
94 {
95     h2_beam_proxy *d = data;
96
97     if (apr_bucket_shared_destroy(d)) {
98         /* When the beam gets destroyed before this bucket, it will
99          * NULLify its reference here. This is not protected by a mutex,
100          * so it will not help with race conditions.
101          * But it lets us shut down memory pools with circular beam
102          * references. */
103         if (d->beam) {
104             h2_beam_emitted(d->beam, d);
105         }
106         apr_bucket_free(d);
107     }
108 }
109
110 static apr_bucket * h2_beam_bucket_make(apr_bucket *b, 
111                                         h2_bucket_beam *beam,
112                                         apr_bucket *bred, apr_size_t n)
113 {
114     h2_beam_proxy *d;
115
116     d = apr_bucket_alloc(sizeof(*d), b->list);
117     H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, d);
118     d->beam = beam;
119     d->bred = bred;
120     d->n = n;
121     
122     b = apr_bucket_shared_make(b, d, 0, bred? bred->length : 0);
123     b->type = &h2_bucket_type_beam;
124
125     return b;
126 }
127
128 static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
129                                          apr_bucket *bred,
130                                          apr_bucket_alloc_t *list,
131                                          apr_size_t n)
132 {
133     apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
134
135     APR_BUCKET_INIT(b);
136     b->free = apr_bucket_free;
137     b->list = list;
138     return h2_beam_bucket_make(b, beam, bred, n);
139 }
140
141 /*static apr_status_t beam_bucket_setaside(apr_bucket *b, apr_pool_t *pool)
142 {
143     apr_status_t status = APR_SUCCESS;
144     h2_beam_proxy *d = b->data;
145     if (d->bred) {
146         const char *data;
147         apr_size_t len;
148         
149         status = apr_bucket_read(d->bred, &data, &len, APR_BLOCK_READ);
150         if (status == APR_SUCCESS) {
151             b = apr_bucket_heap_make(b, (char *)data + b->start, b->length, NULL);
152             if (b == NULL) {
153                 return APR_ENOMEM;
154             }
155         }
156     }
157     return status;
158 }*/
159
160 const apr_bucket_type_t h2_bucket_type_beam = {
161     "BEAM", 5, APR_BUCKET_DATA,
162     beam_bucket_destroy,
163     beam_bucket_read,
164     apr_bucket_setaside_noop,
165     apr_bucket_shared_split,
166     apr_bucket_shared_copy
167 };
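/* A minimal sketch of how a beam bucket behaves on the green side, using only
 * the bucket type defined above; the brigade name bb is hypothetical. A read
 * delegates to the wrapped red bucket; once the beam has been cleaned up and
 * the red bucket is gone, the read reports APR_ECONNRESET:
 *
 *     apr_bucket *b = APR_BRIGADE_FIRST(bb);
 *     if (H2_BUCKET_IS_BEAM(b)) {
 *         const char *data;
 *         apr_size_t len;
 *         apr_status_t rv = apr_bucket_read(b, &data, &len, APR_NONBLOCK_READ);
 *         if (rv == APR_ECONNRESET) {
 *             apr_bucket_delete(b);
 *         }
 *     }
 */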
168
169 /*******************************************************************************
170  * h2_blist, a brigade without allocations
171  ******************************************************************************/
172  
173 apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax, 
174                             const char *tag, const char *sep, 
175                             h2_blist *bl)
176 {
177     apr_size_t off = 0;
178     const char *sp = "";
179     apr_bucket *b;
180     
181     if (bl) {
182         memset(buffer, 0, bmax--);
183         off += apr_snprintf(buffer+off, bmax-off, "%s(", tag);
184         for (b = H2_BLIST_FIRST(bl); 
185              bmax && (b != H2_BLIST_SENTINEL(bl));
186              b = APR_BUCKET_NEXT(b)) {
187             
188             off += h2_util_bucket_print(buffer+off, bmax-off, b, sp);
189             sp = " ";
190         }
191         off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
192     }
193     else {
194         off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep);
195     }
196     return off;
197 }
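/* A small usage sketch for h2_util_bl_print, e.g. to trace the red list of a
 * beam; the buffer size and log level are arbitrary choices:
 *
 *     char buffer[4 * 1024];
 *     h2_util_bl_print(buffer, sizeof(buffer), "red", "", &beam->red);
 *     ap_log_perror(APLOG_MARK, APLOG_TRACE2, 0, beam->red_pool, "%s", buffer);
 */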
198
199
200
201 /*******************************************************************************
202  * bucket beam that can transport buckets across threads
203  ******************************************************************************/
204
205 static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
206 {
207     h2_beam_mutex_enter *enter = beam->m_enter;
208     if (enter) {
209         void *ctx = beam->m_ctx;
210         if (ctx) {
211             return enter(ctx, pbl);
212         }
213     }
214     pbl->mutex = NULL;
215     pbl->leave = NULL;
216     return APR_SUCCESS;
217 }
218
219 static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
220 {
221     if (pbl->leave) {
222         pbl->leave(pbl->leave_ctx, pbl->mutex);
223     }
224 }
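/* enter_yellow()/leave_yellow() only synchronize when the beam owner has
 * registered an m_enter callback via h2_beam_mutex_set() below; otherwise
 * pbl->mutex stays NULL and the beam runs unlocked. A minimal sketch of such
 * a callback pair, assuming the h2_beam_lock fields used in this file
 * (mutex, leave, leave_ctx) and a plain apr_thread_mutex_t handed in as
 * m_ctx; the names my_enter/my_leave are hypothetical:
 *
 *     static void my_leave(void *ctx, apr_thread_mutex_t *mutex)
 *     {
 *         apr_thread_mutex_unlock(mutex);
 *     }
 *
 *     static apr_status_t my_enter(void *ctx, h2_beam_lock *pbl)
 *     {
 *         apr_thread_mutex_t *mutex = ctx;
 *         apr_status_t rv = apr_thread_mutex_lock(mutex);
 *         if (rv == APR_SUCCESS) {
 *             pbl->mutex     = mutex;
 *             pbl->leave     = my_leave;
 *             pbl->leave_ctx = ctx;
 *         }
 *         return rv;
 *     }
 *
 * registered with: h2_beam_mutex_set(beam, my_enter, cond, mutex);
 */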
225
226 static apr_off_t calc_buffered(h2_bucket_beam *beam)
227 {
228     apr_off_t len = 0;
229     apr_bucket *b;
230     for (b = H2_BLIST_FIRST(&beam->red); 
231          b != H2_BLIST_SENTINEL(&beam->red);
232          b = APR_BUCKET_NEXT(b)) {
233         if (b->length == ((apr_size_t)-1)) {
234             /* do not count */
235         }
236         else if (APR_BUCKET_IS_FILE(b)) {
237             /* if not yet read, it has no real memory footprint; hard to test here */
238         }
239         else {
240             len += b->length;
241         }
242     }
243     return len;
244 }
245
246 static void r_purge_reds(h2_bucket_beam *beam)
247 {
248     apr_bucket *bred;
249     /* delete all red buckets in the purge list; this must be called
250      * from the red thread only */
251     while (!H2_BLIST_EMPTY(&beam->purge)) {
252         bred = H2_BLIST_FIRST(&beam->purge);
253         apr_bucket_delete(bred);
254     }
255 }
256
257 static apr_size_t calc_space_left(h2_bucket_beam *beam)
258 {
259     if (beam->max_buf_size > 0) {
260         apr_off_t len = calc_buffered(beam);
261         return (beam->max_buf_size > len? (beam->max_buf_size - len) : 0);
262     }
263     return APR_SIZE_MAX;
264 }
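/* A worked example of the flow control above: with max_buf_size = 32 KiB and
 * 20 KiB of counted data sitting in beam->red, calc_space_left() reports
 * 12 KiB to senders. File buckets and buckets of still unknown length
 * ((apr_size_t)-1) are not counted by calc_buffered(), and a max_buf_size of
 * 0 disables the limit entirely (APR_SIZE_MAX is reported). */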
265
266 static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock)
267 {
268     if (beam->timeout > 0) {
269         return apr_thread_cond_timedwait(beam->m_cond, lock, beam->timeout);
270     }
271     else {
272         return apr_thread_cond_wait(beam->m_cond, lock);
273     }
274 }
275
276 static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
277                                  h2_beam_lock *pbl, apr_off_t *premain) 
278 {
279     *premain = calc_space_left(beam);
280     while (!beam->aborted && *premain <= 0 
281            && (block == APR_BLOCK_READ) && pbl->mutex) {
282         apr_status_t status = wait_cond(beam, pbl->mutex);
283         if (APR_STATUS_IS_TIMEUP(status)) {
284             return status;
285         }
286         r_purge_reds(beam);
287         *premain = calc_space_left(beam);
288     }
289     return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
290 }
291
292 static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
293 {
294     h2_beam_lock bl;
295     apr_bucket *b, *next;
296
297     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
298         /* even when beam buckets are split, only the one whose
299          * refcount drops to 0 will call us */
300         H2_BPROXY_REMOVE(proxy);
301         /* invoked from the green thread: the last beam bucket for the red
302          * bucket bred is about to be destroyed.
303          * remove bred from the hold, where it should be by now */
304         if (proxy->bred) {
305             for (b = H2_BLIST_FIRST(&beam->hold); 
306                  b != H2_BLIST_SENTINEL(&beam->hold);
307                  b = APR_BUCKET_NEXT(b)) {
308                  if (b == proxy->bred) {
309                     break;
310                  }
311             }
312             if (b != H2_BLIST_SENTINEL(&beam->hold)) {
313                 /* the bucket is in the hold, as it should be. Mark this one
314                  * and everything before it for purging. We might have placed meta
315                  * buckets without a green proxy into the hold before it, so
316                  * those are scheduled for purging now as well */
317                 for (b = H2_BLIST_FIRST(&beam->hold); 
318                      b != H2_BLIST_SENTINEL(&beam->hold);
319                      b = next) {
320                     next = APR_BUCKET_NEXT(b);
321                     if (b == proxy->bred) {
322                         APR_BUCKET_REMOVE(b);
323                         H2_BLIST_INSERT_TAIL(&beam->purge, b);
324                         break;
325                     }
326                     else if (APR_BUCKET_IS_METADATA(b)) {
327                         APR_BUCKET_REMOVE(b);
328                         H2_BLIST_INSERT_TAIL(&beam->purge, b);
329                     }
330                     else {
331                         /* another data bucket before this one in hold. this
332                          * is normal since DATA buckets need not be destroyed
333                          * in order */
334                     }
335                 }
336                 
337                 proxy->bred = NULL;
338             }
339             else {
340                 /* it should be there unless we screwed up */
341                 ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->red_pool, 
342                               APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not "
343                               "in hold, n=%d", beam->id, beam->tag, 
344                               (int)proxy->n);
345                 AP_DEBUG_ASSERT(!proxy->bred);
346             }
347         }
348         /* notify anyone waiting on space to become available */
349         if (!bl.mutex) {
350             r_purge_reds(beam);
351         }
352         else if (beam->m_cond) {
353             apr_thread_cond_broadcast(beam->m_cond);
354         }
355         leave_yellow(beam, &bl);
356     }
357 }
358
359 static void report_consumption(h2_bucket_beam *beam, int force)
360 {
361     if (force || beam->received_bytes != beam->reported_consumed_bytes) {
362         if (beam->consumed_fn) { 
363             beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
364                               - beam->reported_consumed_bytes);
365         }
366         beam->reported_consumed_bytes = beam->received_bytes;
367     }
368 }
369
370 static void report_production(h2_bucket_beam *beam, int force)
371 {
372     if (force || beam->sent_bytes != beam->reported_produced_bytes) {
373         if (beam->produced_fn) { 
374             beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
375                               - beam->reported_produced_bytes);
376         }
377         beam->reported_produced_bytes = beam->sent_bytes;
378     }
379 }
380
381 static void h2_blist_cleanup(h2_blist *bl)
382 {
383     apr_bucket *e;
384
385     while (!H2_BLIST_EMPTY(bl)) {
386         e = H2_BLIST_FIRST(bl);
387         apr_bucket_delete(e);
388     }
389 }
390
391 static apr_status_t beam_close(h2_bucket_beam *beam)
392 {
393     if (!beam->closed) {
394         beam->closed = 1;
395         if (beam->m_cond) {
396             apr_thread_cond_broadcast(beam->m_cond);
397         }
398     }
399     return APR_SUCCESS;
400 }
401
402 static apr_status_t beam_cleanup(void *data)
403 {
404     h2_bucket_beam *beam = data;
405     
406     beam_close(beam);
407     r_purge_reds(beam);
408     h2_blist_cleanup(&beam->red);
409     report_consumption(beam, 0);
410     while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
411         h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
412         H2_BPROXY_REMOVE(proxy);
413         proxy->beam = NULL;
414         proxy->bred = NULL;
415     }
416     h2_blist_cleanup(&beam->purge);
417     h2_blist_cleanup(&beam->hold);
418     
419     return APR_SUCCESS;
420 }
421
422 apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
423 {
424     apr_pool_cleanup_kill(beam->red_pool, beam, beam_cleanup);
425     return beam_cleanup(beam);
426 }
427
428 apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *red_pool, 
429                             int id, const char *tag, 
430                             apr_size_t max_buf_size)
431 {
432     h2_bucket_beam *beam;
433     apr_status_t status = APR_SUCCESS;
434     
435     beam = apr_pcalloc(red_pool, sizeof(*beam));
436     if (!beam) {
437         return APR_ENOMEM;
438     }
439
440     beam->id = id;
441     beam->tag = tag;
442     H2_BLIST_INIT(&beam->red);
443     H2_BLIST_INIT(&beam->hold);
444     H2_BLIST_INIT(&beam->purge);
445     H2_BPROXY_LIST_INIT(&beam->proxies);
446     beam->red_pool = red_pool;
447     beam->max_buf_size = max_buf_size;
448
449     apr_pool_pre_cleanup_register(red_pool, beam, beam_cleanup);
450     *pbeam = beam;
451     
452     return status;
453 }
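/* A minimal setup sketch; the red_pool, mutex, cond and my_enter names are
 * hypothetical and the id/tag/size values arbitrary. Only functions defined
 * in this file are used:
 *
 *     h2_bucket_beam *beam;
 *     apr_status_t rv = h2_beam_create(&beam, red_pool, 1, "input", 32 * 1024);
 *     if (rv == APR_SUCCESS) {
 *         h2_beam_mutex_set(beam, my_enter, cond, mutex);
 *         h2_beam_timeout_set(beam, apr_time_from_sec(30));
 *     }
 *
 * The beam registers a pre-cleanup on red_pool, so h2_beam_destroy() is only
 * needed when the beam must go away before that pool does. */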
454
455 void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
456 {
457     h2_beam_lock bl;
458     
459     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
460         beam->max_buf_size = buffer_size;
461         leave_yellow(beam, &bl);
462     }
463 }
464
465 apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
466 {
467     h2_beam_lock bl;
468     apr_size_t buffer_size = 0;
469     
470     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
471         buffer_size = beam->max_buf_size;
472         leave_yellow(beam, &bl);
473     }
474     return buffer_size;
475 }
476
477 void h2_beam_mutex_set(h2_bucket_beam *beam, 
478                        h2_beam_mutex_enter m_enter,
479                        apr_thread_cond_t *cond,
480                        void *m_ctx)
481 {
482     h2_beam_lock bl;
483     
484     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
485         beam->m_enter = m_enter;
486         beam->m_ctx   = m_ctx;
487         beam->m_cond  = cond;
488         leave_yellow(beam, &bl);
489     }
490 }
491
492 void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
493 {
494     h2_beam_lock bl;
495     
496     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
497         beam->timeout = timeout;
498         leave_yellow(beam, &bl);
499     }
500 }
501
502 apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam)
503 {
504     h2_beam_lock bl;
505     apr_interval_time_t timeout = 0;
506     
507     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
508         timeout = beam->timeout;
509         leave_yellow(beam, &bl);
510     }
511     return timeout;
512 }
513
514 void h2_beam_abort(h2_bucket_beam *beam)
515 {
516     h2_beam_lock bl;
517     
518     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
519         r_purge_reds(beam);
520         h2_blist_cleanup(&beam->red);
521         beam->aborted = 1;
522         report_consumption(beam, 0);
523         if (beam->m_cond) {
524             apr_thread_cond_broadcast(beam->m_cond);
525         }
526         leave_yellow(beam, &bl);
527     }
528 }
529
530 apr_status_t h2_beam_close(h2_bucket_beam *beam)
531 {
532     h2_beam_lock bl;
533     
534     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
535         r_purge_reds(beam);
536         beam_close(beam);
537         report_consumption(beam, 0);
538         leave_yellow(beam, &bl);
539     }
540     return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
541 }
542
543 apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block,
544                               int clear_buffers)
545 {
546     apr_status_t status;
547     h2_beam_lock bl;
548     
549     if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) {
550         if (clear_buffers) {
551             r_purge_reds(beam);
552             h2_blist_cleanup(&beam->red);
553             if (!bl.mutex && beam->green) {
554                 /* not protected, may process green in red call */
555                 apr_brigade_destroy(beam->green);
556                 beam->green = NULL;
557             }
558         }
559         beam_close(beam);
560         
561         while (status == APR_SUCCESS 
562                && (!H2_BPROXY_LIST_EMPTY(&beam->proxies)
563                    || (beam->green && !APR_BRIGADE_EMPTY(beam->green)))) {
564             if (block == APR_NONBLOCK_READ || !bl.mutex) {
565                 status = APR_EAGAIN;
566                 break;
567             }
568             if (beam->m_cond) {
569                 apr_thread_cond_broadcast(beam->m_cond);
570             }
571             status = wait_cond(beam, bl.mutex);
572         }
573         leave_yellow(beam, &bl);
574     }
575     return status;
576 }
577
578 static apr_status_t append_bucket(h2_bucket_beam *beam, 
579                                   apr_bucket *bred,
580                                   apr_read_type_e block,
581                                   apr_pool_t *pool,
582                                   h2_beam_lock *pbl)
583 {
584     const char *data;
585     apr_size_t len;
586     apr_off_t space_left = 0;
587     apr_status_t status;
588     
589     if (APR_BUCKET_IS_METADATA(bred)) {
590         if (APR_BUCKET_IS_EOS(bred)) {
591             beam->closed = 1;
592         }
593         APR_BUCKET_REMOVE(bred);
594         H2_BLIST_INSERT_TAIL(&beam->red, bred);
595         return APR_SUCCESS;
596     }
597     else if (APR_BUCKET_IS_FILE(bred)) {
598         /* file bucket lengths do not really count */
599     }
600     else {
601         space_left = calc_space_left(beam);
602         if (space_left > 0 && bred->length == ((apr_size_t)-1)) {
603             const char *data;
604             status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
605             if (status != APR_SUCCESS) {
606                 return status;
607             }
608         }
609         
610         if (space_left < bred->length) {
611             status = r_wait_space(beam, block, pbl, &space_left);
612             if (status != APR_SUCCESS) {
613                 return status;
614             }
615             if (space_left <= 0) {
616                 return APR_EAGAIN;
617             }
618         }
619         /* space available, maybe need bucket split */
620     }
621     
622
623     /* The fundamental problem is that reading a red bucket from
624      * a green thread is a total NO GO, because the bucket might then use
625      * its pool/bucket_alloc from a foreign thread, and that will
626      * corrupt them. */
627     status = APR_ENOTIMPL;
628     if (beam->closed && bred->length > 0) {
629         status = APR_EOF;
630     }
631     else if (APR_BUCKET_IS_TRANSIENT(bred)) {
632         /* this takes care of transient buckets and converts them
633          * into heap ones. Other bucket types might or might not be
634          * affected by this. */
635         status = apr_bucket_setaside(bred, pool);
636     }
637     else if (APR_BUCKET_IS_HEAP(bred)) {
638         /* For heap buckets read from a green thread is fine. The
639          * data will be there and live until the bucket itself is
640          * destroyed. */
641         status = APR_SUCCESS;
642     }
643     else if (APR_BUCKET_IS_POOL(bred)) {
644         /* pool buckets are bastards that register at pool cleanup
645          * to morph themselves into heap buckets. That may happen anytime,
646          * even after the bucket data pointer has been read. So at
647          * any time inside the green thread, the pool bucket memory
648          * may disappear. yikes. */
649         status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
650         if (status == APR_SUCCESS) {
651             apr_bucket_heap_make(bred, data, len, NULL);
652         }
653     }
654     else if (APR_BUCKET_IS_FILE(bred)) {
655         /* For file buckets the problem is their internal readpool that
656          * is used on the first read to allocate buffer/mmap.
657          * Since setting aside a file bucket will de-register the
658          * file cleanup function from the previous pool, we need to
659          * call that from a red thread. 
660          * Additionally, we allow callbacks to prevent beaming file
661          * handles across. The use case for this is to limit the number
662          * of open file handles and to fall back to a less efficient
663          * beam transport instead. */
664         apr_file_t *fd = ((apr_bucket_file *)bred->data)->fd;
665         int can_beam = 1;
666         if (beam->last_beamed != fd && beam->can_beam_fn) {
667             can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
668         }
669         if (can_beam) {
670             beam->last_beamed = fd;
671             status = apr_bucket_setaside(bred, pool);
672         }
673         /* else: enter ENOTIMPL case below */
674     }
675     
676     if (status == APR_ENOTIMPL) {
677         /* we have no knowledge about the internals of this bucket,
678          * but hope that, once read, its data stays immutable for the
679          * lifetime of the bucket (see the pool bucket handling above for
680          * a counterexample).
681          * We do the read while in a red thread, so that the bucket may
682          * use pools/allocators safely. */
683         if (space_left < APR_BUCKET_BUFF_SIZE) {
684             space_left = APR_BUCKET_BUFF_SIZE;
685         }
686         if (space_left < bred->length) {
687             apr_bucket_split(bred, space_left);
688         }
689         status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
690         if (status == APR_SUCCESS) {
691             status = apr_bucket_setaside(bred, pool);
692         }
693     }
694     
695     if (status != APR_SUCCESS && status != APR_ENOTIMPL) {
696         return status;
697     }
698     
699     APR_BUCKET_REMOVE(bred);
700     H2_BLIST_INSERT_TAIL(&beam->red, bred);
701     beam->sent_bytes += bred->length;
702     
703     return APR_SUCCESS;
704 }
705
706 apr_status_t h2_beam_send(h2_bucket_beam *beam, 
707                           apr_bucket_brigade *red_brigade, 
708                           apr_read_type_e block)
709 {
710     apr_bucket *bred;
711     apr_status_t status = APR_SUCCESS;
712     h2_beam_lock bl;
713
714     /* Called from the red thread to add buckets to the beam */
715     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
716         r_purge_reds(beam);
717         
718         if (beam->aborted) {
719             status = APR_ECONNABORTED;
720         }
721         else if (red_brigade) {
722             int force_report = !APR_BRIGADE_EMPTY(red_brigade); 
723             while (!APR_BRIGADE_EMPTY(red_brigade)
724                    && status == APR_SUCCESS) {
725                 bred = APR_BRIGADE_FIRST(red_brigade);
726                 status = append_bucket(beam, bred, block, beam->red_pool, &bl);
727             }
728             report_production(beam, force_report);
729             if (beam->m_cond) {
730                 apr_thread_cond_broadcast(beam->m_cond);
731             }
732         }
733         report_consumption(beam, 0);
734         leave_yellow(beam, &bl);
735     }
736     return status;
737 }
738
739 apr_status_t h2_beam_receive(h2_bucket_beam *beam, 
740                              apr_bucket_brigade *bb, 
741                              apr_read_type_e block,
742                              apr_off_t readbytes)
743 {
744     h2_beam_lock bl;
745     apr_bucket *bred, *bgreen, *ng;
746     int transferred = 0;
747     apr_status_t status = APR_SUCCESS;
748     apr_off_t remain = readbytes;
749     
750     /* Called from the green thread to take buckets from the beam */
751     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
752 transfer:
753         if (beam->aborted) {
754             if (beam->green && !APR_BRIGADE_EMPTY(beam->green)) {
755                 apr_brigade_cleanup(beam->green);
756             }
757             status = APR_ECONNABORTED;
758             goto leave;
759         }
760
761         /* transfer enough buckets from our green brigade, if we have one */
762         while (beam->green
763                && !APR_BRIGADE_EMPTY(beam->green)
764                && (readbytes <= 0 || remain >= 0)) {
765             bgreen = APR_BRIGADE_FIRST(beam->green);
766             if (readbytes > 0 && bgreen->length > 0 && remain <= 0) {
767                 break;
768             }            
769             APR_BUCKET_REMOVE(bgreen);
770             APR_BRIGADE_INSERT_TAIL(bb, bgreen);
771             remain -= bgreen->length;
772             ++transferred;
773         }
774
775         /* transfer from our red brigade, transforming red buckets to
776          * green ones until we have enough */
777         while (!H2_BLIST_EMPTY(&beam->red) && (readbytes <= 0 || remain >= 0)) {
778             bred = H2_BLIST_FIRST(&beam->red);
779             bgreen = NULL;
780             
781             if (readbytes > 0 && bred->length > 0 && remain <= 0) {
782                 break;
783             }
784                         
785             if (APR_BUCKET_IS_METADATA(bred)) {
786                 if (APR_BUCKET_IS_EOS(bred)) {
787                     bgreen = apr_bucket_eos_create(bb->bucket_alloc);
788                     beam->close_sent = 1;
789                 }
790                 else if (APR_BUCKET_IS_FLUSH(bred)) {
791                     bgreen = apr_bucket_flush_create(bb->bucket_alloc);
792                 }
793                 else {
794                     /* put red into hold, no green sent out */
795                 }
796             }
797             else if (APR_BUCKET_IS_FILE(bred)) {
798                 /* This is set aside into the target brigade pool so that 
799                  * any read operation messes with that pool and not 
800                  * the red one. */
801                 apr_bucket_file *f = (apr_bucket_file *)bred->data;
802                 apr_file_t *fd = f->fd;
803                 int setaside = (f->readpool != bb->p);
804                 
805                 if (setaside) {
806                     status = apr_file_setaside(&fd, fd, bb->p);
807                     if (status != APR_SUCCESS) {
808                         goto leave;
809                     }
810                     ++beam->files_beamed;
811                 }
812                 ng = apr_brigade_insert_file(bb, fd, bred->start, bred->length, 
813                                              bb->p);
814 #if APR_HAS_MMAP
815                 /* disable mmap handling as this leads to segfaults when
816                  * the underlying file is changed while a memory pointer to it
817                  * has been handed out. See also PR 59348 */
818                 apr_bucket_file_enable_mmap(ng, 0);
819 #endif
820                 remain -= bred->length;
821                 ++transferred;
822                 APR_BUCKET_REMOVE(bred);
823                 H2_BLIST_INSERT_TAIL(&beam->hold, bred);
824                 ++transferred;
825                 continue;
826             }
827             else {
828                 /* create a "green" stand-in bucket. we took care of the
829                  * underlying red bucket and its data when we placed it into
830                  * the red brigade.
831                  * the beam bucket will notify us on destruction that bred is
832                  * no longer needed. */
833                 bgreen = h2_beam_bucket_create(beam, bred, bb->bucket_alloc,
834                                                beam->buckets_sent++);
835             }
836             
837             /* Place the red bucket into our hold, to be destroyed when no
838              * green bucket references it any more. */
839             APR_BUCKET_REMOVE(bred);
840             H2_BLIST_INSERT_TAIL(&beam->hold, bred);
841             beam->received_bytes += bred->length;
842             if (bgreen) {
843                 APR_BRIGADE_INSERT_TAIL(bb, bgreen);
844                 remain -= bgreen->length;
845                 ++transferred;
846             }
847         }
848
849         if (readbytes > 0 && remain < 0) {
850             /* too much, put some back */
851             remain = readbytes;
852             for (bgreen = APR_BRIGADE_FIRST(bb);
853                  bgreen != APR_BRIGADE_SENTINEL(bb);
854                  bgreen = APR_BUCKET_NEXT(bgreen)) {
855                  remain -= bgreen->length;
856                  if (remain < 0) {
857                      apr_bucket_split(bgreen, bgreen->length+remain);
858                      beam->green = apr_brigade_split_ex(bb, 
859                                                         APR_BUCKET_NEXT(bgreen), 
860                                                         beam->green);
861                      break;
862                  }
863             }
864         }
865
866         if (beam->closed 
867             && (!beam->green || APR_BRIGADE_EMPTY(beam->green))
868             && H2_BLIST_EMPTY(&beam->red)) {
869             /* beam is closed and we have nothing more to receive */ 
870             if (!beam->close_sent) {
871                 apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
872                 APR_BRIGADE_INSERT_TAIL(bb, b);
873                 beam->close_sent = 1;
874                 ++transferred;
875                 status = APR_SUCCESS;
876             }
877         }
878         
879         if (transferred) {
880             status = APR_SUCCESS;
881         }
882         else if (beam->closed) {
883             status = APR_EOF;
884         }
885         else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
886             status = wait_cond(beam, bl.mutex);
887             if (status != APR_SUCCESS) {
888                 goto leave;
889             }
890             goto transfer;
891         }
892         else {
893             status = APR_EAGAIN;
894         }
895 leave:        
896         leave_yellow(beam, &bl);
897     }
898     return status;
899 }
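/* A sketch of the typical transfer, assuming one red (sending) and one green
 * (receiving) thread; the brigade names are hypothetical.
 *
 * red thread, blocks via r_wait_space() while the beam buffer is full:
 *
 *     rv = h2_beam_send(beam, red_bb, APR_BLOCK_READ);
 *
 * green thread, pulls up to 16 KiB and blocks until data or EOS arrives;
 * APR_EOF is returned once the beam is closed and fully drained:
 *
 *     rv = h2_beam_receive(beam, green_bb, APR_BLOCK_READ, 16 * 1024);
 */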
900
901 void h2_beam_on_consumed(h2_bucket_beam *beam, 
902                          h2_beam_io_callback *cb, void *ctx)
903 {
904     h2_beam_lock bl;
905     
906     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
907         beam->consumed_fn = cb;
908         beam->consumed_ctx = ctx;
909         leave_yellow(beam, &bl);
910     }
911 }
912
913 void h2_beam_on_produced(h2_bucket_beam *beam, 
914                          h2_beam_io_callback *cb, void *ctx)
915 {
916     h2_beam_lock bl;
917     
918     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
919         beam->produced_fn = cb;
920         beam->produced_ctx = ctx;
921         leave_yellow(beam, &bl);
922     }
923 }
924
925 void h2_beam_on_file_beam(h2_bucket_beam *beam, 
926                           h2_beam_can_beam_callback *cb, void *ctx)
927 {
928     h2_beam_lock bl;
929     
930     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
931         beam->can_beam_fn = cb;
932         beam->can_beam_ctx = ctx;
933         leave_yellow(beam, &bl);
934     }
935 }
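/* A sketch of a file-beaming callback that caps how many file handles are
 * passed across threads; the my_files_ctx type and its fields are
 * hypothetical. h2_beam_no_files() below is the ready-made "never beam file
 * handles" variant.
 *
 *     static int my_can_beam(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
 *     {
 *         my_files_ctx *fc = ctx;
 *         return fc->files_in_use < fc->files_max;
 *     }
 *
 *     h2_beam_on_file_beam(beam, my_can_beam, my_files);
 */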
936
937
938 apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
939 {
940     apr_bucket *b;
941     apr_off_t l = 0;
942     h2_beam_lock bl;
943     
944     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
945         for (b = H2_BLIST_FIRST(&beam->red); 
946             b != H2_BLIST_SENTINEL(&beam->red);
947             b = APR_BUCKET_NEXT(b)) {
948             /* should all have determinate length */
949             l += b->length;
950         }
951         leave_yellow(beam, &bl);
952     }
953     return l;
954 }
955
956 apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
957 {
958     apr_bucket *b;
959     apr_off_t l = 0;
960     h2_beam_lock bl;
961     
962     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
963         for (b = H2_BLIST_FIRST(&beam->red); 
964             b != H2_BLIST_SENTINEL(&beam->red);
965             b = APR_BUCKET_NEXT(b)) {
966             if (APR_BUCKET_IS_FILE(b)) {
967                 /* do not count */
968             }
969             else {
970                 /* should all have determinate length */
971                 l += b->length;
972             }
973         }
974         leave_yellow(beam, &bl);
975     }
976     return l;
977 }
978
979 int h2_beam_empty(h2_bucket_beam *beam)
980 {
981     int empty = 1;
982     h2_beam_lock bl;
983     
984     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
985         empty = (H2_BLIST_EMPTY(&beam->red) 
986                  && (!beam->green || APR_BRIGADE_EMPTY(beam->green)));
987         leave_yellow(beam, &bl);
988     }
989     return empty;
990 }
991
992 int h2_beam_holds_proxies(h2_bucket_beam *beam)
993 {
994     int has_proxies = 1;
995     h2_beam_lock bl;
996     
997     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
998         has_proxies = !H2_BPROXY_LIST_EMPTY(&beam->proxies);
999         leave_yellow(beam, &bl);
1000     }
1001     return has_proxies;
1002 }
1003
1004 int h2_beam_closed(h2_bucket_beam *beam)
1005 {
1006     return beam->closed;
1007 }
1008
1009 int h2_beam_was_received(h2_bucket_beam *beam)
1010 {
1011     int happened = 0;
1012     h2_beam_lock bl;
1013     
1014     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
1015         happened = (beam->received_bytes > 0);
1016         leave_yellow(beam, &bl);
1017     }
1018     return happened;
1019 }
1020
1021 apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam)
1022 {
1023     apr_size_t n = 0;
1024     h2_beam_lock bl;
1025     
1026     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
1027         n = beam->files_beamed;
1028         leave_yellow(beam, &bl);
1029     }
1030     return n;
1031 }
1032
1033 int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
1034 {
1035     return 0;
1036 }
1037