]> granicus.if.org Git - apache/blob - modules/http2/h2_stream.c
On the 2.4.x branch:
[apache] / modules / http2 / h2_stream.c
1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <assert.h>
17 #include <stddef.h>
18
19 #include <apr_strings.h>
20
21 #include <httpd.h>
22 #include <http_core.h>
23 #include <http_connection.h>
24 #include <http_log.h>
25
26 #include <nghttp2/nghttp2.h>
27
28 #include "h2_private.h"
29 #include "h2.h"
30 #include "h2_bucket_beam.h"
31 #include "h2_conn.h"
32 #include "h2_config.h"
33 #include "h2_h2.h"
34 #include "h2_mplx.h"
35 #include "h2_push.h"
36 #include "h2_request.h"
37 #include "h2_headers.h"
38 #include "h2_session.h"
39 #include "h2_stream.h"
40 #include "h2_task.h"
41 #include "h2_ctx.h"
42 #include "h2_task.h"
43 #include "h2_util.h"
44
45
46 #define S_XXX (-2)
47 #define S_ERR (-1)
48 #define S_NOP (0)
49 #define S_IDL     (H2_SS_IDL + 1)
50 #define S_RS_L    (H2_SS_RSVD_L + 1)
51 #define S_RS_R    (H2_SS_RSVD_R + 1)
52 #define S_OPEN    (H2_SS_OPEN + 1)
53 #define S_CL_L    (H2_SS_CLOSED_L + 1)
54 #define S_CL_R    (H2_SS_CLOSED_R + 1)
55 #define S_CLS     (H2_SS_CLOSED + 1)
56 #define S_CLN     (H2_SS_CLEANUP + 1)
57
58 static const char *h2_ss_str(h2_stream_state_t state)
59 {
60     switch (state) {
61         case H2_SS_IDLE:
62             return "IDLE";
63         case H2_SS_RSVD_L:
64             return "RESERVED_LOCAL";
65         case H2_SS_RSVD_R:
66             return "RESERVED_REMOTE";
67         case H2_SS_OPEN:
68             return "OPEN";
69         case H2_SS_CLOSED_L:
70             return "HALF_CLOSED_LOCAL";
71         case H2_SS_CLOSED_R:
72             return "HALF_CLOSED_REMOTE";
73         case H2_SS_CLOSED:
74             return "CLOSED";
75         case H2_SS_CLEANUP:
76             return "CLEANUP";
77         default:
78             return "UNKNOWN";
79     }
80 }
81
82 const char *h2_stream_state_str(h2_stream *stream) 
83 {
84     return h2_ss_str(stream->state);
85 }
86
87 static int trans_on_send[][H2_SS_MAX] = {
88 /*                    S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS,  S_CLN, */        
89 /* DATA,         */ { S_ERR, S_ERR,  S_ERR,  S_NOP,  S_NOP,  S_ERR,  S_NOP,  S_NOP, },
90 /* HEADERS,      */ { S_ERR, S_ERR,  S_CL_R, S_NOP,  S_NOP,  S_ERR,  S_NOP,  S_NOP, },
91 /* PRIORITY,     */ { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },   
92 /* RST_STREAM,   */ { S_CLS, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, },
93 /* SETTINGS,     */ { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },
94 /* PUSH_PROMISE, */ { S_RS_L,S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, }, 
95 /* PING,         */ { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },
96 /* GOAWAY,       */ { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },
97 /* WINDOW_UPDATE,*/ { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },
98 /* CONT          */ { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },
99 };
100 static int trans_on_recv[][H2_SS_MAX] = {
101 /*                    S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS,  S_CLN, */        
102 /* DATA,         */ { S_ERR, S_ERR,  S_ERR,  S_NOP,  S_ERR,  S_NOP,  S_NOP,  S_NOP, },
103 /* HEADERS,      */ { S_OPEN,S_CL_L, S_ERR,  S_NOP,  S_ERR,  S_NOP,  S_NOP,  S_NOP, },
104 /* PRIORITY,     */ { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },   
105 /* RST_STREAM,   */ { S_ERR, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, },
106 /* SETTINGS,     */ { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },
107 /* PUSH_PROMISE, */ { S_RS_R,S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, }, 
108 /* PING,         */ { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },
109 /* GOAWAY,       */ { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },
110 /* WINDOW_UPDATE,*/ { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },
111 /* CONT          */ { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },
112 };
113 static int trans_on_event[][H2_SS_MAX] = {
114 /* H2_SEV_CLOSED_L*/{ S_XXX, S_ERR,  S_ERR,  S_CL_L, S_CLS,  S_XXX,  S_XXX,  S_XXX, },
115 /* H2_SEV_CLOSED_R*/{ S_ERR, S_ERR,  S_ERR,  S_CL_R, S_ERR,  S_CLS,  S_NOP,  S_NOP, },
116 /* H2_SEV_CANCELLED*/{S_CLS, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, },
117 /* H2_SEV_EOS_SENT*/{ S_NOP, S_XXX,  S_XXX,  S_XXX,  S_XXX,  S_CLS,  S_CLN,  S_XXX, },
118 };
119
120 static int on_map(h2_stream_state_t state, int map[H2_SS_MAX])
121 {
122     int op = map[state];
123     switch (op) {
124         case S_XXX:
125         case S_ERR:
126             return op;
127         case S_NOP:
128             return state;
129         default:
130             return op-1;
131     }
132 }
133
134 static int on_frame(h2_stream_state_t state, int frame_type, 
135                     int frame_map[][H2_SS_MAX], apr_size_t maxlen)
136 {
137     ap_assert(frame_type >= 0);
138     ap_assert(state >= 0);
139     if (frame_type >= maxlen) {
140         return state; /* NOP */
141     }
142     return on_map(state, frame_map[frame_type]);
143 }
144
145 static int on_frame_send(h2_stream_state_t state, int frame_type)
146 {
147     return on_frame(state, frame_type, trans_on_send, H2_ALEN(trans_on_send));
148 }
149
150 static int on_frame_recv(h2_stream_state_t state, int frame_type)
151 {
152     return on_frame(state, frame_type, trans_on_recv, H2_ALEN(trans_on_recv));
153 }
154
155 static int on_event(h2_stream_state_t state, h2_stream_event_t ev)
156 {
157     return on_map(state, trans_on_event[ev]);
158 }
159
160 static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
161 {
162     if (APLOG_C_IS_LEVEL(s->session->c, lvl)) {
163         conn_rec *c = s->session->c;
164         char buffer[4 * 1024];
165         apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
166         
167         len = h2_util_bb_print(buffer, bmax, tag, "", s->out_buffer);
168         ap_log_cerror(APLOG_MARK, lvl, 0, c, 
169                       H2_STRM_MSG(s, "out-buffer(%s)"), len? buffer : "empty");
170     }
171 }
172
173 static apr_status_t close_input(h2_stream *stream)
174 {
175     conn_rec *c = stream->session->c;
176     apr_status_t status;
177     apr_bucket_brigade *tmp;
178     apr_bucket *b;
179
180     if (h2_beam_is_closed(stream->input)) {
181         return APR_SUCCESS;
182     }
183     
184     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
185                   H2_STRM_MSG(stream, "closing input"));
186     if (stream->rst_error) {
187         return APR_ECONNRESET;
188     }
189     
190     tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
191     if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
192         h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers, 
193                                           NULL, stream->pool);
194         b = h2_bucket_headers_create(c->bucket_alloc, r);
195         APR_BRIGADE_INSERT_TAIL(tmp, b);
196         stream->trailers = NULL;
197         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
198                       H2_STRM_MSG(stream, "added trailers"));
199     }
200     
201     b = apr_bucket_eos_create(c->bucket_alloc);
202     APR_BRIGADE_INSERT_TAIL(tmp, b);
203     status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
204     apr_brigade_destroy(tmp);
205     h2_beam_close(stream->input);
206     return status;
207 }
208
209 static apr_status_t close_output(h2_stream *stream)
210 {
211     if (h2_beam_is_closed(stream->output)) {
212         return APR_SUCCESS;
213     }
214     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
215                   H2_STRM_MSG(stream, "closing output"));
216     return h2_beam_leave(stream->output);
217 }
218
219 static void on_state_enter(h2_stream *stream) 
220 {
221     if (stream->monitor && stream->monitor->on_state_enter) {
222         stream->monitor->on_state_enter(stream->monitor->ctx, stream);
223     }
224 }
225
226 static void on_state_event(h2_stream *stream, h2_stream_event_t ev) 
227 {
228     if (stream->monitor && stream->monitor->on_state_event) {
229         stream->monitor->on_state_event(stream->monitor->ctx, stream, ev);
230     }
231 }
232
233 static void on_state_invalid(h2_stream *stream) 
234 {
235     if (stream->monitor && stream->monitor->on_state_invalid) {
236         stream->monitor->on_state_invalid(stream->monitor->ctx, stream);
237     }
238     /* stream got an event/frame invalid in its state */
239     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
240                   H2_STRM_MSG(stream, "invalid state event")); 
241     switch (stream->state) {
242         case H2_SS_OPEN:
243         case H2_SS_RSVD_L:
244         case H2_SS_RSVD_R:
245         case H2_SS_CLOSED_L:
246         case H2_SS_CLOSED_R:
247             h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
248             break;
249         default:
250             break;
251     }
252 }
253
254 static apr_status_t transit(h2_stream *stream, int new_state)
255 {
256     if (new_state == stream->state) {
257         return APR_SUCCESS;
258     }
259     else if (new_state < 0) {
260         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, 
261                       H2_STRM_LOG(APLOGNO(03081), stream, "invalid transition"));
262         on_state_invalid(stream);
263         return APR_EINVAL;
264     }
265     
266     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, 
267                   H2_STRM_MSG(stream, "transit to [%s]"), h2_ss_str(new_state));
268     stream->state = new_state;
269     switch (new_state) {
270         case H2_SS_IDLE:
271             break;
272         case H2_SS_RSVD_L:
273             close_input(stream);
274             break;
275         case H2_SS_RSVD_R:
276             break;
277         case H2_SS_OPEN:
278             break;
279         case H2_SS_CLOSED_L:
280             close_output(stream);
281             break;
282         case H2_SS_CLOSED_R:
283             close_input(stream);
284             break;
285         case H2_SS_CLOSED:
286             close_input(stream);
287             close_output(stream);
288             if (stream->out_buffer) {
289                 apr_brigade_cleanup(stream->out_buffer);
290             }
291             break;
292         case H2_SS_CLEANUP:
293             break;
294     }
295     on_state_enter(stream);
296     return APR_SUCCESS;
297 }
298
299 void h2_stream_set_monitor(h2_stream *stream, h2_stream_monitor *monitor)
300 {
301     stream->monitor = monitor;
302 }
303
304 void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
305 {
306     int new_state;
307     
308     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
309                   H2_STRM_MSG(stream, "dispatch event %d"), ev);
310     new_state = on_event(stream->state, ev);
311     if (new_state < 0) {
312         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, 
313                       H2_STRM_LOG(APLOGNO(10002), stream, "invalid event %d"), ev);
314         on_state_invalid(stream);
315         AP_DEBUG_ASSERT(new_state > S_XXX);
316         return;
317     }
318     else if (new_state == stream->state) {
319         /* nop */
320         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
321                       H2_STRM_MSG(stream, "ignored event %d"), ev);
322         return;
323     }
324     else {
325         on_state_event(stream, ev);
326         transit(stream, new_state);
327     }
328 }
329
330 static void set_policy_for(h2_stream *stream, h2_request *r) 
331 {
332     int enabled = h2_session_push_enabled(stream->session);
333     stream->push_policy = h2_push_policy_determine(r->headers, stream->pool, 
334                                                    enabled);
335     r->serialize = h2_config_geti(stream->session->config, H2_CONF_SER_HEADERS);
336 }
337
338 apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags)
339 {
340     apr_status_t status = APR_SUCCESS;
341     int new_state, eos = 0;
342
343     new_state = on_frame_send(stream->state, ftype);
344     if (new_state < 0) {
345         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, 
346                       H2_STRM_MSG(stream, "invalid frame %d send"), ftype);
347         AP_DEBUG_ASSERT(new_state > S_XXX);
348         return transit(stream, new_state);
349     }
350     
351     switch (ftype) {
352         case NGHTTP2_DATA:
353             eos = (flags & NGHTTP2_FLAG_END_STREAM);
354             break;
355             
356         case NGHTTP2_HEADERS:
357             eos = (flags & NGHTTP2_FLAG_END_STREAM);
358             break;
359             
360         case NGHTTP2_PUSH_PROMISE:
361                 /* start pushed stream */
362                 ap_assert(stream->request == NULL);
363                 ap_assert(stream->rtmp != NULL);
364                 status = h2_request_end_headers(stream->rtmp, stream->pool, 0);
365                 if (status != APR_SUCCESS) {
366                     return status;
367                 }
368                 set_policy_for(stream, stream->rtmp);
369                 stream->request = stream->rtmp;
370                 stream->rtmp = NULL;
371             break;
372             
373         default:
374             break;
375     }
376     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, 
377                   H2_STRM_MSG(stream, "send frame %d, eos=%d"), ftype, eos);
378     status = transit(stream, new_state);
379     if (status == APR_SUCCESS && eos) {
380         status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_L));
381     }
382     return status;
383 }
384
385 apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags)
386 {
387     apr_status_t status = APR_SUCCESS;
388     int new_state, eos = 0;
389
390     new_state = on_frame_recv(stream->state, ftype);
391     if (new_state < 0) {
392         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, 
393                       H2_STRM_MSG(stream, "invalid frame %d recv"), ftype);
394         AP_DEBUG_ASSERT(new_state > S_XXX);
395         return transit(stream, new_state);
396     }
397     
398     switch (ftype) {
399         case NGHTTP2_DATA:
400             eos = (flags & NGHTTP2_FLAG_END_STREAM);
401             break;
402             
403         case NGHTTP2_HEADERS:
404             eos = (flags & NGHTTP2_FLAG_END_STREAM);
405             if (stream->state == H2_SS_OPEN) {
406                 /* trailer HEADER */
407                 if (!eos) {
408                     h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
409                 }
410             }
411             else {
412                 /* request HEADER */
413                 ap_assert(stream->request == NULL);
414                 ap_assert(stream->rtmp != NULL);
415                 status = h2_request_end_headers(stream->rtmp, stream->pool, 0);
416                 if (status != APR_SUCCESS) {
417                     return status;
418                 }
419                 set_policy_for(stream, stream->rtmp);
420                 stream->request = stream->rtmp;
421                 stream->rtmp = NULL;
422             }
423             break;
424             
425         default:
426             break;
427     }
428     status = transit(stream, new_state);
429     if (status == APR_SUCCESS && eos) {
430         status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_R));
431     }
432     return status;
433 }
434
435 apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
436                                     const uint8_t *data, size_t len)
437 {
438     h2_session *session = stream->session;
439     apr_status_t status = APR_SUCCESS;
440     apr_bucket_brigade *tmp;
441     
442     ap_assert(stream);
443     if (!stream->input) {
444         return APR_EOF;
445     }
446
447     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
448                   H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
449     
450     tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc);
451     apr_brigade_write(tmp, NULL, NULL, (const char *)data, len);
452     status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
453     apr_brigade_destroy(tmp);
454     
455     stream->in_data_frames++;
456     stream->in_data_octets += len;
457     return status;
458 }
459
460 static void prep_output(h2_stream *stream) {
461     conn_rec *c = stream->session->c;
462     if (!stream->out_buffer) {
463         stream->out_buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
464     }
465 }
466
467 h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
468                             h2_stream_monitor *monitor, int initiated_on)
469 {
470     h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
471     
472     stream->id           = id;
473     stream->initiated_on = initiated_on;
474     stream->created      = apr_time_now();
475     stream->state        = H2_SS_IDLE;
476     stream->pool         = pool;
477     stream->session      = session;
478     stream->monitor      = monitor;
479     stream->max_mem      = session->max_stream_mem;
480     
481     h2_beam_create(&stream->input, pool, id, "input", H2_BEAM_OWNER_SEND, 0);
482     h2_beam_send_from(stream->input, stream->pool);
483     h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0);
484     
485     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
486                   H2_STRM_LOG(APLOGNO(03082), stream, "created"));
487     on_state_enter(stream);
488     return stream;
489 }
490
491 void h2_stream_cleanup(h2_stream *stream)
492 {
493     apr_status_t status;
494     
495     ap_assert(stream);
496     if (stream->out_buffer) {
497         /* remove any left over output buckets that may still have
498          * references into request pools */
499         apr_brigade_cleanup(stream->out_buffer);
500     }
501     h2_beam_abort(stream->input);
502     status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
503     if (status == APR_EAGAIN) {
504         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
505                       H2_STRM_MSG(stream, "wait on input drain"));
506         status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
507         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, 
508                       H2_STRM_MSG(stream, "input drain returned"));
509     }
510 }
511
512 void h2_stream_destroy(h2_stream *stream)
513 {
514     ap_assert(stream);
515     ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c, 
516                   H2_STRM_MSG(stream, "destroy"));
517     if (stream->pool) {
518         apr_pool_destroy(stream->pool);
519         stream->pool = NULL;
520     }
521 }
522
523 apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
524 {
525     apr_pool_t *pool = stream->pool;
526     stream->pool = NULL;
527     return pool;
528 }
529
530 void h2_stream_rst(h2_stream *stream, int error_code)
531 {
532     stream->rst_error = error_code;
533     h2_beam_abort(stream->input);
534     h2_beam_leave(stream->output);
535     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
536                   H2_STRM_MSG(stream, "reset, error=%d"), error_code);
537     h2_stream_dispatch(stream, H2_SEV_CANCELLED);
538 }
539
540 apr_status_t h2_stream_set_request_rec(h2_stream *stream, 
541                                        request_rec *r, int eos)
542 {
543     h2_request *req;
544     apr_status_t status;
545
546     ap_assert(stream->request == NULL);
547     ap_assert(stream->rtmp == NULL);
548     if (stream->rst_error) {
549         return APR_ECONNRESET;
550     }
551     status = h2_request_rcreate(&req, stream->pool, r);
552     if (status == APR_SUCCESS) {
553         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, 
554                       H2_STRM_LOG(APLOGNO(03058), stream, 
555                       "set_request_rec %s host=%s://%s%s"),
556                       req->method, req->scheme, req->authority, req->path);
557         stream->rtmp = req;
558         /* simulate the frames that led to this */
559         return h2_stream_recv_frame(stream, NGHTTP2_HEADERS, 
560                                     NGHTTP2_FLAG_END_STREAM);
561     }
562     return status;
563 }
564
565 void h2_stream_set_request(h2_stream *stream, const h2_request *r)
566 {
567     ap_assert(stream->request == NULL);
568     ap_assert(stream->rtmp == NULL);
569     stream->rtmp = h2_request_clone(stream->pool, r);
570 }
571
572 static void set_error_response(h2_stream *stream, int http_status)
573 {
574     if (!h2_stream_is_ready(stream)) {
575         conn_rec *c = stream->session->c;
576         apr_bucket *b;
577         h2_headers *response;
578         
579         response = h2_headers_die(http_status, stream->request, stream->pool);
580         prep_output(stream);
581         b = apr_bucket_eos_create(c->bucket_alloc);
582         APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
583         b = h2_bucket_headers_create(c->bucket_alloc, response);
584         APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
585     }
586 }
587
588 static apr_status_t add_trailer(h2_stream *stream,
589                                 const char *name, size_t nlen,
590                                 const char *value, size_t vlen)
591 {
592     conn_rec *c = stream->session->c;
593     char *hname, *hvalue;
594
595     if (nlen == 0 || name[0] == ':') {
596         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, c, 
597                       H2_STRM_LOG(APLOGNO(03060), stream, 
598                       "pseudo header in trailer"));
599         return APR_EINVAL;
600     }
601     if (h2_req_ignore_trailer(name, nlen)) {
602         return APR_SUCCESS;
603     }
604     if (!stream->trailers) {
605         stream->trailers = apr_table_make(stream->pool, 5);
606     }
607     hname = apr_pstrndup(stream->pool, name, nlen);
608     hvalue = apr_pstrndup(stream->pool, value, vlen);
609     h2_util_camel_case_header(hname, nlen);
610     apr_table_mergen(stream->trailers, hname, hvalue);
611     
612     return APR_SUCCESS;
613 }
614
615 apr_status_t h2_stream_add_header(h2_stream *stream,
616                                   const char *name, size_t nlen,
617                                   const char *value, size_t vlen)
618 {
619     h2_session *session = stream->session;
620     int error = 0;
621     apr_status_t status;
622     
623     if (stream->has_response) {
624         return APR_EINVAL;    
625     }
626     ++stream->request_headers_added;
627     if (name[0] == ':') {
628         if ((vlen) > session->s->limit_req_line) {
629             /* pseudo header: approximation of request line size check */
630             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
631                           H2_STRM_MSG(stream, "pseudo %s too long"), name);
632             error = HTTP_REQUEST_URI_TOO_LARGE;
633         }
634     }
635     else if ((nlen + 2 + vlen) > session->s->limit_req_fieldsize) {
636         /* header too long */
637         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
638                       H2_STRM_MSG(stream, "header %s too long"), name);
639         error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
640     }
641     
642     if (stream->request_headers_added > session->s->limit_req_fields + 4) {
643         /* too many header lines, include 4 pseudo headers */
644         if (stream->request_headers_added 
645             > session->s->limit_req_fields + 4 + 100) {
646             /* yeah, right */
647             h2_stream_rst(stream, H2_ERR_ENHANCE_YOUR_CALM);
648             return APR_ECONNRESET;
649         }
650         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
651                       H2_STRM_MSG(stream, "too many header lines")); 
652         error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
653     }
654     
655     if (error) {
656         set_error_response(stream, error);
657         return APR_EINVAL; 
658     }
659     else if (H2_SS_IDLE == stream->state) {
660         if (!stream->rtmp) {
661             stream->rtmp = h2_req_create(stream->id, stream->pool, 
662                                          NULL, NULL, NULL, NULL, NULL, 0);
663         }
664         status = h2_request_add_header(stream->rtmp, stream->pool,
665                                        name, nlen, value, vlen);
666     }
667     else  {
668         status = add_trailer(stream, name, nlen, value, vlen);
669     }
670     if (status != APR_SUCCESS) {
671         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
672                       H2_STRM_MSG(stream, "header %s not accepted"), name);
673         h2_stream_dispatch(stream, H2_SEV_CANCELLED);
674     }
675     return status;
676 }
677
678 static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb)
679 {
680     if (bb) {
681         apr_bucket *b = APR_BRIGADE_FIRST(bb);
682         while (b != APR_BRIGADE_SENTINEL(bb)) {
683             if (H2_BUCKET_IS_HEADERS(b)) {
684                 return b;
685             }
686             b = APR_BUCKET_NEXT(b);
687         }
688     }
689     return NULL;
690 }
691
692 apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, 
693                                    int *peos, h2_headers **presponse)
694 {
695     apr_status_t status = APR_SUCCESS;
696     apr_off_t requested, max_chunk = H2_DATA_CHUNK_SIZE;
697     apr_bucket *b, *e;
698     conn_rec *c;
699
700     if (presponse) {
701         *presponse = NULL;
702     }
703     
704     if (stream->rst_error) {
705         *plen = 0;
706         *peos = 1;
707         return APR_ECONNRESET;
708     }
709     
710     c = stream->session->c;
711     prep_output(stream);
712     
713     /* determine how much we'd like to send. We cannot send more than
714      * is requested. But we can reduce the size in case the master
715      * connection operates in smaller chunks. (TSL warmup) */
716     if (stream->session->io.write_size > 0) {
717         max_chunk = stream->session->io.write_size - 9; /* header bits */ 
718     }
719     *plen = requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
720     
721     h2_util_bb_avail(stream->out_buffer, plen, peos);
722     if (!*peos && *plen < requested && *plen < stream->max_mem) {
723         H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
724         status = h2_beam_receive(stream->output, stream->out_buffer, 
725                                  APR_NONBLOCK_READ, stream->max_mem - *plen);
726         if (APR_STATUS_IS_EOF(status)) {
727             apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
728             APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
729             status = APR_SUCCESS;
730         }
731         else if (status == APR_EAGAIN) {
732             status = APR_SUCCESS;
733         }
734         *plen = requested;
735         h2_util_bb_avail(stream->out_buffer, plen, peos);
736         H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
737     }
738     else {
739         H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "ok");
740     }
741
742     b = APR_BRIGADE_FIRST(stream->out_buffer);
743     while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
744         e = APR_BUCKET_NEXT(b);
745         if (APR_BUCKET_IS_FLUSH(b)
746             || (!APR_BUCKET_IS_METADATA(b) && b->length == 0)) {
747             APR_BUCKET_REMOVE(b);
748             apr_bucket_destroy(b);
749         }
750         else {
751             break;
752         }
753         b = e;
754     }
755
756     b = get_first_headers_bucket(stream->out_buffer);
757     if (b) {
758         /* there are HEADERS to submit */
759         *peos = 0;
760         *plen = 0;
761         if (b == APR_BRIGADE_FIRST(stream->out_buffer)) {
762             if (presponse) {
763                 *presponse = h2_bucket_headers_get(b);
764                 APR_BUCKET_REMOVE(b);
765                 apr_bucket_destroy(b);
766                 status = APR_SUCCESS;
767             }
768             else {
769                 /* someone needs to retrieve the response first */
770                 h2_mplx_keep_active(stream->session->mplx, stream->id);
771                 status = APR_EAGAIN;
772             }
773         }
774         else {
775             apr_bucket *e = APR_BRIGADE_FIRST(stream->out_buffer);
776             while (e != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
777                 if (e == b) {
778                     break;
779                 }
780                 else if (e->length != (apr_size_t)-1) {
781                     *plen += e->length;
782                 }
783                 e = APR_BUCKET_NEXT(e);
784             }
785         }
786     }
787
788     if (status == APR_SUCCESS) {
789         if (presponse && *presponse) {
790             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
791                           H2_STRM_MSG(stream, "prepare, response %d"), 
792                           (*presponse)->status);
793         }
794         else if (*peos || *plen) {
795             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
796                           H2_STRM_MSG(stream, "prepare, len=%ld eos=%d"),
797                           (long)*plen, *peos);
798         }
799         else {
800             status = APR_EAGAIN;
801             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
802                           H2_STRM_MSG(stream, "prepare, no data"));
803         }
804     }
805     return status;
806 }
807
808 static int is_not_headers(apr_bucket *b)
809 {
810     return !H2_BUCKET_IS_HEADERS(b);
811 }
812
813 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, 
814                                apr_off_t *plen, int *peos)
815 {
816     conn_rec *c = stream->session->c;
817     apr_status_t status = APR_SUCCESS;
818
819     if (stream->rst_error) {
820         return APR_ECONNRESET;
821     }
822     status = h2_append_brigade(bb, stream->out_buffer, plen, peos, is_not_headers);
823     if (status == APR_SUCCESS && !*peos && !*plen) {
824         status = APR_EAGAIN;
825     }
826     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
827                   H2_STRM_MSG(stream, "read_to, len=%ld eos=%d"),
828                   (long)*plen, *peos);
829     return status;
830 }
831
832
833 apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
834 {
835     apr_status_t status = APR_SUCCESS;
836     apr_array_header_t *pushes;
837     int i;
838     
839     pushes = h2_push_collect_update(stream, stream->request, response);
840     if (pushes && !apr_is_empty_array(pushes)) {
841         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
842                       H2_STRM_MSG(stream, "found %d push candidates"),
843                       pushes->nelts);
844         for (i = 0; i < pushes->nelts; ++i) {
845             h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
846             h2_stream *s = h2_session_push(stream->session, stream, push);
847             if (!s) {
848                 status = APR_ECONNRESET;
849                 break;
850             }
851         }
852     }
853     return status;
854 }
855
856 apr_table_t *h2_stream_get_trailers(h2_stream *stream)
857 {
858     return NULL;
859 }
860
861 const h2_priority *h2_stream_get_priority(h2_stream *stream, 
862                                           h2_headers *response)
863 {
864     if (response && stream->initiated_on) {
865         const char *ctype = apr_table_get(response->headers, "content-type");
866         if (ctype) {
867             /* FIXME: Not good enough, config needs to come from request->server */
868             return h2_config_get_priority(stream->session->config, ctype);
869         }
870     }
871     return NULL;
872 }
873
874 int h2_stream_is_ready(h2_stream *stream)
875 {
876     if (stream->has_response) {
877         return 1;
878     }
879     else if (stream->out_buffer && get_first_headers_bucket(stream->out_buffer)) {
880         return 1;
881     }
882     return 0;
883 }
884
885 int h2_stream_was_closed(const h2_stream *stream)
886 {
887     switch (stream->state) {
888         case H2_SS_CLOSED:
889         case H2_SS_CLEANUP:
890             return 1;
891         default:
892             return 0;
893     }
894 }
895
896