]> granicus.if.org Git - apache/blob - modules/http2/h2_bucket_beam.h
f260762366e3a46fb54b6f6ac9e46009b1e019dd
[apache] / modules / http2 / h2_bucket_beam.h
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef h2_bucket_beam_h
18 #define h2_bucket_beam_h
19
20 struct apr_thread_mutex_t;
21 struct apr_thread_cond_t;
22
23 /*******************************************************************************
24  * apr_bucket list without bells and whistles
25  ******************************************************************************/
26  
27 /**
28  * h2_blist can hold a list of buckets just like apr_bucket_brigade, but
29  * does not to any allocations or related features.
30  */
31 typedef struct {
32     APR_RING_HEAD(h2_bucket_list, apr_bucket) list;
33 } h2_blist;
34
35 #define H2_BLIST_INIT(b)        APR_RING_INIT(&(b)->list, apr_bucket, link);
36 #define H2_BLIST_SENTINEL(b)    APR_RING_SENTINEL(&(b)->list, apr_bucket, link)
37 #define H2_BLIST_EMPTY(b)       APR_RING_EMPTY(&(b)->list, apr_bucket, link)
38 #define H2_BLIST_FIRST(b)       APR_RING_FIRST(&(b)->list)
39 #define H2_BLIST_LAST(b)        APR_RING_LAST(&(b)->list)
40 #define H2_BLIST_INSERT_HEAD(b, e) do {                         \
41         apr_bucket *ap__b = (e);                                        \
42         APR_RING_INSERT_HEAD(&(b)->list, ap__b, apr_bucket, link);      \
43     } while (0)
44 #define H2_BLIST_INSERT_TAIL(b, e) do {                         \
45         apr_bucket *ap__b = (e);                                        \
46         APR_RING_INSERT_TAIL(&(b)->list, ap__b, apr_bucket, link);      \
47     } while (0)
48 #define H2_BLIST_CONCAT(a, b) do {                                      \
49         APR_RING_CONCAT(&(a)->list, &(b)->list, apr_bucket, link);      \
50     } while (0)
51 #define H2_BLIST_PREPEND(a, b) do {                                     \
52         APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link);     \
53     } while (0)
54
55 /*******************************************************************************
56  * h2_bucket_beam
57  ******************************************************************************/
58
59 /**
60  * A h2_bucket_beam solves the task of transferring buckets, esp. their data,
61  * across threads with zero buffer copies.
62  *
63  * When a thread, let's call it the sender thread, wants to send buckets to
64  * another, the green thread, it creates a h2_bucket_beam and adds buckets
65  * via the h2_beam_send(). It gives the beam to the green thread which then
66  * can receive buckets into its own brigade via h2_beam_receive().
67  *
68  * Sending and receiving can happen concurrently.
69  *
70  * The beam can limit the amount of data it accepts via the buffer_size. This
71  * can also be adjusted during its lifetime. Sends and receives can be done blocking. 
72  * A timeout can be set for such blocks.
73  *
74  * Care needs to be taken when terminating the beam. The beam registers at
75  * the pool it was created with and will cleanup after itself. However, if
76  * received buckets do still exist, already freed memory might be accessed.
77  * The beam does a assertion on this condition.
78  * 
79  * The proper way of shutting down a beam is to first make sure there are no
80  * more green buckets out there, then cleanup the beam to purge eventually
81  * still existing sender buckets and then, possibly, terminate the beam itself
82  * (or the pool it was created with).
83  *
84  * The following restrictions apply to bucket transport:
85  * - only EOS and FLUSH meta buckets are copied through. All other meta buckets
86  *   are kept in the beams hold.
87  * - all kind of data buckets are transported through:
88  *   - transient buckets are converted to heap ones on send
89  *   - heap and pool buckets require no extra handling
90  *   - buckets with indeterminate length are read on send
91  *   - file buckets will transfer the file itself into a new bucket, if allowed
92  *   - all other buckets are read on send to make sure data is present
93  *
94  * This assures that when the sender thread sends its sender buckets, the data
95  * is made accessible while still on the sender side. The sender bucket then enters
96  * the beams hold storage.
97  * When the green thread calls receive, sender buckets in the hold are wrapped
98  * into special beam buckets. Beam buckets on read present the data directly
99  * from the internal sender one, but otherwise live on the green side. When a
100  * beam bucket gets destroyed, it notifies its beam that the corresponding
101  * sender bucket from the hold may be destroyed.
102  * Since the destruction of green buckets happens in the green thread, any
103  * corresponding sender bucket can not immediately be destroyed, as that would
104  * result in race conditions.
105  * Instead, the beam transfers such sender buckets from the hold to the purge
106  * storage. Next time there is a call from the sender side, the buckets in
107  * purge will be deleted.
108  *
109  * There are callbacks that can be registesender with a beam:
110  * - a "consumed" callback that gets called on the sender side with the
111  *   amount of data that has been received by the green side. The amount
112  *   is a delta from the last callback invocation. The sender side can trigger
113  *   these callbacks by calling h2_beam_send() with a NULL brigade.
114  * - a "can_beam_file" callback that can prohibit the transfer of file handles
115  *   through the beam. This will cause file buckets to be read on send and
116  *   its data buffer will then be transports just like a heap bucket would.
117  *   When no callback is registered, no restrictions apply and all files are
118  *   passed through.
119  *   File handles transfersender to the green side will stay there until the
120  *   receiving brigade's pool is destroyed/cleared. If the pool lives very
121  *   long or if many different files are beamed, the process might run out
122  *   of available file handles.
123  *
124  * The name "beam" of course is inspired by good old transporter
125  * technology where humans are kept inside the transporter's memory
126  * buffers until the transmission is complete. Star gates use a similar trick.
127  */
128
129 typedef void h2_beam_mutex_leave(void *ctx,  struct apr_thread_mutex_t *lock);
130
131 typedef struct {
132     apr_thread_mutex_t *mutex;
133     h2_beam_mutex_leave *leave;
134     void *leave_ctx;
135 } h2_beam_lock;
136
137 typedef struct h2_bucket_beam h2_bucket_beam;
138
139 typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl);
140
141 typedef void h2_beam_io_callback(void *ctx, h2_bucket_beam *beam,
142                                  apr_off_t bytes);
143 typedef void h2_beam_ev_callback(void *ctx, h2_bucket_beam *beam);
144
145 typedef struct h2_beam_proxy h2_beam_proxy;
146 typedef struct {
147     APR_RING_HEAD(h2_beam_proxy_list, h2_beam_proxy) list;
148 } h2_bproxy_list;
149
150 typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam,
151                                       apr_file_t *file);
152
153 typedef enum {
154     H2_BEAM_OWNER_SEND,
155     H2_BEAM_OWNER_RECV
156 } h2_beam_owner_t;
157
158 /**
159  * Will deny all transfer of apr_file_t across the beam and force
160  * a data copy instead.
161  */
162 int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file);
163
164 struct h2_bucket_beam {
165     int id;
166     const char *tag;
167     apr_pool_t *pool;
168     h2_beam_owner_t owner;
169     h2_blist send_list;
170     h2_blist hold_list;
171     h2_blist purge_list;
172     apr_bucket_brigade *recv_buffer;
173     h2_bproxy_list proxies;
174     apr_pool_t *send_pool;
175     apr_pool_t *recv_pool;
176     
177     apr_size_t max_buf_size;
178     apr_interval_time_t timeout;
179
180     apr_off_t sent_bytes;     /* amount of bytes send */
181     apr_off_t received_bytes; /* amount of bytes received */
182
183     apr_size_t buckets_sent;  /* # of beam buckets sent */
184     apr_size_t files_beamed;  /* how many file handles have been set aside */
185     
186     unsigned int aborted : 1;
187     unsigned int closed : 1;
188     unsigned int close_sent : 1;
189     unsigned int tx_mem_limits : 1; /* only memory size counts on transfers */
190
191     struct apr_thread_mutex_t *lock;
192     struct apr_thread_cond_t *change;
193     
194     apr_off_t cons_bytes_reported;    /* amount of bytes reported as consumed */
195     h2_beam_ev_callback *cons_ev_cb;
196     h2_beam_io_callback *cons_io_cb;
197     void *cons_ctx;
198
199     apr_off_t prod_bytes_reported;    /* amount of bytes reported as produced */
200     h2_beam_io_callback *prod_io_cb;
201     void *prod_ctx;
202
203     h2_beam_can_beam_callback *can_beam_fn;
204     void *can_beam_ctx;
205 };
206
207 /**
208  * Creates a new bucket beam for transfer of buckets across threads.
209  *
210  * The pool the beam is created with will be protected by the given 
211  * mutex and will be used in multiple threads. It needs a pool allocator
212  * that is only used inside that same mutex.
213  *
214  * @param pbeam         will hold the created beam on return
215  * @param pool          pool owning the beam, beam will cleanup when pool released
216  * @param id            identifier of the beam
217  * @param tag           tag identifying beam for logging
218  * @param owner         if the beam is owned by the sender or receiver, e.g. if
219  *                      the pool owner is using this beam for sending or receiving
220  * @param buffer_size   maximum memory footprint of buckets buffered in beam, or
221  *                      0 for no limitation
222  * @param timeout       timeout for blocking operations
223  */
224 apr_status_t h2_beam_create(h2_bucket_beam **pbeam,
225                             apr_pool_t *pool, 
226                             int id, const char *tag,
227                             h2_beam_owner_t owner,  
228                             apr_size_t buffer_size,
229                             apr_interval_time_t timeout);
230
231 /**
232  * Destroys the beam immediately without cleanup.
233  */ 
234 apr_status_t h2_beam_destroy(h2_bucket_beam *beam);
235
236 /**
237  * Send buckets from the given brigade through the beam. Will hold buckets 
238  * internally as long as they have not been processed by the receiving side.
239  * All accepted buckets are removed from the given brigade. Will return with
240  * APR_EAGAIN on non-blocking sends when not all buckets could be accepted.
241  * 
242  * Call from the sender side only.
243  */
244 apr_status_t h2_beam_send(h2_bucket_beam *beam,  
245                           apr_bucket_brigade *bb, 
246                           apr_read_type_e block);
247
248 /**
249  * Register the pool from which future buckets are send. This defines
250  * the lifetime of the buckets, e.g. the pool should not be cleared/destroyed
251  * until the data is no longer needed (or has been received).
252  */
253 void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p);
254
255 /**
256  * Receive buckets from the beam into the given brigade. Will return APR_EOF
257  * when reading past an EOS bucket. Reads can be blocking until data is 
258  * available or the beam has been closed. Non-blocking calls return APR_EAGAIN
259  * if no data is available.
260  *
261  * Call from the receiver side only.
262  */
263 apr_status_t h2_beam_receive(h2_bucket_beam *beam, 
264                              apr_bucket_brigade *green_buckets, 
265                              apr_read_type_e block,
266                              apr_off_t readbytes);
267
268 /**
269  * Determine if beam is empty. 
270  */
271 int h2_beam_empty(h2_bucket_beam *beam);
272
273 /**
274  * Determine if beam has handed out proxy buckets that are not destroyed. 
275  */
276 int h2_beam_holds_proxies(h2_bucket_beam *beam);
277
278 /**
279  * Abort the beam. Will cleanup any buffered buckets and answer all send
280  * and receives with APR_ECONNABORTED.
281  * 
282  * Call from the sender side only.
283  */
284 void h2_beam_abort(h2_bucket_beam *beam);
285
286 /**
287  * Close the beam. Sending an EOS bucket serves the same purpose. 
288  * 
289  * Call from the sender side only.
290  */
291 apr_status_t h2_beam_close(h2_bucket_beam *beam);
292
293 /**
294  * Receives leaves the beam, e.g. will no longer read. This will
295  * interrupt any sender blocked writing and fail future send. 
296  * 
297  * Call from the receiver side only.
298  */
299 apr_status_t h2_beam_leave(h2_bucket_beam *beam);
300
301 int h2_beam_is_closed(h2_bucket_beam *beam);
302
303 /**
304  * Return APR_SUCCESS when all buckets in transit have been handled. 
305  * When called with APR_BLOCK_READ and a mutex set, will wait until the green
306  * side has consumed all data. Otherwise APR_EAGAIN is returned.
307  * With clear_buffers set, any queued data is discarded.
308  * If a timeout is set on the beam, waiting might also time out and
309  * return APR_ETIMEUP.
310  *
311  * Call from the sender side only.
312  */
313 apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block);
314
315 /** 
316  * Set/get the timeout for blocking read/write operations. Only works
317  * if a mutex has been set for the beam.
318  */
319 void h2_beam_timeout_set(h2_bucket_beam *beam, 
320                          apr_interval_time_t timeout);
321 apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam);
322
323 /**
324  * Set/get the maximum buffer size for beam data (memory footprint).
325  */
326 void h2_beam_buffer_size_set(h2_bucket_beam *beam, 
327                              apr_size_t buffer_size);
328 apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam);
329
330 /**
331  * Register a callback to be invoked on the sender side with the
332  * amount of bytes that have been consumed by the receiver, since the
333  * last callback invocation or reset.
334  * @param beam the beam to set the callback on
335  * @param ev_cb the callback or NULL, called when bytes are consumed
336  * @param io_cb the callback or NULL, called on sender with bytes consumed
337  * @param ctx  the context to use in callback invocation
338  * 
339  * Call from the sender side, io callbacks invoked on sender side, ev callback
340  * from any side.
341  */
342 void h2_beam_on_consumed(h2_bucket_beam *beam, 
343                          h2_beam_ev_callback *ev_cb,
344                          h2_beam_io_callback *io_cb, void *ctx);
345
346 /**
347  * Call any registered consumed handler, if any changes have happened
348  * since the last invocation. 
349  * @return !=0 iff a handler has been called
350  *
351  * Needs to be invoked from the sending side.
352  */
353 int h2_beam_report_consumption(h2_bucket_beam *beam);
354
355 /**
356  * Register a callback to be invoked on the receiver side with the
357  * amount of bytes that have been produces by the sender, since the
358  * last callback invocation or reset.
359  * @param beam the beam to set the callback on
360  * @param io_cb the callback or NULL, called on receiver with bytes produced
361  * @param ctx  the context to use in callback invocation
362  * 
363  * Call from the receiver side, callbacks invoked on either side.
364  */
365 void h2_beam_on_produced(h2_bucket_beam *beam, 
366                          h2_beam_io_callback *io_cb, void *ctx);
367
368 /**
369  * Register a callback that may prevent a file from being beam as
370  * file handle, forcing the file content to be copied. Then no callback
371  * is set (NULL), file handles are transferred directly.
372  * @param beam the beam to set the callback on
373  * @param io_cb the callback or NULL, called on receiver with bytes produced
374  * @param ctx  the context to use in callback invocation
375  * 
376  * Call from the receiver side, callbacks invoked on either side.
377  */
378 void h2_beam_on_file_beam(h2_bucket_beam *beam, 
379                           h2_beam_can_beam_callback *cb, void *ctx);
380
381 /**
382  * Get the amount of bytes currently buffered in the beam (unread).
383  */
384 apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam);
385
386 /**
387  * Get the memory used by the buffered buckets, approximately.
388  */
389 apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam);
390
391 /**
392  * Return != 0 iff (some) data from the beam has been received.
393  */
394 int h2_beam_was_received(h2_bucket_beam *beam);
395
396 apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam);
397
398 typedef apr_bucket *h2_bucket_beamer(h2_bucket_beam *beam, 
399                                      apr_bucket_brigade *dest,
400                                      const apr_bucket *src);
401
402 void h2_register_bucket_beamer(h2_bucket_beamer *beamer);
403
404 void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg);
405
406 #endif /* h2_bucket_beam_h */