1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
20 #include <http_core.h>
21 #include <http_connection.h>
24 #include <nghttp2/nghttp2.h>
26 #include "h2_private.h"
28 #include "h2_bucket_beam.h"
30 #include "h2_config.h"
32 #include "h2_filter.h"
35 #include "h2_request.h"
36 #include "h2_response.h"
37 #include "h2_session.h"
38 #include "h2_stream.h"
/* Table of permitted stream state transitions. set_state() reads it as
 * state_transition[requested_state][current_state]: each row is the state
 * being entered, each column the state being left, non-zero means allowed.
 * Abbreviations: ID=idle, OP=open, RL=reserved local, RR=reserved remote,
 * CI=closed input, CO=closed output, CL=closed.
 * NOTE(review): assumes h2_stream_state_t enumerates its values in exactly
 * this order - confirm against the enum declaration.
 */
45 static int state_transition[][7] = {
46 /* ID OP RL RR CI CO CL */
47 /*ID*/{ 1, 0, 0, 0, 0, 0, 0 },
48 /*OP*/{ 1, 1, 0, 0, 0, 0, 0 },
49 /*RL*/{ 0, 0, 1, 0, 0, 0, 0 },
50 /*RR*/{ 0, 0, 0, 1, 0, 0, 0 },
51 /*CI*/{ 1, 1, 0, 0, 1, 0, 0 },
52 /*CO*/{ 1, 1, 0, 0, 0, 1, 0 },
53 /*CL*/{ 1, 1, 0, 0, 1, 1, 1 },
/* Log the content of the stream's output buffer brigade (s->buffer) on the
 * connection at level lvl. Nothing is done unless that level is enabled for
 * the connection. h2_util_bb_print() fills a 4 KB stack buffer using the
 * given tag; when it reports a length of 0, "(null)" is logged instead.
 */
56 static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, char *tag)
58 if (APLOG_C_IS_LEVEL(s->session->c, lvl)) {
59 conn_rec *c = s->session->c;
60 char buffer[4 * 1024];
61 const char *line = "(null)";
62 apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
64 len = h2_util_bb_print(buffer, bmax, tag, "", s->buffer);
65 ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%ld-%d): %s",
66 c->id, s->id, len? buffer : line);
/* Move the stream into the given state, governed by the state_transition
 * table. The table is indexed with the requested state first and the
 * stream's current state second. An invalid transition is reported as a
 * warning that names the current and the requested state.
 */
70 static int set_state(h2_stream *stream, h2_stream_state_t state)
72 int allowed = state_transition[state][stream->state];
74 stream->state = state;
78 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, APLOGNO(03081)
79 "h2_stream(%ld-%d): invalid state transition from %d to %d",
80 stream->session->id, stream->id, stream->state, state);
/* Mark the input side of the stream as closed.
 * A stream whose input is already closed (CLOSED_INPUT or CLOSED) is left
 * untouched and 0 is returned. If the output side was closed before, the
 * stream becomes CLOSED; from every other state it moves to CLOSED_INPUT.
 */
84 static int close_input(h2_stream *stream)
86 switch (stream->state) {
87 case H2_STREAM_ST_CLOSED_INPUT:
88 case H2_STREAM_ST_CLOSED:
89 return 0; /* ignore, idempotent */
90 case H2_STREAM_ST_CLOSED_OUTPUT:
/* output was closed earlier, so both directions are closed now */
92 set_state(stream, H2_STREAM_ST_CLOSED);
95 /* everything else we jump to here */
96 set_state(stream, H2_STREAM_ST_CLOSED_INPUT);
/* Tell whether the input side of the stream is closed, based on the
 * stream state alone. OPEN and CLOSED_OUTPUT are treated separately from
 * all other states.
 * NOTE(review): presumably those two states mean "input not closed" - they
 * are the same states input_open() singles out; confirm the returned values.
 */
102 static int input_closed(h2_stream *stream)
104 switch (stream->state) {
105 case H2_STREAM_ST_OPEN:
106 case H2_STREAM_ST_CLOSED_OUTPUT:
/* Mark the output side of the stream as closed.
 * A stream whose output is already closed (CLOSED_OUTPUT or CLOSED) is left
 * untouched and 0 is returned. If the input side was closed before, the
 * stream becomes CLOSED; from every other state it moves to CLOSED_OUTPUT.
 */
113 static int close_output(h2_stream *stream)
115 switch (stream->state) {
116 case H2_STREAM_ST_CLOSED_OUTPUT:
117 case H2_STREAM_ST_CLOSED:
118 return 0; /* ignore, idempotent */
119 case H2_STREAM_ST_CLOSED_INPUT:
120 /* both closed now */
121 set_state(stream, H2_STREAM_ST_CLOSED);
124 /* everything else we jump to here */
125 set_state(stream, H2_STREAM_ST_CLOSED_OUTPUT);
/* Tell whether the stream can still receive input, based on the stream
 * state alone. OPEN and CLOSED_OUTPUT are treated separately from all
 * other states.
 * NOTE(review): presumably those are the two states in which input is
 * open; confirm the returned values.
 */
131 static int input_open(const h2_stream *stream)
133 switch (stream->state) {
134 case H2_STREAM_ST_OPEN:
135 case H2_STREAM_ST_CLOSED_OUTPUT:
/* Tell whether the stream can still produce output, based on the stream
 * state alone. OPEN and CLOSED_INPUT are treated separately from all
 * other states.
 * NOTE(review): presumably those are the two states in which output is
 * open; confirm the returned values.
 */
142 static int output_open(h2_stream *stream)
144 switch (stream->state) {
145 case H2_STREAM_ST_OPEN:
146 case H2_STREAM_ST_CLOSED_INPUT:
/* Pool cleanup callback for a stream; ctx is the h2_stream. It is
 * registered by h2_stream_open() on the pool the stream was created with.
 * Destroys the input beam and closes every file collected in
 * stream->files, logging each close together with its status at TRACE3.
 * Both members are reset to NULL once they have been released.
 */
153 static apr_status_t stream_pool_cleanup(void *ctx)
155 h2_stream *stream = ctx;
159 h2_beam_destroy(stream->input);
160 stream->input = NULL;
/* close the files that were recorded for this stream, one by one */
165 for (i = 0; i < stream->files->nelts; ++i) {
166 file = APR_ARRAY_IDX(stream->files, i, apr_file_t*);
167 status = apr_file_close(file);
168 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, stream->session->c,
169 "h2_stream(%ld-%d): destroy, closed file %d",
170 stream->session->id, stream->id, i);
172 stream->files = NULL;
/* Create a stream for the session, allocated zero-initialized from pool,
 * and move it from IDLE to OPEN.
 * The stream's request is either a clone of creq, with initiated_on
 * recorded on it, or a new request made by h2_req_create() with the
 * session's H2_CONF_SER_HEADERS setting. stream_pool_cleanup() is
 * registered on pool, so it runs when that pool is cleaned up.
 */
177 h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
178 int initiated_on, const h2_request *creq)
181 h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
184 stream->created = apr_time_now();
185 stream->state = H2_STREAM_ST_IDLE;
187 stream->session = session;
188 set_state(stream, H2_STREAM_ST_OPEN);
191 /* take it into our pool and assure correct id's */
192 req = h2_request_clone(pool, creq);
194 req->initiated_on = initiated_on;
197 req = h2_req_create(id, pool,
198 h2_config_geti(session->config, H2_CONF_SER_HEADERS));
200 stream->request = req;
202 apr_pool_cleanup_register(pool, stream, stream_pool_cleanup,
203 apr_pool_cleanup_null);
204 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03082)
205 "h2_stream(%ld-%d): opened", session->id, stream->id);
/* Release what the stream holds in its buffers: an existing output brigade
 * is emptied and the input beam is shut down. The shutdown is attempted
 * non-blocking first; when that reports APR_EAGAIN it is repeated as a
 * blocking call, with TRACE2 log entries before and after the wait.
 */
209 void h2_stream_cleanup(h2_stream *stream)
211 AP_DEBUG_ASSERT(stream);
212 if (stream->buffer) {
213 apr_brigade_cleanup(stream->buffer);
217 status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ, 1);
218 if (status == APR_EAGAIN) {
219 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
220 "h2_stream(%ld-%d): wait on input shutdown",
221 stream->session->id, stream->id);
/* the non-blocking attempt could not finish: wait for the shutdown */
222 status = h2_beam_shutdown(stream->input, APR_BLOCK_READ, 1);
223 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
224 "h2_stream(%ld-%d): input shutdown returned",
225 stream->session->id, stream->id);
/* Destroy the stream by destroying its memory pool; the event is logged at
 * TRACE3 first. The stream must not be used afterwards when it was
 * allocated from that pool.
 * NOTE(review): whether the stream lives in stream->pool is not visible
 * here - confirm.
 */
230 void h2_stream_destroy(h2_stream *stream)
232 AP_DEBUG_ASSERT(stream);
233 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c,
234 "h2_stream(%ld-%d): destroy",
235 stream->session->id, stream->id);
237 apr_pool_destroy(stream->pool);
/* Report to the owning session that this stream is done. The session may
 * destroy the stream while handling this, so callers must not touch the
 * stream afterwards.
 */
241 void h2_stream_eos_destroy(h2_stream *stream)
243 h2_session_stream_done(stream->session, stream);
244 /* stream possibly destroyed */
/* Hand out the stream's memory pool.
 * NOTE(review): the name suggests the stream gives up the pool here -
 * confirm what happens to stream->pool afterwards.
 */
247 apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
249 apr_pool_t *pool = stream->pool;
/* Reset the stream: error_code is remembered in stream->rst_error, the
 * output side is closed and the reset is logged at TRACE1. Other functions
 * in this file check rst_error and answer APR_ECONNRESET once it is set.
 */
254 void h2_stream_rst(h2_stream *stream, int error_code)
256 stream->rst_error = error_code;
258 close_output(stream);
259 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
260 "h2_stream(%ld-%d): reset, error=%d",
261 stream->session->id, stream->id, error_code);
/* Return the response attached to the stream (stream->response), which is
 * NULL as long as none has been set.
 */
264 struct h2_response *h2_stream_get_response(h2_stream *stream)
266 return stream->response;
/* Determine the next response of the stream that has not been handed out
 * yet: the successor of stream->last_sent, or stream->response when nothing
 * was handed out before. stream->last_sent is moved forward to it.
 */
269 struct h2_response *h2_stream_get_unsent_response(h2_stream *stream)
271 h2_response *unsent = (stream->last_sent?
272 stream->last_sent->next : stream->response);
274 stream->last_sent = unsent;
/* Initialize the stream's request from an existing request_rec.
 * Answers APR_ECONNRESET when the stream has been reset. Otherwise the
 * stream is put into OPEN, r is written into stream->request by
 * h2_request_rwrite(), and the request's serialize flag is taken from the
 * H2_CONF_SER_HEADERS setting of r's server. The result is logged at DEBUG
 * together with the status of the rewrite.
 */
279 apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
282 AP_DEBUG_ASSERT(stream);
283 if (stream->rst_error) {
284 return APR_ECONNRESET;
286 set_state(stream, H2_STREAM_ST_OPEN);
287 status = h2_request_rwrite(stream->request, stream->pool, r);
288 stream->request->serialize = h2_config_geti(h2_config_sget(r->server),
289 H2_CONF_SER_HEADERS);
290 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
291 "h2_request(%d): rwrite %s host=%s://%s%s",
292 stream->request->id, stream->request->method,
293 stream->request->scheme, stream->request->authority,
294 stream->request->path);
/* Add a header (name/nlen, value/vlen) received for the stream's request.
 *
 * The server's request limits are enforced here:
 *  - a pseudo header (name starts with ':') whose value is longer than
 *    limit_req_line gives HTTP_REQUEST_URI_TOO_LARGE,
 *  - any other header where nlen + 2 + vlen exceeds limit_req_fieldsize
 *    gives HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE,
 *  - more non-pseudo headers than limit_req_fields gives
 *    HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE.
 * A violated limit is turned into an error response by
 * h2_stream_set_error(), whose result is returned.
 *
 * A header arriving for a stream that is already scheduled is stored as a
 * request trailer. Otherwise it is added as a request header, unless the
 * stream's input is no longer open, which yields APR_ECONNRESET.
 *
 * NOTE(review): the "+ 2" presumably accounts for the ": " separator of a
 * serialized header line - confirm.
 */
299 apr_status_t h2_stream_add_header(h2_stream *stream,
300 const char *name, size_t nlen,
301 const char *value, size_t vlen)
303 AP_DEBUG_ASSERT(stream);
304 if (!stream->response) {
305 if (name[0] == ':') {
306 if ((vlen) > stream->session->s->limit_req_line) {
307 /* pseudo header: approximation of request line size check */
308 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
309 "h2_stream(%ld-%d): pseudo header %s too long",
310 stream->session->id, stream->id, name);
311 return h2_stream_set_error(stream,
312 HTTP_REQUEST_URI_TOO_LARGE);
315 else if ((nlen + 2 + vlen) > stream->session->s->limit_req_fieldsize) {
316 /* header too long */
317 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
318 "h2_stream(%ld-%d): header %s too long",
319 stream->session->id, stream->id, name);
320 return h2_stream_set_error(stream,
321 HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE);
/* only regular headers count against the limit on header lines */
324 if (name[0] != ':') {
325 ++stream->request_headers_added;
326 if (stream->request_headers_added
327 > stream->session->s->limit_req_fields) {
328 /* too many header lines */
329 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
330 "h2_stream(%ld-%d): too many header lines",
331 stream->session->id, stream->id);
332 return h2_stream_set_error(stream,
333 HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE);
/* headers seen after scheduling are kept as trailers of the request */
338 if (h2_stream_is_scheduled(stream)) {
339 return h2_request_add_trailer(stream->request, stream->pool,
340 name, nlen, value, vlen);
343 if (!input_open(stream)) {
344 return APR_ECONNRESET;
346 return h2_request_add_header(stream->request, stream->pool,
347 name, nlen, value, vlen);
/* Hand the stream to the session's multiplexer for processing once its
 * request headers are complete.
 *
 * Answers APR_ECONNRESET when the stream's output is no longer open. A
 * stream that already carries a response is passed to h2_mplx_process()
 * directly. Otherwise the request headers are finalized by
 * h2_request_end_headers(); on success the request's body flag is set to
 * !eos, the stream is marked as scheduled, the expected amount of input is
 * taken from the request's content_length and the stream is passed to
 * h2_mplx_process(). The h2_stream_rst() call near the end resets the
 * stream with H2_ERR_INTERNAL_ERROR.
 *
 * cmp and ctx are forwarded unchanged to h2_mplx_process().
 */
351 apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
352 h2_stream_pri_cmp *cmp, void *ctx)
355 AP_DEBUG_ASSERT(stream);
356 AP_DEBUG_ASSERT(stream->session);
357 AP_DEBUG_ASSERT(stream->session->mplx);
359 if (!output_open(stream)) {
360 return APR_ECONNRESET;
362 if (stream->scheduled) {
369 if (stream->response) {
370 /* already have a response, probably a HTTP error code */
371 return h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
374 /* Seeing the end-of-headers, we have everything we need to
375 * start processing it.
377 status = h2_request_end_headers(stream->request, stream->pool,
379 if (status == APR_SUCCESS) {
380 stream->request->body = !eos;
381 stream->scheduled = 1;
382 stream->input_remaining = stream->request->content_length;
384 status = h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
385 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
386 "h2_stream(%ld-%d): scheduled %s %s://%s%s",
387 stream->session->id, stream->id,
388 stream->request->method, stream->request->scheme,
389 stream->request->authority, stream->request->path);
392 h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
393 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
394 "h2_stream(%ld-%d): RST=2 (internal err) %s %s://%s%s",
395 stream->session->id, stream->id,
396 stream->request->method, stream->request->scheme,
397 stream->request->authority, stream->request->path);
/* Return the stream's scheduled flag, which h2_stream_schedule() sets to 1
 * when it hands the stream over for processing.
 */
403 int h2_stream_is_scheduled(const h2_stream *stream)
405 return stream->scheduled;
/* Close the input side of the stream on behalf of a caller.
 * The request is logged at TRACE1. A stream that has been reset answers
 * APR_ECONNRESET. Otherwise the state is updated via close_input(); when
 * that reports a non-zero result and an input beam exists, the beam is
 * closed and the status of that close is kept in status.
 */
408 apr_status_t h2_stream_close_input(h2_stream *stream)
410 apr_status_t status = APR_SUCCESS;
412 AP_DEBUG_ASSERT(stream);
413 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
414 "h2_stream(%ld-%d): closing input",
415 stream->session->id, stream->id);
417 if (stream->rst_error) {
418 return APR_ECONNRESET;
421 if (close_input(stream) && stream->input) {
422 status = h2_beam_close(stream->input);
/* Feed len bytes of request body data into the stream's input beam.
 *
 * Writing is denied, and logged at TRACE1, when the stream's input is
 * already closed or the request headers are not complete yet
 * (request->eoh not set).
 *
 * For requests that are not chunked, len is subtracted from the remaining
 * announced content length (stream->input_remaining). Receiving more than
 * announced is logged as a warning, resets the stream with
 * H2_ERR_PROTOCOL_ERROR and answers APR_ECONNABORTED.
 *
 * The data is copied into a temporary brigade, to which an EOS bucket can
 * be appended, sent into the input beam with a blocking send, and the
 * temporary brigade is destroyed again. The stream's DATA frame and octet
 * counters are updated afterwards.
 */
427 apr_status_t h2_stream_write_data(h2_stream *stream,
428 const char *data, size_t len, int eos)
430 conn_rec *c = stream->session->c;
431 apr_status_t status = APR_SUCCESS;
432 apr_bucket_brigade *tmp;
434 AP_DEBUG_ASSERT(stream);
435 if (!stream->input) {
438 if (input_closed(stream) || !stream->request->eoh) {
439 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
440 "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d",
441 stream->session->id, stream->id, input_closed(stream),
442 stream->request->eoh);
446 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
447 "h2_stream(%ld-%d): add %ld input bytes",
448 stream->session->id, stream->id, (long)len);
/* without chunking, the body must not exceed the announced length */
450 if (!stream->request->chunked) {
451 stream->input_remaining -= len;
452 if (stream->input_remaining < 0) {
453 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c,
455 "h2_stream(%ld-%d): got %ld more content bytes than announced "
456 "in content-length header: %ld",
457 stream->session->id, stream->id,
/* first %ld: number of excess bytes, second %ld: announced length */
458 -(long)stream->input_remaining,
459 (long)stream->request->content_length);
460 h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
461 return APR_ECONNABORTED;
/* copy the data into a brigade of our own and pass it into the beam */
465 tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
466 apr_brigade_write(tmp, NULL, NULL, data, len);
468 APR_BRIGADE_INSERT_TAIL(tmp, apr_bucket_eos_create(c->bucket_alloc));
471 status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
472 apr_brigade_destroy(tmp);
474 stream->in_data_frames++;
475 stream->in_data_octets += len;
/* Set the stream's suspended flag. Any non-zero value of suspended is
 * normalized to 1 before it is stored; the new value is logged at TRACE2.
 */
480 void h2_stream_set_suspended(h2_stream *stream, int suspended)
482 AP_DEBUG_ASSERT(stream);
483 stream->suspended = !!suspended;
484 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
485 "h2_stream(%ld-%d): suspended=%d",
486 stream->session->id, stream->id, stream->suspended);
/* Return the stream's suspended flag as stored by
 * h2_stream_set_suspended(): 1 when suspended, 0 otherwise.
 */
489 int h2_stream_is_suspended(const h2_stream *stream)
491 AP_DEBUG_ASSERT(stream);
492 return stream->suspended;
/* Move data from the stream's output beam into stream->buffer using a
 * non-blocking receive; amount is passed on to h2_beam_receive() as the
 * requested size.
 *
 * The received buckets are then inspected: a file bucket whose file does
 * not belong to the connection pool is set aside into c->pool, and its
 * file is remembered in stream->files (an array created on first use) -
 * stream_pool_cleanup() closes the files recorded there.
 */
495 static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount)
497 conn_rec *c = stream->session->c;
501 if (!stream->output) {
504 status = h2_beam_receive(stream->output, stream->buffer,
505 APR_NONBLOCK_READ, amount);
506 /* The buckets we reveive are using the stream->buffer pool as
507 * lifetime which is exactly what we want since this is stream->pool.
509 * However: when we send these buckets down the core output filters, the
510 * filter might decide to setaside them into a pool of its own. And it
511 * might decide, after having sent the buckets, to clear its pool.
513 * This is problematic for file buckets because it then closed the contained
514 * file. Any split off buckets we sent afterwards will result in a
517 for (b = APR_BRIGADE_FIRST(stream->buffer);
518 b != APR_BRIGADE_SENTINEL(stream->buffer);
519 b = APR_BUCKET_NEXT(b)) {
520 if (APR_BUCKET_IS_FILE(b)) {
521 apr_bucket_file *f = (apr_bucket_file *)b->data;
522 apr_pool_t *fpool = apr_file_pool_get(f->fd);
523 if (fpool != c->pool) {
524 apr_bucket_setaside(b, c->pool);
525 if (!stream->files) {
526 stream->files = apr_array_make(stream->pool,
527 5, sizeof(apr_file_t*));
529 APR_ARRAY_PUSH(stream->files, apr_file_t*) = f->fd;
/* Attach a response to the stream, together with the beam that delivers
 * its body (output may be NULL, see h2_stream_set_error()).
 *
 * Refused with APR_ECONNRESET, and logged at TRACE1, when the stream's
 * output is already closed or a final response has been submitted before.
 * For a final response (h2_response_is_final()), output becomes the
 * stream's output beam and the buffer brigade stream->buffer is created.
 * Afterwards h2_stream_filter() is run on the stream and, if an output
 * beam is present, fill_buffer() does a first read from it. The outcome is
 * logged at TRACE1 with the HTTP status of stream->response.
 */
536 apr_status_t h2_stream_add_response(h2_stream *stream, h2_response *response,
537 h2_bucket_beam *output)
539 apr_status_t status = APR_SUCCESS;
540 conn_rec *c = stream->session->c;
541 h2_response **pr = &stream->response;
543 if (!output_open(stream)) {
544 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
545 "h2_stream(%ld-%d): output closed",
546 stream->session->id, stream->id);
547 return APR_ECONNRESET;
549 if (stream->submitted) {
550 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
551 "h2_stream(%ld-%d): already submitted final response",
552 stream->session->id, stream->id);
553 return APR_ECONNRESET;
/* only a final response brings the body: take over beam and buffer */
562 if (h2_response_is_final(response)) {
563 stream->output = output;
564 stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
566 h2_stream_filter(stream);
567 if (stream->output) {
568 status = fill_buffer(stream, 0);
572 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
573 "h2_stream(%ld-%d): set_response(%d)",
574 stream->session->id, stream->id,
575 stream->response->http_status);
/* Answer the stream's request with an HTTP error status. An error
 * response is built by h2_response_die() from the stream id, http_status
 * and the stream's request, and added through h2_stream_add_response()
 * without an output beam. A stream whose final response was already
 * submitted is handled separately, before any response is built.
 */
579 apr_status_t h2_stream_set_error(h2_stream *stream, int http_status)
581 h2_response *response;
583 if (stream->submitted) {
586 response = h2_response_die(stream->id, http_status, stream->request,
588 return h2_stream_add_response(stream, response, NULL);
/* Amount of output data h2_stream_out_prepare() works with at a time: it
 * caps the requested length and is added to the amount read ahead.
 * NOTE(review): presumably 16 KB (the default HTTP/2 max frame size) minus
 * the 9 byte frame header and some slack - confirm the origin of the 100.
 */
591 static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9);
/* Determine how much output data of the stream is ready to be sent.
 *
 * On entry *plen carries the length the caller asks for; the amount
 * actually worked with is limited to DATA_CHUNK_SIZE. On return *plen and
 * *peos report what h2_util_bb_avail() finds in stream->buffer.
 *
 * A stream that has been reset answers APR_ECONNRESET. When the buffer
 * holds less than requested and no EOS yet, fill_buffer() is asked for the
 * missing amount plus one more DATA_CHUNK_SIZE. An EOF from the beam is
 * turned into an EOS bucket at the end of the buffer; APR_EAGAIN just means
 * nothing more arrived. Both count as success. The buffer is dumped at
 * TRACE2 before and after, and the result is logged at TRACE1.
 */
593 apr_status_t h2_stream_out_prepare(h2_stream *stream,
594 apr_off_t *plen, int *peos)
596 conn_rec *c = stream->session->c;
597 apr_status_t status = APR_SUCCESS;
600 if (stream->rst_error) {
603 return APR_ECONNRESET;
606 if (!stream->buffer) {
611 requested = H2MIN(*plen, DATA_CHUNK_SIZE);
614 requested = DATA_CHUNK_SIZE;
618 H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre");
619 h2_util_bb_avail(stream->buffer, plen, peos);
620 if (!*peos && *plen < requested) {
621 /* try to get more data */
622 status = fill_buffer(stream, (requested - *plen) + DATA_CHUNK_SIZE);
623 if (APR_STATUS_IS_EOF(status)) {
/* the beam has ended: terminate the buffered data with an EOS bucket */
624 apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
625 APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
626 status = APR_SUCCESS;
628 else if (status == APR_EAGAIN) {
629 /* did not receive more, it's ok */
630 status = APR_SUCCESS;
/* measure again, the buffer may have grown or gained an EOS */
633 h2_util_bb_avail(stream->buffer, plen, peos);
635 H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_post");
636 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
637 "h2_stream(%ld-%d): prepare, len=%ld eos=%d, trailers=%s",
638 c->id, stream->id, (long)*plen, *peos,
639 (stream->response && stream->response->trailers)?
641 if (!*peos && !*plen && status == APR_SUCCESS) {
/* Move buffered output of the stream into the brigade bb.
 * A stream that has been reset answers APR_ECONNRESET. Otherwise
 * h2_append_brigade() transfers data from stream->buffer to bb, with *plen
 * and *peos passed through to it. The case of a successful call that
 * delivered neither data nor EOS is handled separately. The result is
 * logged at TRACE2.
 */
648 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
649 apr_off_t *plen, int *peos)
651 conn_rec *c = stream->session->c;
652 apr_status_t status = APR_SUCCESS;
654 if (stream->rst_error) {
655 return APR_ECONNRESET;
657 status = h2_append_brigade(bb, stream->buffer, plen, peos);
658 if (status == APR_SUCCESS && !*peos && !*plen) {
661 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
662 "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
663 c->id, stream->id, (long)*plen, *peos);
/* Public accessor for input_open(): tells whether the stream can still
 * receive input.
 */
668 int h2_stream_input_is_open(const h2_stream *stream)
670 return input_open(stream);
/* Tell whether a response still has to be submitted for the stream. In the
 * states OPEN, CLOSED_INPUT, CLOSED_OUTPUT and CLOSED the answer is the
 * negation of stream->submitted.
 */
673 int h2_stream_needs_submit(const h2_stream *stream)
675 switch (stream->state) {
676 case H2_STREAM_ST_OPEN:
677 case H2_STREAM_ST_CLOSED_INPUT:
678 case H2_STREAM_ST_CLOSED_OUTPUT:
679 case H2_STREAM_ST_CLOSED:
680 return !stream->submitted;
/* Submit server pushes for the stream. The push candidates are collected
 * by h2_push_collect_update() from the stream and its request; when there
 * are any, their number is logged at TRACE1 and each one is handed to
 * h2_session_push() in order.
 * NOTE(review): status becomes APR_ECONNRESET inside the loop, presumably
 * when a push could not be created - confirm the condition.
 */
686 apr_status_t h2_stream_submit_pushes(h2_stream *stream)
688 apr_status_t status = APR_SUCCESS;
689 apr_array_header_t *pushes;
692 pushes = h2_push_collect_update(stream, stream->request,
694 if (pushes && !apr_is_empty_array(pushes)) {
695 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
696 "h2_stream(%ld-%d): found %d push candidates",
697 stream->session->id, stream->id, pushes->nelts);
698 for (i = 0; i < pushes->nelts; ++i) {
699 h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
700 h2_stream *s = h2_session_push(stream->session, stream, push);
702 status = APR_ECONNRESET;
/* Return the trailers of the stream's response, or NULL when the stream
 * has no response.
 */
710 apr_table_t *h2_stream_get_trailers(h2_stream *stream)
712 return stream->response? stream->response->trailers : NULL;
/* Look up the configured priority for the stream. This only applies to a
 * stream that has a response and whose request has initiated_on set; the
 * lookup is keyed by the response's "content-type" header and uses the
 * session's configuration.
 * NOTE(review): a set initiated_on presumably marks a pushed stream -
 * confirm.
 */
715 const h2_priority *h2_stream_get_priority(h2_stream *stream)
717 if (stream->response && stream->request && stream->request->initiated_on) {
718 const char *ctype = apr_table_get(stream->response->headers, "content-type");
720 /* FIXME: Not good enough, config needs to come from request->server */
721 return h2_config_get_priority(stream->session->config, ctype);
727 const char *h2_stream_state_str(h2_stream *stream)
729 switch (stream->state) {
730 case H2_STREAM_ST_IDLE:
732 case H2_STREAM_ST_OPEN:
734 case H2_STREAM_ST_RESV_LOCAL:
735 return "RESERVED_LOCAL";
736 case H2_STREAM_ST_RESV_REMOTE:
737 return "RESERVED_REMOTE";
738 case H2_STREAM_ST_CLOSED_INPUT:
739 return "HALF_CLOSED_REMOTE";
740 case H2_STREAM_ST_CLOSED_OUTPUT:
741 return "HALF_CLOSED_LOCAL";
742 case H2_STREAM_ST_CLOSED: