]> granicus.if.org Git - apache/blob - modules/http2/h2_stream.c
0ef45ae3049b315b6008da92812fa47309d33229
[apache] / modules / http2 / h2_stream.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16  
17 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
18  *
19  * Licensed under the Apache License, Version 2.0 (the "License");
20  * you may not use this file except in compliance with the License.
21  * You may obtain a copy of the License at
22  *
23  * http://www.apache.org/licenses/LICENSE-2.0
24  
25  * Unless required by applicable law or agreed to in writing, software
26  * distributed under the License is distributed on an "AS IS" BASIS,
27  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28  * See the License for the specific language governing permissions and
29  * limitations under the License.
30  */
31
32 #include <assert.h>
33 #include <stddef.h>
34
35 #include <apr_strings.h>
36
37 #include <httpd.h>
38 #include <http_core.h>
39 #include <http_connection.h>
40 #include <http_log.h>
41
42 #include <nghttp2/nghttp2.h>
43
44 #include "h2_private.h"
45 #include "h2.h"
46 #include "h2_bucket_beam.h"
47 #include "h2_conn.h"
48 #include "h2_config.h"
49 #include "h2_h2.h"
50 #include "h2_mplx.h"
51 #include "h2_push.h"
52 #include "h2_request.h"
53 #include "h2_headers.h"
54 #include "h2_session.h"
55 #include "h2_stream.h"
56 #include "h2_task.h"
57 #include "h2_ctx.h"
58 #include "h2_task.h"
59 #include "h2_util.h"
60
61
62 static const char *h2_ss_str(h2_stream_state_t state)
63 {
64     switch (state) {
65         case H2_SS_IDLE:
66             return "IDLE";
67         case H2_SS_RSVD_L:
68             return "RESERVED_LOCAL";
69         case H2_SS_RSVD_R:
70             return "RESERVED_REMOTE";
71         case H2_SS_OPEN:
72             return "OPEN";
73         case H2_SS_CLOSED_L:
74             return "HALF_CLOSED_LOCAL";
75         case H2_SS_CLOSED_R:
76             return "HALF_CLOSED_REMOTE";
77         case H2_SS_CLOSED:
78             return "CLOSED";
79         case H2_SS_CLEANUP:
80             return "CLEANUP";
81         default:
82             return "UNKNOWN";
83     }
84 }
85
86 const char *h2_stream_state_str(h2_stream *stream) 
87 {
88     return h2_ss_str(stream->state);
89 }
90
91 /* Abbreviations for stream transit tables */
92 #define S_XXX     (-2)                      /* Programming Error */
93 #define S_ERR     (-1)                      /* Protocol Error */
94 #define S_NOP     (0)                       /* No Change */
95 #define S_IDL     (H2_SS_IDL + 1)
96 #define S_RS_L    (H2_SS_RSVD_L + 1)
97 #define S_RS_R    (H2_SS_RSVD_R + 1)
98 #define S_OPEN    (H2_SS_OPEN + 1)
99 #define S_CL_L    (H2_SS_CLOSED_L + 1)
100 #define S_CL_R    (H2_SS_CLOSED_R + 1)
101 #define S_CLS     (H2_SS_CLOSED + 1)
102 #define S_CLN     (H2_SS_CLEANUP + 1)
103
104 /* state transisitions when certain frame types are sent */
105 static int trans_on_send[][H2_SS_MAX] = {
106 /*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS,  S_CLN, */        
107 { S_ERR, S_ERR,  S_ERR,  S_NOP,  S_NOP,  S_ERR,  S_NOP,  S_NOP, },/* DATA */ 
108 { S_ERR, S_ERR,  S_CL_R, S_NOP,  S_NOP,  S_ERR,  S_NOP,  S_NOP, },/* HEADERS */ 
109 { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },/* PRIORITY */    
110 { S_CLS, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, },/* RST_STREAM */ 
111 { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* SETTINGS */ 
112 { S_RS_L,S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* PUSH_PROMISE */  
113 { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* PING */ 
114 { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* GOAWAY */ 
115 { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },/* WINDOW_UPDATE */ 
116 { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },/* CONT */ 
117 };
118 /* state transisitions when certain frame types are received */
119 static int trans_on_recv[][H2_SS_MAX] = {
120 /*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS,  S_CLN, */        
121 { S_ERR, S_ERR,  S_ERR,  S_NOP,  S_ERR,  S_NOP,  S_NOP,  S_NOP, },/* DATA */ 
122 { S_OPEN,S_CL_L, S_ERR,  S_NOP,  S_ERR,  S_NOP,  S_NOP,  S_NOP, },/* HEADERS */ 
123 { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },/* PRIORITY */    
124 { S_ERR, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, },/* RST_STREAM */ 
125 { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* SETTINGS */ 
126 { S_RS_R,S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* PUSH_PROMISE */  
127 { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* PING */ 
128 { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },/* GOAWAY */ 
129 { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },/* WINDOW_UPDATE */ 
130 { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },/* CONT */ 
131 };
132 /* state transisitions when certain events happen */
133 static int trans_on_event[][H2_SS_MAX] = {
134 /*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS,  S_CLN, */        
135 { S_XXX, S_ERR,  S_ERR,  S_CL_L, S_CLS,  S_XXX,  S_XXX,  S_XXX, },/* EV_CLOSED_L*/
136 { S_ERR, S_ERR,  S_ERR,  S_CL_R, S_ERR,  S_CLS,  S_NOP,  S_NOP, },/* EV_CLOSED_R*/
137 { S_CLS, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, },/* EV_CANCELLED*/
138 { S_NOP, S_XXX,  S_XXX,  S_XXX,  S_XXX,  S_CLS,  S_CLN,  S_XXX, },/* EV_EOS_SENT*/
139 };
140
141 static int on_map(h2_stream_state_t state, int map[H2_SS_MAX])
142 {
143     int op = map[state];
144     switch (op) {
145         case S_XXX:
146         case S_ERR:
147             return op;
148         case S_NOP:
149             return state;
150         default:
151             return op-1;
152     }
153 }
154
155 static int on_frame(h2_stream_state_t state, int frame_type, 
156                     int frame_map[][H2_SS_MAX], apr_size_t maxlen)
157 {
158     ap_assert(frame_type >= 0);
159     ap_assert(state >= 0);
160     if (frame_type >= maxlen) {
161         return state; /* NOP, ignore unknown frame types */
162     }
163     return on_map(state, frame_map[frame_type]);
164 }
165
166 static int on_frame_send(h2_stream_state_t state, int frame_type)
167 {
168     return on_frame(state, frame_type, trans_on_send, H2_ALEN(trans_on_send));
169 }
170
171 static int on_frame_recv(h2_stream_state_t state, int frame_type)
172 {
173     return on_frame(state, frame_type, trans_on_recv, H2_ALEN(trans_on_recv));
174 }
175
176 static int on_event(h2_stream* stream, h2_stream_event_t ev)
177 {
178     if (stream->monitor && stream->monitor->on_event) {
179         stream->monitor->on_event(stream->monitor->ctx, stream, ev);
180     }
181     if (ev < H2_ALEN(trans_on_event)) {
182         return on_map(stream->state, trans_on_event[ev]);
183     }
184     return stream->state;
185 }
186
187 static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
188 {
189     if (APLOG_C_IS_LEVEL(s->session->c, lvl)) {
190         conn_rec *c = s->session->c;
191         char buffer[4 * 1024];
192         apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
193         
194         len = h2_util_bb_print(buffer, bmax, tag, "", s->out_buffer);
195         ap_log_cerror(APLOG_MARK, lvl, 0, c, 
196                       H2_STRM_MSG(s, "out-buffer(%s)"), len? buffer : "empty");
197     }
198 }
199
200 static apr_status_t setup_input(h2_stream *stream) {
201     if (stream->input == NULL) {
202         int empty = (stream->input_eof 
203                      && (!stream->in_buffer 
204                          || APR_BRIGADE_EMPTY(stream->in_buffer)));
205         if (!empty) {
206             h2_beam_create(&stream->input, stream->pool, stream->id, 
207                            "input", H2_BEAM_OWNER_SEND, 0, 
208                            stream->session->s->timeout);
209             h2_beam_send_from(stream->input, stream->pool);
210         }
211     }
212     return APR_SUCCESS;
213 }
214
215 static apr_status_t close_input(h2_stream *stream)
216 {
217     conn_rec *c = stream->session->c;
218     apr_status_t status = APR_SUCCESS;
219
220     stream->input_eof = 1;
221     if (stream->input && h2_beam_is_closed(stream->input)) {
222         return APR_SUCCESS;
223     }
224     
225     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
226                   H2_STRM_MSG(stream, "closing input"));
227     if (stream->rst_error) {
228         return APR_ECONNRESET;
229     }
230     
231     if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
232         apr_bucket *b;
233         h2_headers *r;
234         
235         if (!stream->in_buffer) {
236             stream->in_buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
237         }
238         
239         r = h2_headers_create(HTTP_OK, stream->trailers, NULL, stream->pool);
240         stream->trailers = NULL;        
241         b = h2_bucket_headers_create(c->bucket_alloc, r);
242         APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b);
243         
244         b = apr_bucket_eos_create(c->bucket_alloc);
245         APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b);
246         
247         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
248                       H2_STRM_MSG(stream, "added trailers"));
249         h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
250     }
251     if (stream->input) {
252         h2_stream_flush_input(stream);
253         return h2_beam_close(stream->input);
254     }
255     return status;
256 }
257
258 static apr_status_t close_output(h2_stream *stream)
259 {
260     if (!stream->output || h2_beam_is_closed(stream->output)) {
261         return APR_SUCCESS;
262     }
263     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
264                   H2_STRM_MSG(stream, "closing output"));
265     return h2_beam_leave(stream->output);
266 }
267
268 static void on_state_enter(h2_stream *stream) 
269 {
270     if (stream->monitor && stream->monitor->on_state_enter) {
271         stream->monitor->on_state_enter(stream->monitor->ctx, stream);
272     }
273 }
274
275 static void on_state_event(h2_stream *stream, h2_stream_event_t ev) 
276 {
277     if (stream->monitor && stream->monitor->on_state_event) {
278         stream->monitor->on_state_event(stream->monitor->ctx, stream, ev);
279     }
280 }
281
282 static void on_state_invalid(h2_stream *stream) 
283 {
284     if (stream->monitor && stream->monitor->on_state_invalid) {
285         stream->monitor->on_state_invalid(stream->monitor->ctx, stream);
286     }
287     /* stream got an event/frame invalid in its state */
288     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
289                   H2_STRM_MSG(stream, "invalid state event")); 
290     switch (stream->state) {
291         case H2_SS_OPEN:
292         case H2_SS_RSVD_L:
293         case H2_SS_RSVD_R:
294         case H2_SS_CLOSED_L:
295         case H2_SS_CLOSED_R:
296             h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
297             break;
298         default:
299             break;
300     }
301 }
302
303 static apr_status_t transit(h2_stream *stream, int new_state)
304 {
305     if (new_state == stream->state) {
306         return APR_SUCCESS;
307     }
308     else if (new_state < 0) {
309         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, 
310                       H2_STRM_LOG(APLOGNO(03081), stream, "invalid transition"));
311         on_state_invalid(stream);
312         return APR_EINVAL;
313     }
314     
315     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, 
316                   H2_STRM_MSG(stream, "transit to [%s]"), h2_ss_str(new_state));
317     stream->state = new_state;
318     switch (new_state) {
319         case H2_SS_IDLE:
320             break;
321         case H2_SS_RSVD_L:
322             close_input(stream);
323             break;
324         case H2_SS_RSVD_R:
325             break;
326         case H2_SS_OPEN:
327             break;
328         case H2_SS_CLOSED_L:
329             close_output(stream);
330             break;
331         case H2_SS_CLOSED_R:
332             close_input(stream);
333             break;
334         case H2_SS_CLOSED:
335             close_input(stream);
336             close_output(stream);
337             if (stream->out_buffer) {
338                 apr_brigade_cleanup(stream->out_buffer);
339             }
340             break;
341         case H2_SS_CLEANUP:
342             break;
343     }
344     on_state_enter(stream);
345     return APR_SUCCESS;
346 }
347
348 void h2_stream_set_monitor(h2_stream *stream, h2_stream_monitor *monitor)
349 {
350     stream->monitor = monitor;
351 }
352
353 void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
354 {
355     int new_state;
356     
357     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
358                   H2_STRM_MSG(stream, "dispatch event %d"), ev);
359     new_state = on_event(stream, ev);
360     if (new_state < 0) {
361         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, 
362                       H2_STRM_LOG(APLOGNO(10002), stream, "invalid event %d"), ev);
363         on_state_invalid(stream);
364         AP_DEBUG_ASSERT(new_state > S_XXX);
365         return;
366     }
367     else if (new_state == stream->state) {
368         /* nop */
369         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
370                       H2_STRM_MSG(stream, "non-state event %d"), ev);
371         return;
372     }
373     else {
374         on_state_event(stream, ev);
375         transit(stream, new_state);
376     }
377 }
378
379 static void set_policy_for(h2_stream *stream, h2_request *r) 
380 {
381     int enabled = h2_session_push_enabled(stream->session);
382     stream->push_policy = h2_push_policy_determine(r->headers, stream->pool, 
383                                                    enabled);
384     r->serialize = h2_config_geti(stream->session->config, H2_CONF_SER_HEADERS);
385 }
386
387 apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags)
388 {
389     apr_status_t status = APR_SUCCESS;
390     int new_state, eos = 0;
391
392     new_state = on_frame_send(stream->state, ftype);
393     if (new_state < 0) {
394         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, 
395                       H2_STRM_MSG(stream, "invalid frame %d send"), ftype);
396         AP_DEBUG_ASSERT(new_state > S_XXX);
397         return transit(stream, new_state);
398     }
399     
400     switch (ftype) {
401         case NGHTTP2_DATA:
402             eos = (flags & NGHTTP2_FLAG_END_STREAM);
403             break;
404             
405         case NGHTTP2_HEADERS:
406             eos = (flags & NGHTTP2_FLAG_END_STREAM);
407             break;
408             
409         case NGHTTP2_PUSH_PROMISE:
410                 /* start pushed stream */
411                 ap_assert(stream->request == NULL);
412                 ap_assert(stream->rtmp != NULL);
413                 status = h2_request_end_headers(stream->rtmp, stream->pool, 1);
414                 if (status != APR_SUCCESS) {
415                     return status;
416                 }
417                 set_policy_for(stream, stream->rtmp);
418                 stream->request = stream->rtmp;
419                 stream->rtmp = NULL;
420             break;
421             
422         default:
423             break;
424     }
425     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, 
426                   H2_STRM_MSG(stream, "send frame %d, eos=%d"), ftype, eos);
427     status = transit(stream, new_state);
428     if (status == APR_SUCCESS && eos) {
429         status = transit(stream, on_event(stream, H2_SEV_CLOSED_L));
430     }
431     return status;
432 }
433
434 apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags)
435 {
436     apr_status_t status = APR_SUCCESS;
437     int new_state, eos = 0;
438
439     new_state = on_frame_recv(stream->state, ftype);
440     if (new_state < 0) {
441         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, 
442                       H2_STRM_MSG(stream, "invalid frame %d recv"), ftype);
443         AP_DEBUG_ASSERT(new_state > S_XXX);
444         return transit(stream, new_state);
445     }
446     
447     switch (ftype) {
448         case NGHTTP2_DATA:
449             eos = (flags & NGHTTP2_FLAG_END_STREAM);
450             break;
451             
452         case NGHTTP2_HEADERS:
453             eos = (flags & NGHTTP2_FLAG_END_STREAM);
454             if (stream->state == H2_SS_OPEN) {
455                 /* trailer HEADER */
456                 if (!eos) {
457                     h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
458                 }
459             }
460             else {
461                 /* request HEADER */
462                 ap_assert(stream->request == NULL);
463                 if (stream->rtmp == NULL) {
464                     /* This can only happen, if the stream has received no header
465                      * name/value pairs at all. The lastest nghttp2 version have become
466                      * pretty good at detecting this early. In any case, we have
467                      * to abort the connection here, since this is clearly a protocol error */
468                     return APR_EINVAL;
469                 }
470                 status = h2_request_end_headers(stream->rtmp, stream->pool, eos);
471                 if (status != APR_SUCCESS) {
472                     return status;
473                 }
474                 set_policy_for(stream, stream->rtmp);
475                 stream->request = stream->rtmp;
476                 stream->rtmp = NULL;
477             }
478             break;
479             
480         default:
481             break;
482     }
483     status = transit(stream, new_state);
484     if (status == APR_SUCCESS && eos) {
485         status = transit(stream, on_event(stream, H2_SEV_CLOSED_R));
486     }
487     return status;
488 }
489
490 apr_status_t h2_stream_flush_input(h2_stream *stream)
491 {
492     apr_status_t status = APR_SUCCESS;
493     
494     if (stream->in_buffer && !APR_BRIGADE_EMPTY(stream->in_buffer)) {
495         setup_input(stream);
496         status = h2_beam_send(stream->input, stream->in_buffer, APR_BLOCK_READ);
497         stream->in_last_write = apr_time_now();
498     }
499     if (stream->input_eof 
500         && stream->input && !h2_beam_is_closed(stream->input)) {
501         status = h2_beam_close(stream->input);
502     }
503     return status;
504 }
505
506 apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
507                                     const uint8_t *data, size_t len)
508 {
509     h2_session *session = stream->session;
510     apr_status_t status = APR_SUCCESS;
511     
512     stream->in_data_frames++;
513     if (len > 0) {
514         if (APLOGctrace3(session->c)) {
515             const char *load = apr_pstrndup(stream->pool, (const char *)data, len);
516             ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, session->c,
517                           H2_STRM_MSG(stream, "recv DATA, len=%d: -->%s<--"), 
518                           (int)len, load);
519         }
520         else {
521             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
522                           H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
523         }
524         stream->in_data_octets += len;
525         if (!stream->in_buffer) {
526             stream->in_buffer = apr_brigade_create(stream->pool, 
527                                                    session->c->bucket_alloc);
528         }
529         apr_brigade_write(stream->in_buffer, NULL, NULL, (const char *)data, len);
530         h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
531     }
532     return status;
533 }
534
535 static void prep_output(h2_stream *stream) {
536     conn_rec *c = stream->session->c;
537     if (!stream->out_buffer) {
538         stream->out_buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
539     }
540 }
541
542 h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
543                             h2_stream_monitor *monitor, int initiated_on)
544 {
545     h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
546     
547     stream->id           = id;
548     stream->initiated_on = initiated_on;
549     stream->created      = apr_time_now();
550     stream->state        = H2_SS_IDLE;
551     stream->pool         = pool;
552     stream->session      = session;
553     stream->monitor      = monitor;
554     stream->max_mem      = session->max_stream_mem;
555     
556 #ifdef H2_NG2_LOCAL_WIN_SIZE
557     stream->in_window_size = 
558         nghttp2_session_get_stream_local_window_size(
559             stream->session->ngh2, stream->id);
560 #endif
561
562     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
563                   H2_STRM_LOG(APLOGNO(03082), stream, "created"));
564     on_state_enter(stream);
565     return stream;
566 }
567
568 void h2_stream_cleanup(h2_stream *stream)
569 {
570     apr_status_t status;
571     
572     ap_assert(stream);
573     if (stream->out_buffer) {
574         /* remove any left over output buckets that may still have
575          * references into request pools */
576         apr_brigade_cleanup(stream->out_buffer);
577     }
578     if (stream->input) {
579         h2_beam_abort(stream->input);
580         status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
581         if (status == APR_EAGAIN) {
582             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
583                           H2_STRM_MSG(stream, "wait on input drain"));
584             status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
585             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, 
586                           H2_STRM_MSG(stream, "input drain returned"));
587         }
588     }
589 }
590
591 void h2_stream_destroy(h2_stream *stream)
592 {
593     ap_assert(stream);
594     ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c, 
595                   H2_STRM_MSG(stream, "destroy"));
596     if (stream->pool) {
597         apr_pool_destroy(stream->pool);
598         stream->pool = NULL;
599     }
600 }
601
602 apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
603 {
604     apr_pool_t *pool = stream->pool;
605     stream->pool = NULL;
606     return pool;
607 }
608
609 apr_status_t h2_stream_prep_processing(h2_stream *stream)
610 {
611     if (stream->request) {
612         const h2_request *r = stream->request;
613         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
614                       H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
615                       r->method, r->scheme, r->authority, r->path, r->chunked);
616         setup_input(stream);
617         stream->scheduled = 1;
618         return APR_SUCCESS;
619     }
620     return APR_EINVAL;
621 }
622
623 void h2_stream_rst(h2_stream *stream, int error_code)
624 {
625     stream->rst_error = error_code;
626     if (stream->input) {
627         h2_beam_abort(stream->input);
628     }
629     if (stream->output) {
630         h2_beam_leave(stream->output);
631     }
632     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
633                   H2_STRM_MSG(stream, "reset, error=%d"), error_code);
634     h2_stream_dispatch(stream, H2_SEV_CANCELLED);
635 }
636
637 apr_status_t h2_stream_set_request_rec(h2_stream *stream, 
638                                        request_rec *r, int eos)
639 {
640     h2_request *req;
641     apr_status_t status;
642
643     ap_assert(stream->request == NULL);
644     ap_assert(stream->rtmp == NULL);
645     if (stream->rst_error) {
646         return APR_ECONNRESET;
647     }
648     status = h2_request_rcreate(&req, stream->pool, r);
649     if (status == APR_SUCCESS) {
650         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, 
651                       H2_STRM_LOG(APLOGNO(03058), stream, 
652                       "set_request_rec %s host=%s://%s%s"),
653                       req->method, req->scheme, req->authority, req->path);
654         stream->rtmp = req;
655         /* simulate the frames that led to this */
656         return h2_stream_recv_frame(stream, NGHTTP2_HEADERS, 
657                                     NGHTTP2_FLAG_END_STREAM);
658     }
659     return status;
660 }
661
662 void h2_stream_set_request(h2_stream *stream, const h2_request *r)
663 {
664     ap_assert(stream->request == NULL);
665     ap_assert(stream->rtmp == NULL);
666     stream->rtmp = h2_request_clone(stream->pool, r);
667 }
668
669 static void set_error_response(h2_stream *stream, int http_status)
670 {
671     if (!h2_stream_is_ready(stream)) {
672         conn_rec *c = stream->session->c;
673         apr_bucket *b;
674         h2_headers *response;
675         
676         response = h2_headers_die(http_status, stream->request, stream->pool);
677         prep_output(stream);
678         b = apr_bucket_eos_create(c->bucket_alloc);
679         APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
680         b = h2_bucket_headers_create(c->bucket_alloc, response);
681         APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
682     }
683 }
684
685 static apr_status_t add_trailer(h2_stream *stream,
686                                 const char *name, size_t nlen,
687                                 const char *value, size_t vlen)
688 {
689     conn_rec *c = stream->session->c;
690     char *hname, *hvalue;
691
692     if (nlen == 0 || name[0] == ':') {
693         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, c, 
694                       H2_STRM_LOG(APLOGNO(03060), stream, 
695                       "pseudo header in trailer"));
696         return APR_EINVAL;
697     }
698     if (h2_req_ignore_trailer(name, nlen)) {
699         return APR_SUCCESS;
700     }
701     if (!stream->trailers) {
702         stream->trailers = apr_table_make(stream->pool, 5);
703     }
704     hname = apr_pstrndup(stream->pool, name, nlen);
705     hvalue = apr_pstrndup(stream->pool, value, vlen);
706     h2_util_camel_case_header(hname, nlen);
707     apr_table_mergen(stream->trailers, hname, hvalue);
708     
709     return APR_SUCCESS;
710 }
711
712 apr_status_t h2_stream_add_header(h2_stream *stream,
713                                   const char *name, size_t nlen,
714                                   const char *value, size_t vlen)
715 {
716     h2_session *session = stream->session;
717     int error = 0;
718     apr_status_t status;
719     
720     if (stream->has_response) {
721         return APR_EINVAL;    
722     }
723     ++stream->request_headers_added;
724     if (name[0] == ':') {
725         if ((vlen) > session->s->limit_req_line) {
726             /* pseudo header: approximation of request line size check */
727             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
728                           H2_STRM_MSG(stream, "pseudo %s too long"), name);
729             error = HTTP_REQUEST_URI_TOO_LARGE;
730         }
731     }
732     else if ((nlen + 2 + vlen) > session->s->limit_req_fieldsize) {
733         /* header too long */
734         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
735                       H2_STRM_MSG(stream, "header %s too long"), name);
736         error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
737     }
738     
739     if (stream->request_headers_added > session->s->limit_req_fields + 4) {
740         /* too many header lines, include 4 pseudo headers */
741         if (stream->request_headers_added 
742             > session->s->limit_req_fields + 4 + 100) {
743             /* yeah, right */
744             h2_stream_rst(stream, H2_ERR_ENHANCE_YOUR_CALM);
745             return APR_ECONNRESET;
746         }
747         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
748                       H2_STRM_MSG(stream, "too many header lines")); 
749         error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
750     }
751     
752     if (error) {
753         set_error_response(stream, error);
754         return APR_EINVAL; 
755     }
756     else if (H2_SS_IDLE == stream->state) {
757         if (!stream->rtmp) {
758             stream->rtmp = h2_req_create(stream->id, stream->pool, 
759                                          NULL, NULL, NULL, NULL, NULL, 0);
760         }
761         status = h2_request_add_header(stream->rtmp, stream->pool,
762                                        name, nlen, value, vlen);
763     }
764     else if (H2_SS_OPEN == stream->state) {
765         status = add_trailer(stream, name, nlen, value, vlen);
766     }
767     else {
768         status = APR_EINVAL;
769     }
770     
771     if (status != APR_SUCCESS) {
772         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
773                       H2_STRM_MSG(stream, "header %s not accepted"), name);
774         h2_stream_dispatch(stream, H2_SEV_CANCELLED);
775     }
776     return status;
777 }
778
779 static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb)
780 {
781     if (bb) {
782         apr_bucket *b = APR_BRIGADE_FIRST(bb);
783         while (b != APR_BRIGADE_SENTINEL(bb)) {
784             if (H2_BUCKET_IS_HEADERS(b)) {
785                 return b;
786             }
787             b = APR_BUCKET_NEXT(b);
788         }
789     }
790     return NULL;
791 }
792
793 static apr_status_t add_buffered_data(h2_stream *stream, apr_off_t requested,
794                                       apr_off_t *plen, int *peos, int *is_all, 
795                                       h2_headers **pheaders)
796 {
797     apr_bucket *b, *e;
798     
799     *peos = 0;
800     *plen = 0;
801     *is_all = 0;
802     if (pheaders) {
803         *pheaders = NULL;
804     }
805
806     H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "add_buffered_data");
807     b = APR_BRIGADE_FIRST(stream->out_buffer);
808     while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
809         e = APR_BUCKET_NEXT(b);
810         if (APR_BUCKET_IS_METADATA(b)) {
811             if (APR_BUCKET_IS_FLUSH(b)) {
812                 APR_BUCKET_REMOVE(b);
813                 apr_bucket_destroy(b);
814             }
815             else if (APR_BUCKET_IS_EOS(b)) {
816                 *peos = 1;
817                 return APR_SUCCESS;
818             }
819             else if (H2_BUCKET_IS_HEADERS(b)) {
820                 if (*plen > 0) {
821                     /* data before the response, can only return up to here */
822                     return APR_SUCCESS;
823                 }
824                 else if (pheaders) {
825                     *pheaders = h2_bucket_headers_get(b);
826                     APR_BUCKET_REMOVE(b);
827                     apr_bucket_destroy(b);
828                     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
829                                   H2_STRM_MSG(stream, "prep, -> response %d"), 
830                                   (*pheaders)->status);
831                     return APR_SUCCESS;
832                 }
833                 else {
834                     return APR_EAGAIN;
835                 }
836             }
837         }
838         else if (b->length == 0) {
839             APR_BUCKET_REMOVE(b);
840             apr_bucket_destroy(b);
841         }
842         else {
843             ap_assert(b->length != (apr_size_t)-1);
844             *plen += b->length;
845             if (*plen >= requested) {
846                 *plen = requested;
847                 return APR_SUCCESS;
848             }
849         }
850         b = e;
851     }
852     *is_all = 1;
853     return APR_SUCCESS;
854 }
855
856 apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, 
857                                    int *peos, h2_headers **pheaders)
858 {
859     apr_status_t status = APR_SUCCESS;
860     apr_off_t requested, missing, max_chunk = H2_DATA_CHUNK_SIZE;
861     conn_rec *c;
862     int complete;
863
864     ap_assert(stream);
865     
866     if (stream->rst_error) {
867         *plen = 0;
868         *peos = 1;
869         return APR_ECONNRESET;
870     }
871     
872     c = stream->session->c;
873     prep_output(stream);
874
875     /* determine how much we'd like to send. We cannot send more than
876      * is requested. But we can reduce the size in case the master
877      * connection operates in smaller chunks. (TSL warmup) */
878     if (stream->session->io.write_size > 0) {
879         max_chunk = stream->session->io.write_size - 9; /* header bits */ 
880     }
881     requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
882     
883     /* count the buffered data until eos or a headers bucket */
884     status = add_buffered_data(stream, requested, plen, peos, &complete, pheaders);
885     
886     if (status == APR_EAGAIN) {
887         /* TODO: ugly, someone needs to retrieve the response first */
888         h2_mplx_keep_active(stream->session->mplx, stream);
889         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
890                       H2_STRM_MSG(stream, "prep, response eagain"));
891         return status;
892     }
893     else if (status != APR_SUCCESS) {
894         return status;
895     }
896     
897     if (pheaders && *pheaders) {
898         return APR_SUCCESS;
899     }
900     
901     /* If there we do not have enough buffered data to satisfy the requested
902      * length *and* we counted the _complete_ buffer (and did not stop in the middle
903      * because of meta data there), lets see if we can read more from the
904      * output beam */
905     missing = H2MIN(requested, stream->max_mem) - *plen;
906     if (complete && !*peos && missing > 0) {
907         apr_status_t rv = APR_EOF;
908         
909         if (stream->output) {
910             H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
911             rv = h2_beam_receive(stream->output, stream->out_buffer, 
912                                  APR_NONBLOCK_READ, stream->max_mem - *plen);
913             H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
914         }
915         
916         if (rv == APR_SUCCESS) {
917             /* count the buffer again, now that we have read output */
918             status = add_buffered_data(stream, requested, plen, peos, &complete, pheaders);
919         }
920         else if (APR_STATUS_IS_EOF(rv)) {
921             apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
922             APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
923             *peos = 1;
924         }
925         else if (APR_STATUS_IS_EAGAIN(rv)) {
926             /* we set this is the status of this call only if there
927              * is no buffered data, see check below */
928         }
929         else {
930             /* real error reading. Give this back directly, even though
931              * we may have something buffered. */
932             status = rv;
933         }
934     }
935     
936     if (status == APR_SUCCESS) {
937         if (*peos || *plen) {
938             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
939                           H2_STRM_MSG(stream, "prepare, len=%ld eos=%d"),
940                           (long)*plen, *peos);
941         }
942         else {
943             status = APR_EAGAIN;
944             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
945                           H2_STRM_MSG(stream, "prepare, no data"));
946         }
947     }
948     return status;
949 }
950
951 static int is_not_headers(apr_bucket *b)
952 {
953     return !H2_BUCKET_IS_HEADERS(b);
954 }
955
956 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, 
957                                apr_off_t *plen, int *peos)
958 {
959     conn_rec *c = stream->session->c;
960     apr_status_t status = APR_SUCCESS;
961
962     if (stream->rst_error) {
963         return APR_ECONNRESET;
964     }
965     status = h2_append_brigade(bb, stream->out_buffer, plen, peos, is_not_headers);
966     if (status == APR_SUCCESS && !*peos && !*plen) {
967         status = APR_EAGAIN;
968     }
969     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
970                   H2_STRM_MSG(stream, "read_to, len=%ld eos=%d"),
971                   (long)*plen, *peos);
972     return status;
973 }
974
975
976 apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
977 {
978     apr_status_t status = APR_SUCCESS;
979     apr_array_header_t *pushes;
980     int i;
981     
982     pushes = h2_push_collect_update(stream, stream->request, response);
983     if (pushes && !apr_is_empty_array(pushes)) {
984         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
985                       H2_STRM_MSG(stream, "found %d push candidates"),
986                       pushes->nelts);
987         for (i = 0; i < pushes->nelts; ++i) {
988             h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
989             h2_stream *s = h2_session_push(stream->session, stream, push);
990             if (!s) {
991                 status = APR_ECONNRESET;
992                 break;
993             }
994         }
995     }
996     return status;
997 }
998
999 apr_table_t *h2_stream_get_trailers(h2_stream *stream)
1000 {
1001     return NULL;
1002 }
1003
1004 const h2_priority *h2_stream_get_priority(h2_stream *stream, 
1005                                           h2_headers *response)
1006 {
1007     if (response && stream->initiated_on) {
1008         const char *ctype = apr_table_get(response->headers, "content-type");
1009         if (ctype) {
1010             /* FIXME: Not good enough, config needs to come from request->server */
1011             return h2_config_get_priority(stream->session->config, ctype);
1012         }
1013     }
1014     return NULL;
1015 }
1016
1017 int h2_stream_is_ready(h2_stream *stream)
1018 {
1019     if (stream->has_response) {
1020         return 1;
1021     }
1022     else if (stream->out_buffer && get_first_headers_bucket(stream->out_buffer)) {
1023         return 1;
1024     }
1025     return 0;
1026 }
1027
1028 int h2_stream_was_closed(const h2_stream *stream)
1029 {
1030     switch (stream->state) {
1031         case H2_SS_CLOSED:
1032         case H2_SS_CLEANUP:
1033             return 1;
1034         default:
1035             return 0;
1036     }
1037 }
1038
1039 apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount)
1040 {
1041     h2_session *session = stream->session;
1042     
1043     if (amount > 0) {
1044         apr_off_t consumed = amount;
1045         
1046         while (consumed > 0) {
1047             int len = (consumed > INT_MAX)? INT_MAX : (int)consumed;
1048             nghttp2_session_consume(session->ngh2, stream->id, len);
1049             consumed -= len;
1050         }
1051
1052 #ifdef H2_NG2_LOCAL_WIN_SIZE
1053         if (1) {
1054             int cur_size = nghttp2_session_get_stream_local_window_size(
1055                 session->ngh2, stream->id);
1056             int win = stream->in_window_size;
1057             int thigh = win * 8/10;
1058             int tlow = win * 2/10;
1059             const int win_max = 2*1024*1024;
1060             const int win_min = 32*1024;
1061             
1062             /* Work in progress, probably should add directives for these
1063              * values once this stabilizes somewhat. The general idea is
1064              * to adapt stream window sizes if the input window changes
1065              * a) very quickly (< good RTT) from full to empty
1066              * b) only a little bit (> bad RTT)
1067              * where in a) it grows and in b) it shrinks again.
1068              */
1069             if (cur_size > thigh && amount > thigh && win < win_max) {
1070                 /* almost empty again with one reported consumption, how
1071                  * long did this take? */
1072                 long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
1073                 if (ms < 40) {
1074                     win = H2MIN(win_max, win + (64*1024));
1075                 }
1076             }
1077             else if (cur_size < tlow && amount < tlow && win > win_min) {
1078                 /* staying full, for how long already? */
1079                 long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
1080                 if (ms > 700) {
1081                     win = H2MAX(win_min, win - (32*1024));
1082                 }
1083             }
1084             
1085             if (win != stream->in_window_size) {
1086                 stream->in_window_size = win;
1087                 nghttp2_session_set_local_window_size(session->ngh2, 
1088                         NGHTTP2_FLAG_NONE, stream->id, win);
1089             } 
1090             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
1091                           "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d",
1092                           session->id, stream->id, (long)amount, 
1093                           cur_size, stream->in_window_size);
1094         }
1095 #endif
1096     }
1097     return APR_SUCCESS;   
1098 }
1099