]> granicus.if.org Git - apache/blob - modules/http2/h2_stream.c
f457eb49ff8e725c41a4aef3ae182309c855f1c1
[apache] / modules / http2 / h2_stream.c
1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <assert.h>
17 #include <stddef.h>
18
19 #include <httpd.h>
20 #include <http_core.h>
21 #include <http_connection.h>
22 #include <http_log.h>
23
24 #include <nghttp2/nghttp2.h>
25
26 #include "h2_private.h"
27 #include "h2.h"
28 #include "h2_bucket_beam.h"
29 #include "h2_conn.h"
30 #include "h2_config.h"
31 #include "h2_h2.h"
32 #include "h2_filter.h"
33 #include "h2_mplx.h"
34 #include "h2_push.h"
35 #include "h2_request.h"
36 #include "h2_response.h"
37 #include "h2_session.h"
38 #include "h2_stream.h"
39 #include "h2_task.h"
40 #include "h2_ctx.h"
41 #include "h2_task.h"
42 #include "h2_util.h"
43
44
45 static int state_transition[][7] = {
46     /*  ID OP RL RR CI CO CL */
47 /*ID*/{  1, 0, 0, 0, 0, 0, 0 },
48 /*OP*/{  1, 1, 0, 0, 0, 0, 0 },
49 /*RL*/{  0, 0, 1, 0, 0, 0, 0 },
50 /*RR*/{  0, 0, 0, 1, 0, 0, 0 },
51 /*CI*/{  1, 1, 0, 0, 1, 0, 0 },
52 /*CO*/{  1, 1, 0, 0, 0, 1, 0 },
53 /*CL*/{  1, 1, 0, 0, 1, 1, 1 },
54 };
55
56 static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, char *tag)
57 {
58     if (APLOG_C_IS_LEVEL(s->session->c, lvl)) {
59         conn_rec *c = s->session->c;
60         char buffer[4 * 1024];
61         const char *line = "(null)";
62         apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
63         
64         len = h2_util_bb_print(buffer, bmax, tag, "", s->buffer);
65         ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%ld-%d): %s", 
66                       c->id, s->id, len? buffer : line);
67     }
68 }
69
70 static int set_state(h2_stream *stream, h2_stream_state_t state)
71 {
72     int allowed = state_transition[state][stream->state];
73     if (allowed) {
74         stream->state = state;
75         return 1;
76     }
77     
78     ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, APLOGNO(03081)
79                   "h2_stream(%ld-%d): invalid state transition from %d to %d", 
80                   stream->session->id, stream->id, stream->state, state);
81     return 0;
82 }
83
84 static int close_input(h2_stream *stream) 
85 {
86     switch (stream->state) {
87         case H2_STREAM_ST_CLOSED_INPUT:
88         case H2_STREAM_ST_CLOSED:
89             return 0; /* ignore, idempotent */
90         case H2_STREAM_ST_CLOSED_OUTPUT:
91             /* both closed now */
92             set_state(stream, H2_STREAM_ST_CLOSED);
93             break;
94         default:
95             /* everything else we jump to here */
96             set_state(stream, H2_STREAM_ST_CLOSED_INPUT);
97             break;
98     }
99     return 1;
100 }
101
102 static int input_closed(h2_stream *stream) 
103 {
104     switch (stream->state) {
105         case H2_STREAM_ST_OPEN:
106         case H2_STREAM_ST_CLOSED_OUTPUT:
107             return 0;
108         default:
109             return 1;
110     }
111 }
112
113 static int close_output(h2_stream *stream) 
114 {
115     switch (stream->state) {
116         case H2_STREAM_ST_CLOSED_OUTPUT:
117         case H2_STREAM_ST_CLOSED:
118             return 0; /* ignore, idempotent */
119         case H2_STREAM_ST_CLOSED_INPUT:
120             /* both closed now */
121             set_state(stream, H2_STREAM_ST_CLOSED);
122             break;
123         default:
124             /* everything else we jump to here */
125             set_state(stream, H2_STREAM_ST_CLOSED_OUTPUT);
126             break;
127     }
128     return 1;
129 }
130
131 static int input_open(const h2_stream *stream) 
132 {
133     switch (stream->state) {
134         case H2_STREAM_ST_OPEN:
135         case H2_STREAM_ST_CLOSED_OUTPUT:
136             return 1;
137         default:
138             return 0;
139     }
140 }
141
142 static int output_open(h2_stream *stream) 
143 {
144     switch (stream->state) {
145         case H2_STREAM_ST_OPEN:
146         case H2_STREAM_ST_CLOSED_INPUT:
147             return 1;
148         default:
149             return 0;
150     }
151 }
152
153 static apr_status_t stream_pool_cleanup(void *ctx)
154 {
155     h2_stream *stream = ctx;
156     apr_status_t status;
157     
158     if (stream->input) {
159         h2_beam_destroy(stream->input);
160         stream->input = NULL;
161     }
162     if (stream->files) {
163         apr_file_t *file;
164         int i;
165         for (i = 0; i < stream->files->nelts; ++i) {
166             file = APR_ARRAY_IDX(stream->files, i, apr_file_t*);
167             status = apr_file_close(file);
168             ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, stream->session->c, 
169                           "h2_stream(%ld-%d): destroy, closed file %d", 
170                           stream->session->id, stream->id, i);
171         }
172         stream->files = NULL;
173     }
174     return APR_SUCCESS;
175 }
176
177 h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
178                           int initiated_on, const h2_request *creq)
179 {
180     h2_request *req;
181     h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
182     
183     stream->id        = id;
184     stream->created   = apr_time_now();
185     stream->state     = H2_STREAM_ST_IDLE;
186     stream->pool      = pool;
187     stream->session   = session;
188     set_state(stream, H2_STREAM_ST_OPEN);
189     
190     if (creq) {
191         /* take it into out pool and assure correct id's */
192         req = h2_request_clone(pool, creq);
193         req->id = id;
194         req->initiated_on = initiated_on;
195     }
196     else {
197         req = h2_req_create(id, pool, 
198                 h2_config_geti(session->config, H2_CONF_SER_HEADERS));
199     }
200     stream->request = req; 
201     
202     apr_pool_cleanup_register(pool, stream, stream_pool_cleanup, 
203                               apr_pool_cleanup_null);
204     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03082)
205                   "h2_stream(%ld-%d): opened", session->id, stream->id);
206     return stream;
207 }
208
209 void h2_stream_cleanup(h2_stream *stream)
210 {
211     AP_DEBUG_ASSERT(stream);
212     if (stream->buffer) {
213         apr_brigade_cleanup(stream->buffer);
214     }
215     if (stream->input) {
216         apr_status_t status;
217         status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ, 1);
218         if (status == APR_EAGAIN) {
219             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
220                           "h2_stream(%ld-%d): wait on input shutdown", 
221                           stream->session->id, stream->id);
222             status = h2_beam_shutdown(stream->input, APR_BLOCK_READ, 1);
223             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, 
224                           "h2_stream(%ld-%d): input shutdown returned", 
225                           stream->session->id, stream->id);
226         }
227     }
228 }
229
230 void h2_stream_destroy(h2_stream *stream)
231 {
232     AP_DEBUG_ASSERT(stream);
233     ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c, 
234                   "h2_stream(%ld-%d): destroy", 
235                   stream->session->id, stream->id);
236     if (stream->pool) {
237         apr_pool_destroy(stream->pool);
238     }
239 }
240
241 void h2_stream_eos_destroy(h2_stream *stream)
242 {
243     h2_session_stream_done(stream->session, stream);
244     /* stream possibly destroyed */
245 }
246
247 apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
248 {
249     apr_pool_t *pool = stream->pool;
250     stream->pool = NULL;
251     return pool;
252 }
253
254 void h2_stream_rst(h2_stream *stream, int error_code)
255 {
256     stream->rst_error = error_code;
257     close_input(stream);
258     close_output(stream);
259     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
260                   "h2_stream(%ld-%d): reset, error=%d", 
261                   stream->session->id, stream->id, error_code);
262 }
263
264 struct h2_response *h2_stream_get_response(h2_stream *stream)
265 {
266     return stream->response;
267 }
268
269 struct h2_response *h2_stream_get_unsent_response(h2_stream *stream)
270 {
271     h2_response *unsent = (stream->last_sent? 
272                            stream->last_sent->next : stream->response);
273     if (unsent) {
274         stream->last_sent = unsent;
275     }
276     return unsent;
277 }
278
279 apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
280 {
281     apr_status_t status;
282     AP_DEBUG_ASSERT(stream);
283     if (stream->rst_error) {
284         return APR_ECONNRESET;
285     }
286     set_state(stream, H2_STREAM_ST_OPEN);
287     status = h2_request_rwrite(stream->request, stream->pool, r);
288     stream->request->serialize = h2_config_geti(h2_config_sget(r->server), 
289                                                 H2_CONF_SER_HEADERS);
290     ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
291                   "h2_request(%d): rwrite %s host=%s://%s%s",
292                   stream->request->id, stream->request->method, 
293                   stream->request->scheme, stream->request->authority, 
294                   stream->request->path);
295
296     return status;
297 }
298
299 apr_status_t h2_stream_add_header(h2_stream *stream,
300                                   const char *name, size_t nlen,
301                                   const char *value, size_t vlen)
302 {
303     AP_DEBUG_ASSERT(stream);
304     if (!stream->response) {
305         if (name[0] == ':') {
306             if ((vlen) > stream->session->s->limit_req_line) {
307                 /* pseudo header: approximation of request line size check */
308                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
309                               "h2_stream(%ld-%d): pseudo header %s too long", 
310                               stream->session->id, stream->id, name);
311                 return h2_stream_set_error(stream, 
312                                            HTTP_REQUEST_URI_TOO_LARGE);
313             }
314         }
315         else if ((nlen + 2 + vlen) > stream->session->s->limit_req_fieldsize) {
316             /* header too long */
317             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
318                           "h2_stream(%ld-%d): header %s too long", 
319                           stream->session->id, stream->id, name);
320             return h2_stream_set_error(stream, 
321                                        HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE);
322         }
323         
324         if (name[0] != ':') {
325             ++stream->request_headers_added;
326             if (stream->request_headers_added 
327                 > stream->session->s->limit_req_fields) {
328                 /* too many header lines */
329                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
330                               "h2_stream(%ld-%d): too many header lines", 
331                               stream->session->id, stream->id);
332                 return h2_stream_set_error(stream, 
333                                            HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE);
334             }
335         }
336     }
337     
338     if (h2_stream_is_scheduled(stream)) {
339         return h2_request_add_trailer(stream->request, stream->pool,
340                                       name, nlen, value, vlen);
341     }
342     else {
343         if (!input_open(stream)) {
344             return APR_ECONNRESET;
345         }
346         return h2_request_add_header(stream->request, stream->pool,
347                                      name, nlen, value, vlen);
348     }
349 }
350
351 apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled, 
352                                 h2_stream_pri_cmp *cmp, void *ctx)
353 {
354     apr_status_t status;
355     AP_DEBUG_ASSERT(stream);
356     AP_DEBUG_ASSERT(stream->session);
357     AP_DEBUG_ASSERT(stream->session->mplx);
358     
359     if (!output_open(stream)) {
360         return APR_ECONNRESET;
361     }
362     if (stream->scheduled) {
363         return APR_EINVAL;
364     }
365     if (eos) {
366         close_input(stream);
367     }
368     
369     if (stream->response) {
370         /* already have a resonse, probably a HTTP error code */
371         return h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
372     }
373     
374     /* Seeing the end-of-headers, we have everything we need to 
375      * start processing it.
376      */
377     status = h2_request_end_headers(stream->request, stream->pool, 
378                                     eos, push_enabled);
379     if (status == APR_SUCCESS) {
380         stream->request->body = !eos;
381         stream->scheduled = 1;
382         stream->input_remaining = stream->request->content_length;
383         
384         status = h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
385         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
386                       "h2_stream(%ld-%d): scheduled %s %s://%s%s",
387                       stream->session->id, stream->id,
388                       stream->request->method, stream->request->scheme,
389                       stream->request->authority, stream->request->path);
390     }
391     else {
392         h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
393         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
394                       "h2_stream(%ld-%d): RST=2 (internal err) %s %s://%s%s",
395                       stream->session->id, stream->id,
396                       stream->request->method, stream->request->scheme,
397                       stream->request->authority, stream->request->path);
398     }
399     
400     return status;
401 }
402
403 int h2_stream_is_scheduled(const h2_stream *stream)
404 {
405     return stream->scheduled;
406 }
407
408 apr_status_t h2_stream_close_input(h2_stream *stream)
409 {
410     apr_status_t status = APR_SUCCESS;
411     
412     AP_DEBUG_ASSERT(stream);
413     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
414                   "h2_stream(%ld-%d): closing input",
415                   stream->session->id, stream->id);
416                   
417     if (stream->rst_error) {
418         return APR_ECONNRESET;
419     }
420     
421     if (close_input(stream) && stream->input) {
422         status = h2_beam_close(stream->input);
423     }
424     return status;
425 }
426
427 apr_status_t h2_stream_write_data(h2_stream *stream,
428                                   const char *data, size_t len, int eos)
429 {
430     conn_rec *c = stream->session->c;
431     apr_status_t status = APR_SUCCESS;
432     apr_bucket_brigade *tmp;
433     
434     AP_DEBUG_ASSERT(stream);
435     if (!stream->input) {
436         return APR_EOF;
437     }
438     if (input_closed(stream) || !stream->request->eoh) {
439         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
440                       "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d", 
441                       stream->session->id, stream->id, input_closed(stream),
442                       stream->request->eoh);
443         return APR_EINVAL;
444     }
445
446     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
447                   "h2_stream(%ld-%d): add %ld input bytes", 
448                   stream->session->id, stream->id, (long)len);
449
450     if (!stream->request->chunked) {
451         stream->input_remaining -= len;
452         if (stream->input_remaining < 0) {
453             ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c,
454                           APLOGNO(02961) 
455                           "h2_stream(%ld-%d): got %ld more content bytes than announced "
456                           "in content-length header: %ld", 
457                           stream->session->id, stream->id,
458                           (long)stream->request->content_length, 
459                           -(long)stream->input_remaining);
460             h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
461             return APR_ECONNABORTED;
462         }
463     }
464     
465     tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
466     apr_brigade_write(tmp, NULL, NULL, data, len);
467     if (eos) {
468         APR_BRIGADE_INSERT_TAIL(tmp, apr_bucket_eos_create(c->bucket_alloc)); 
469         close_input(stream);
470     }
471     status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
472     apr_brigade_destroy(tmp);
473     
474     stream->in_data_frames++;
475     stream->in_data_octets += len;
476     
477     return status;
478 }
479
480 void h2_stream_set_suspended(h2_stream *stream, int suspended)
481 {
482     AP_DEBUG_ASSERT(stream);
483     stream->suspended = !!suspended;
484     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
485                   "h2_stream(%ld-%d): suspended=%d",
486                   stream->session->id, stream->id, stream->suspended);
487 }
488
489 int h2_stream_is_suspended(const h2_stream *stream)
490 {
491     AP_DEBUG_ASSERT(stream);
492     return stream->suspended;
493 }
494
495 static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount)
496 {
497     conn_rec *c = stream->session->c;
498     apr_bucket *b;
499     apr_status_t status;
500     
501     if (!stream->output) {
502         return APR_EOF;
503     }
504     status = h2_beam_receive(stream->output, stream->buffer, 
505                              APR_NONBLOCK_READ, amount);
506     /* The buckets we reveive are using the stream->buffer pool as
507      * lifetime which is exactly what we want since this is stream->pool.
508      *
509      * However: when we send these buckets down the core output filters, the
510      * filter might decide to setaside them into a pool of its own. And it
511      * might decide, after having sent the buckets, to clear its pool.
512      *
513      * This is problematic for file buckets because it then closed the contained
514      * file. Any split off buckets we sent afterwards will result in a 
515      * APR_EBADF.
516      */
517     for (b = APR_BRIGADE_FIRST(stream->buffer);
518          b != APR_BRIGADE_SENTINEL(stream->buffer);
519          b = APR_BUCKET_NEXT(b)) {
520         if (APR_BUCKET_IS_FILE(b)) {
521             apr_bucket_file *f = (apr_bucket_file *)b->data;
522             apr_pool_t *fpool = apr_file_pool_get(f->fd);
523             if (fpool != c->pool) {
524                 apr_bucket_setaside(b, c->pool);
525                 if (!stream->files) {
526                     stream->files = apr_array_make(stream->pool, 
527                                                    5, sizeof(apr_file_t*));
528                 }
529                 APR_ARRAY_PUSH(stream->files, apr_file_t*) = f->fd;
530             }
531         }
532     }
533     return status;
534 }
535
536 apr_status_t h2_stream_add_response(h2_stream *stream, h2_response *response,
537                                     h2_bucket_beam *output)
538 {
539     apr_status_t status = APR_SUCCESS;
540     conn_rec *c = stream->session->c;
541     h2_response **pr = &stream->response;
542     
543     if (!output_open(stream)) {
544         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
545                       "h2_stream(%ld-%d): output closed", 
546                       stream->session->id, stream->id);
547         return APR_ECONNRESET;
548     }
549     if (stream->submitted) {
550         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
551                       "h2_stream(%ld-%d): already submitted final response", 
552                       stream->session->id, stream->id);
553         return APR_ECONNRESET;
554     }
555     
556     /* append */
557     while (*pr) {
558         pr = &((*pr)->next);
559     }
560     *pr = response;
561     
562     if (h2_response_is_final(response)) {
563         stream->output = output;
564         stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
565         
566         h2_stream_filter(stream);
567         if (stream->output) {
568             status = fill_buffer(stream, 0);
569         }
570     }
571     
572     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
573                   "h2_stream(%ld-%d): set_response(%d)", 
574                   stream->session->id, stream->id, 
575                   stream->response->http_status);
576     return status;
577 }
578
579 apr_status_t h2_stream_set_error(h2_stream *stream, int http_status)
580 {
581     h2_response *response;
582     
583     if (stream->submitted) {
584         return APR_EINVAL;
585     }
586     response = h2_response_die(stream->id, http_status, stream->request, 
587                                stream->pool);
588     return h2_stream_add_response(stream, response, NULL);
589 }
590
591 static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9); 
592
593 apr_status_t h2_stream_out_prepare(h2_stream *stream,
594                                    apr_off_t *plen, int *peos)
595 {
596     conn_rec *c = stream->session->c;
597     apr_status_t status = APR_SUCCESS;
598     apr_off_t requested;
599
600     if (stream->rst_error) {
601         *plen = 0;
602         *peos = 1;
603         return APR_ECONNRESET;
604     }
605
606     if (!stream->buffer) {
607         return APR_EAGAIN;
608     }
609     
610     if (*plen > 0) {
611         requested = H2MIN(*plen, DATA_CHUNK_SIZE);
612     }
613     else {
614         requested = DATA_CHUNK_SIZE;
615     }
616     *plen = requested;
617     
618     H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre");
619     h2_util_bb_avail(stream->buffer, plen, peos);
620     if (!*peos && *plen < requested) {
621         /* try to get more data */
622         status = fill_buffer(stream, (requested - *plen) + DATA_CHUNK_SIZE);
623         if (APR_STATUS_IS_EOF(status)) {
624             apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
625             APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
626             status = APR_SUCCESS;
627         }
628         else if (status == APR_EAGAIN) {
629             /* did not receive more, it's ok */
630             status = APR_SUCCESS;
631         }
632         *plen = requested;
633         h2_util_bb_avail(stream->buffer, plen, peos);
634     }
635     H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_post");
636     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
637                   "h2_stream(%ld-%d): prepare, len=%ld eos=%d, trailers=%s",
638                   c->id, stream->id, (long)*plen, *peos,
639                   (stream->response && stream->response->trailers)? 
640                   "yes" : "no");
641     if (!*peos && !*plen && status == APR_SUCCESS) {
642         return APR_EAGAIN;
643     }
644     return status;
645 }
646
647
648 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, 
649                                apr_off_t *plen, int *peos)
650 {
651     conn_rec *c = stream->session->c;
652     apr_status_t status = APR_SUCCESS;
653
654     if (stream->rst_error) {
655         return APR_ECONNRESET;
656     }
657     status = h2_append_brigade(bb, stream->buffer, plen, peos);
658     if (status == APR_SUCCESS && !*peos && !*plen) {
659         status = APR_EAGAIN;
660     }
661     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
662                   "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
663                   c->id, stream->id, (long)*plen, *peos);
664     return status;
665 }
666
667
668 int h2_stream_input_is_open(const h2_stream *stream) 
669 {
670     return input_open(stream);
671 }
672
673 int h2_stream_needs_submit(const h2_stream *stream)
674 {
675     switch (stream->state) {
676         case H2_STREAM_ST_OPEN:
677         case H2_STREAM_ST_CLOSED_INPUT:
678         case H2_STREAM_ST_CLOSED_OUTPUT:
679         case H2_STREAM_ST_CLOSED:
680             return !stream->submitted;
681         default:
682             return 0;
683     }
684 }
685
686 apr_status_t h2_stream_submit_pushes(h2_stream *stream)
687 {
688     apr_status_t status = APR_SUCCESS;
689     apr_array_header_t *pushes;
690     int i;
691     
692     pushes = h2_push_collect_update(stream, stream->request, 
693                                     stream->response);
694     if (pushes && !apr_is_empty_array(pushes)) {
695         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
696                       "h2_stream(%ld-%d): found %d push candidates",
697                       stream->session->id, stream->id, pushes->nelts);
698         for (i = 0; i < pushes->nelts; ++i) {
699             h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
700             h2_stream *s = h2_session_push(stream->session, stream, push);
701             if (!s) {
702                 status = APR_ECONNRESET;
703                 break;
704             }
705         }
706     }
707     return status;
708 }
709
710 apr_table_t *h2_stream_get_trailers(h2_stream *stream)
711 {
712     return stream->response? stream->response->trailers : NULL;
713 }
714
715 const h2_priority *h2_stream_get_priority(h2_stream *stream)
716 {
717     if (stream->response && stream->request && stream->request->initiated_on) {
718         const char *ctype = apr_table_get(stream->response->headers, "content-type");
719         if (ctype) {
720             /* FIXME: Not good enough, config needs to come from request->server */
721             return h2_config_get_priority(stream->session->config, ctype);
722         }
723     }
724     return NULL;
725 }
726
727 const char *h2_stream_state_str(h2_stream *stream)
728 {
729     switch (stream->state) {
730         case H2_STREAM_ST_IDLE:
731             return "IDLE";
732         case H2_STREAM_ST_OPEN:
733             return "OPEN";
734         case H2_STREAM_ST_RESV_LOCAL:
735             return "RESERVED_LOCAL";
736         case H2_STREAM_ST_RESV_REMOTE:
737             return "RESERVED_REMOTE";
738         case H2_STREAM_ST_CLOSED_INPUT:
739             return "HALF_CLOSED_REMOTE";
740         case H2_STREAM_ST_CLOSED_OUTPUT:
741             return "HALF_CLOSED_LOCAL";
742         case H2_STREAM_ST_CLOSED:
743             return "CLOSED";
744         default:
745             return "UNKNOWN";
746             
747     }
748 }
749