1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
19 * Licensed under the Apache License, Version 2.0 (the "License");
20 * you may not use this file except in compliance with the License.
21 * You may obtain a copy of the License at
23 * http://www.apache.org/licenses/LICENSE-2.0
25 * Unless required by applicable law or agreed to in writing, software
26 * distributed under the License is distributed on an "AS IS" BASIS,
27 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28 * See the License for the specific language governing permissions and
29 * limitations under the License.
35 #include <apr_strings.h>
38 #include <http_core.h>
39 #include <http_connection.h>
42 #include <nghttp2/nghttp2.h>
44 #include "h2_private.h"
46 #include "h2_bucket_beam.h"
48 #include "h2_config.h"
52 #include "h2_request.h"
53 #include "h2_headers.h"
54 #include "h2_session.h"
55 #include "h2_stream.h"
/* Map a stream state to a constant, human-readable name; the result is used
 * in trace messages such as the "transit to [%s]" log entry. */
62 static const char *h2_ss_str(h2_stream_state_t state)
68 return "RESERVED_LOCAL";
70 return "RESERVED_REMOTE";
74 return "HALF_CLOSED_LOCAL";
76 return "HALF_CLOSED_REMOTE";
/* Name of the state the stream is currently in, as produced by h2_ss_str(). */
86 const char *h2_stream_state_str(h2_stream *stream)
88 return h2_ss_str(stream->state);
91 /* Abbreviations for stream transit tables */
/* Target states are written as the H2_SS_* value plus one. That keeps 0 free
 * for "no change" and the negative values free for the two error markers.
 * NOTE(review): S_IDL refers to H2_SS_IDL, while the rest of this file spells
 * the idle state H2_SS_IDLE. S_IDL is not used by the tables below - confirm
 * that it expands to a defined enumerator. */
92 #define S_XXX (-2) /* Programming Error */
93 #define S_ERR (-1) /* Protocol Error */
94 #define S_NOP (0) /* No Change */
95 #define S_IDL (H2_SS_IDL + 1)
96 #define S_RS_L (H2_SS_RSVD_L + 1)
97 #define S_RS_R (H2_SS_RSVD_R + 1)
98 #define S_OPEN (H2_SS_OPEN + 1)
99 #define S_CL_L (H2_SS_CLOSED_L + 1)
100 #define S_CL_R (H2_SS_CLOSED_R + 1)
101 #define S_CLS (H2_SS_CLOSED + 1)
102 #define S_CLN (H2_SS_CLEANUP + 1)
104 /* state transitions when certain frame types are sent */
/* One row per frame type (named in the trailing comment of each row), one
 * column per current stream state (named in the header comment). Each cell is
 * the resulting state or one of S_NOP / S_ERR. on_frame() selects the row by
 * frame type. */
105 static int trans_on_send[][H2_SS_MAX] = {
106 /*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
107 { S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* DATA */
108 { S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* HEADERS */
109 { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */
110 { S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */
111 { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */
112 { S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */
113 { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */
114 { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */
115 { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */
116 { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */
118 /* state transitions when certain frame types are received */
/* Same layout as the send table: rows are frame types, columns are the
 * current stream state, cells are the resulting state or S_NOP / S_ERR. */
119 static int trans_on_recv[][H2_SS_MAX] = {
120 /*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
121 { S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* DATA */
122 { S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* HEADERS */
123 { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */
124 { S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */
125 { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */
126 { S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */
127 { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */
128 { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */
129 { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */
130 { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */
132 /* state transitions when certain events happen */
/* Rows are stream events (named in the trailing comment of each row) and are
 * selected by on_event(); columns are the current stream state. Besides
 * states, S_NOP and S_ERR, cells may hold S_XXX, the programming-error
 * marker. */
133 static int trans_on_event[][H2_SS_MAX] = {
134 /*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
135 { S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },/* EV_CLOSED_L*/
136 { S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },/* EV_CLOSED_R*/
137 { S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* EV_CANCELLED*/
138 { S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },/* EV_EOS_SENT*/
141 static int on_map(h2_stream_state_t state, int map[H2_SS_MAX])
/* Look up the transition for a frame of type frame_type in frame_map, a
 * transit table with maxlen rows. Both frame_type and state are asserted to
 * be non-negative. A frame type at or beyond maxlen has no row and leaves the
 * state unchanged; otherwise the matching row is handed to on_map(). */
155 static int on_frame(h2_stream_state_t state, int frame_type,
156 int frame_map[][H2_SS_MAX], apr_size_t maxlen)
158 ap_assert(frame_type >= 0);
159 ap_assert(state >= 0);
160 if (frame_type >= maxlen) {
161 return state; /* NOP, ignore unknown frame types */
163 return on_map(state, frame_map[frame_type]);
/* Transition for sending a frame of type frame_type while in state, looked
 * up in trans_on_send. */
166 static int on_frame_send(h2_stream_state_t state, int frame_type)
168 return on_frame(state, frame_type, trans_on_send, H2_ALEN(trans_on_send));
/* Transition for receiving a frame of type frame_type while in state, looked
 * up in trans_on_recv. */
171 static int on_frame_recv(h2_stream_state_t state, int frame_type)
173 return on_frame(state, frame_type, trans_on_recv, H2_ALEN(trans_on_recv));
/* Report event ev to the monitor's on_event callback, if one is installed,
 * and return the transition that trans_on_event holds for it. An event
 * without a row in that table leaves the state as it is. */
176 static int on_event(h2_stream* stream, h2_stream_event_t ev)
178 if (stream->monitor && stream->monitor->on_event) {
179 stream->monitor->on_event(stream->monitor->ctx, stream, ev);
181 if (ev < H2_ALEN(trans_on_event)) {
182 return on_map(stream->state, trans_on_event[ev]);
184 return stream->state;
/* Log a printout of the stream's out_buffer, labelled with tag, when the
 * connection logs at level lvl. The printout is rendered into a 4 KB stack
 * buffer; "empty" is logged when h2_util_bb_print() reports a length of 0. */
187 static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
189 if (APLOG_C_IS_LEVEL(s->session->c, lvl)) {
190 conn_rec *c = s->session->c;
191 char buffer[4 * 1024];
192 apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
194 len = h2_util_bb_print(buffer, bmax, tag, "", s->out_buffer);
195 ap_log_cerror(APLOG_MARK, lvl, 0, c,
196 H2_STRM_MSG(s, "out-buffer(%s)"), len? buffer : "empty");
/* Set up the stream's input beam on first use (stream->input still NULL).
 * The beam is created from the stream pool with the server's timeout, and
 * h2_beam_send_from() is then called with the stream pool. */
200 static apr_status_t setup_input(h2_stream *stream) {
201 if (stream->input == NULL) {
/* empty: end of input was already seen and nothing is buffered */
202 int empty = (stream->input_eof
203 && (!stream->in_buffer
204 || APR_BRIGADE_EMPTY(stream->in_buffer)));
206 h2_beam_create(&stream->input, stream->pool, stream->id,
207 "input", H2_BEAM_OWNER_SEND, 0,
208 stream->session->s->timeout);
209 h2_beam_send_from(stream->input, stream->pool);
/* Mark the stream's input as complete (input_eof) and close the input side.
 * A stream that carries a reset error yields APR_ECONNRESET. Trailers that
 * were collected are turned into a headers bucket, followed by an EOS bucket,
 * at the tail of in_buffer, and H2_SEV_IN_DATA_PENDING is dispatched. Finally
 * the buffered input is flushed and the result of h2_beam_close() returned. */
215 static apr_status_t close_input(h2_stream *stream)
217 conn_rec *c = stream->session->c;
218 apr_status_t status = APR_SUCCESS;
220 stream->input_eof = 1;
221 if (stream->input && h2_beam_is_closed(stream->input)) {
225 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
226 H2_STRM_MSG(stream, "closing input"));
227 if (stream->rst_error) {
228 return APR_ECONNRESET;
231 if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
235 if (!stream->in_buffer) {
236 stream->in_buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
/* the trailers table moves into the headers object; drop our reference */
239 r = h2_headers_create(HTTP_OK, stream->trailers, NULL, stream->pool);
240 stream->trailers = NULL;
241 b = h2_bucket_headers_create(c->bucket_alloc, r);
242 APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b);
244 b = apr_bucket_eos_create(c->bucket_alloc);
245 APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b);
247 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
248 H2_STRM_MSG(stream, "added trailers"));
249 h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
252 h2_stream_flush_input(stream);
253 return h2_beam_close(stream->input);
/* Leave the stream's output beam via h2_beam_leave(). A missing or already
 * closed beam is caught by the guard on the first line. */
258 static apr_status_t close_output(h2_stream *stream)
260 if (!stream->output || h2_beam_is_closed(stream->output)) {
263 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
264 H2_STRM_MSG(stream, "closing output"));
265 return h2_beam_leave(stream->output);
/* Invoke the monitor's on_state_enter callback, if a monitor with that
 * callback is installed. */
268 static void on_state_enter(h2_stream *stream)
270 if (stream->monitor && stream->monitor->on_state_enter) {
271 stream->monitor->on_state_enter(stream->monitor->ctx, stream);
/* Invoke the monitor's on_state_event callback with ev, if a monitor with
 * that callback is installed. */
275 static void on_state_event(h2_stream *stream, h2_stream_event_t ev)
277 if (stream->monitor && stream->monitor->on_state_event) {
278 stream->monitor->on_state_event(stream->monitor->ctx, stream, ev);
/* React to an event or frame that is not valid in the stream's current
 * state: inform the monitor's on_state_invalid callback if present, trace the
 * incident, and - for the states selected by the switch - reset the stream
 * with H2_ERR_INTERNAL_ERROR. */
282 static void on_state_invalid(h2_stream *stream)
284 if (stream->monitor && stream->monitor->on_state_invalid) {
285 stream->monitor->on_state_invalid(stream->monitor->ctx, stream);
287 /* stream got an event/frame invalid in its state */
288 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
289 H2_STRM_MSG(stream, "invalid state event"));
290 switch (stream->state) {
296 h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
/* Move the stream to new_state. A value equal to the current state is
 * checked first. A negative value is logged as an invalid transition and
 * handed to on_state_invalid(). Otherwise the new state is stored,
 * state-specific handling runs (closing the output, in one case also
 * cleaning out_buffer) and on_state_enter() is called. */
303 static apr_status_t transit(h2_stream *stream, int new_state)
305 if (new_state == stream->state) {
308 else if (new_state < 0) {
309 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
310 H2_STRM_LOG(APLOGNO(03081), stream, "invalid transition"));
311 on_state_invalid(stream);
315 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
316 H2_STRM_MSG(stream, "transit to [%s]"), h2_ss_str(new_state));
317 stream->state = new_state;
329 close_output(stream);
336 close_output(stream);
337 if (stream->out_buffer) {
338 apr_brigade_cleanup(stream->out_buffer);
344 on_state_enter(stream);
/* Install monitor as the receiver of this stream's callbacks (on_event,
 * on_state_enter, on_state_event, on_state_invalid). */
348 void h2_stream_set_monitor(h2_stream *stream, h2_stream_monitor *monitor)
350 stream->monitor = monitor;
/* Feed event ev into the stream's state machine. The transition comes from
 * on_event(). An invalid event is logged as a warning and passed to
 * on_state_invalid(). An event that maps to the current state is only traced
 * as a "non-state event". The remaining path calls on_state_event() and then
 * transit(). */
353 void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
357 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
358 H2_STRM_MSG(stream, "dispatch event %d"), ev);
359 new_state = on_event(stream, ev);
361 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
362 H2_STRM_LOG(APLOGNO(10002), stream, "invalid event %d"), ev);
363 on_state_invalid(stream);
/* S_XXX marks a programming error; fail loudly in debug builds */
364 AP_DEBUG_ASSERT(new_state > S_XXX);
367 else if (new_state == stream->state) {
369 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
370 H2_STRM_MSG(stream, "non-state event %d"), ev);
374 on_state_event(stream, ev);
375 transit(stream, new_state);
/* Store the push policy that h2_push_policy_determine() computes for the
 * request headers in stream->push_policy, and set r->serialize from the
 * H2_CONF_SER_HEADERS configuration value of the session. */
379 static void set_policy_for(h2_stream *stream, h2_request *r)
381 int enabled = h2_session_push_enabled(stream->session);
382 stream->push_policy = h2_push_policy_determine(r->headers, stream->pool,
384 r->serialize = h2_config_geti(stream->session->config, H2_CONF_SER_HEADERS);
/* Account for a frame of type ftype with the given flags being sent on the
 * stream. The transition is taken from on_frame_send(); an invalid one is
 * traced and handed to transit(). END_STREAM in flags is recorded in eos.
 * For PUSH_PROMISE the pending request (rtmp) must exist and no request may
 * be set yet; it is finalized and becomes stream->request. After the frame's
 * own transition succeeded, eos additionally triggers H2_SEV_CLOSED_L. */
387 apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags)
389 apr_status_t status = APR_SUCCESS;
390 int new_state, eos = 0;
392 new_state = on_frame_send(stream->state, ftype);
394 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
395 H2_STRM_MSG(stream, "invalid frame %d send"), ftype);
396 AP_DEBUG_ASSERT(new_state > S_XXX);
397 return transit(stream, new_state);
402 eos = (flags & NGHTTP2_FLAG_END_STREAM);
405 case NGHTTP2_HEADERS:
406 eos = (flags & NGHTTP2_FLAG_END_STREAM);
409 case NGHTTP2_PUSH_PROMISE:
410 /* start pushed stream */
411 ap_assert(stream->request == NULL);
412 ap_assert(stream->rtmp != NULL);
413 status = h2_request_end_headers(stream->rtmp, stream->pool, 1);
414 if (status != APR_SUCCESS) {
417 set_policy_for(stream, stream->rtmp);
418 stream->request = stream->rtmp;
425 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
426 H2_STRM_MSG(stream, "send frame %d, eos=%d"), ftype, eos);
427 status = transit(stream, new_state);
428 if (status == APR_SUCCESS && eos) {
429 status = transit(stream, on_event(stream, H2_SEV_CLOSED_L));
/* Account for a frame of type ftype with the given flags having been
 * received on the stream. The transition is taken from on_frame_recv(); an
 * invalid one is traced and handed to transit(). END_STREAM in flags is
 * recorded in eos. HEADERS arriving in the OPEN state are handled separately
 * and can reset the stream with H2_ERR_PROTOCOL_ERROR; otherwise the pending
 * request (rtmp) is finalized and becomes stream->request. After the frame's
 * own transition succeeded, eos additionally triggers H2_SEV_CLOSED_R. */
434 apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags)
436 apr_status_t status = APR_SUCCESS;
437 int new_state, eos = 0;
439 new_state = on_frame_recv(stream->state, ftype);
441 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
442 H2_STRM_MSG(stream, "invalid frame %d recv"), ftype);
443 AP_DEBUG_ASSERT(new_state > S_XXX);
444 return transit(stream, new_state);
449 eos = (flags & NGHTTP2_FLAG_END_STREAM);
452 case NGHTTP2_HEADERS:
453 eos = (flags & NGHTTP2_FLAG_END_STREAM);
454 if (stream->state == H2_SS_OPEN) {
457 h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
462 ap_assert(stream->request == NULL);
463 if (stream->rtmp == NULL) {
464 /* This can only happen if the stream has received no header
465 * name/value pairs at all. The latest nghttp2 versions have become
466 * pretty good at detecting this early. In any case, we have
467 * to abort the connection here, since this is clearly a protocol error */
470 status = h2_request_end_headers(stream->rtmp, stream->pool, eos);
471 if (status != APR_SUCCESS) {
474 set_policy_for(stream, stream->rtmp);
475 stream->request = stream->rtmp;
483 status = transit(stream, new_state);
484 if (status == APR_SUCCESS && eos) {
485 status = transit(stream, on_event(stream, H2_SEV_CLOSED_R));
/* Pass everything buffered in in_buffer on to the input beam with a blocking
 * send and remember the time of that write in in_last_write. Once input_eof
 * is set and the beam is still open, the beam is closed as well. */
490 apr_status_t h2_stream_flush_input(h2_stream *stream)
492 apr_status_t status = APR_SUCCESS;
494 if (stream->in_buffer && !APR_BRIGADE_EMPTY(stream->in_buffer)) {
496 status = h2_beam_send(stream->input, stream->in_buffer, APR_BLOCK_READ);
497 stream->in_last_write = apr_time_now();
499 if (stream->input_eof
500 && stream->input && !h2_beam_is_closed(stream->input)) {
501 status = h2_beam_close(stream->input);
/* Take in len bytes of DATA payload received for the stream. The frame and
 * octet counters are updated, the bytes are copied to the end of in_buffer
 * (created on first use) and H2_SEV_IN_DATA_PENDING is dispatched. With
 * trace level 3 enabled on the connection, the payload itself is logged. */
506 apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
507 const uint8_t *data, size_t len)
509 h2_session *session = stream->session;
510 apr_status_t status = APR_SUCCESS;
512 stream->in_data_frames++;
514 if (APLOGctrace3(session->c)) {
/* pool copy yields a NUL-terminated string suitable for %s */
515 const char *load = apr_pstrndup(stream->pool, (const char *)data, len);
516 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, session->c,
517 H2_STRM_MSG(stream, "recv DATA, len=%d: -->%s<--"),
521 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
522 H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
524 stream->in_data_octets += len;
525 if (!stream->in_buffer) {
526 stream->in_buffer = apr_brigade_create(stream->pool,
527 session->c->bucket_alloc);
529 apr_brigade_write(stream->in_buffer, NULL, NULL, (const char *)data, len);
530 h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
/* Make sure the stream has an out_buffer brigade; it is created from the
 * stream pool and the connection's bucket allocator on first use. */
535 static void prep_output(h2_stream *stream) {
536 conn_rec *c = stream->session->c;
537 if (!stream->out_buffer) {
538 stream->out_buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
/* Allocate a zero-initialized h2_stream from pool and set it up: IDLE state,
 * creation time, the initiating stream id, session, monitor and the
 * session's per-stream memory limit. When H2_NG2_LOCAL_WIN_SIZE is defined,
 * the stream's input window size is read from nghttp2. The creation is
 * logged and on_state_enter() is called for the initial state. */
542 h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
543 h2_stream_monitor *monitor, int initiated_on)
545 h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
548 stream->initiated_on = initiated_on;
549 stream->created = apr_time_now();
550 stream->state = H2_SS_IDLE;
552 stream->session = session;
553 stream->monitor = monitor;
554 stream->max_mem = session->max_stream_mem;
556 #ifdef H2_NG2_LOCAL_WIN_SIZE
557 stream->in_window_size =
558 nghttp2_session_get_stream_local_window_size(
559 stream->session->ngh2, stream->id);
562 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
563 H2_STRM_LOG(APLOGNO(03082), stream, "created"));
564 on_state_enter(stream);
/* Release the stream's buffers: out_buffer is cleaned, then the input beam
 * is aborted and drained. Draining is first tried without blocking; only if
 * that reports APR_EAGAIN does the call wait until the beam is empty. */
568 void h2_stream_cleanup(h2_stream *stream)
573 if (stream->out_buffer) {
574 /* remove any left over output buckets that may still have
575 * references into request pools */
576 apr_brigade_cleanup(stream->out_buffer);
579 h2_beam_abort(stream->input);
580 status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
581 if (status == APR_EAGAIN) {
582 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
583 H2_STRM_MSG(stream, "wait on input drain"));
584 status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
585 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
586 H2_STRM_MSG(stream, "input drain returned"));
/* Trace the destruction and destroy stream->pool. */
591 void h2_stream_destroy(h2_stream *stream)
594 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c,
595 H2_STRM_MSG(stream, "destroy"));
597 apr_pool_destroy(stream->pool);
/* Takes the stream's pool into a local.
 * NOTE(review): presumably the pool is then unlinked from the stream and
 * returned to the caller - confirm against the full function body. */
602 apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
604 apr_pool_t *pool = stream->pool;
/* For a stream that has a request: trace method, scheme, authority, path and
 * the chunked flag of the request, and mark the stream as scheduled. */
609 apr_status_t h2_stream_prep_processing(h2_stream *stream)
611 if (stream->request) {
612 const h2_request *r = stream->request;
613 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
614 H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
615 r->method, r->scheme, r->authority, r->path, r->chunked);
617 stream->scheduled = 1;
/* Reset the stream with error_code: the code is remembered in rst_error, the
 * input beam is aborted, the output beam (if any) is left, and
 * H2_SEV_CANCELLED is dispatched to the state machine. */
623 void h2_stream_rst(h2_stream *stream, int error_code)
625 stream->rst_error = error_code;
627 h2_beam_abort(stream->input);
629 if (stream->output) {
630 h2_beam_leave(stream->output);
632 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
633 H2_STRM_MSG(stream, "reset, error=%d"), error_code);
634 h2_stream_dispatch(stream, H2_SEV_CANCELLED);
/* Create the stream's request from the httpd request_rec r. Neither a
 * request nor a pending request may be set yet (asserted). A stream with a
 * reset error yields APR_ECONNRESET. When h2_request_rcreate() succeeds, the
 * request is logged and the arrival of a HEADERS frame carrying END_STREAM
 * is simulated through h2_stream_recv_frame().
 * NOTE(review): the eos parameter is not referenced on these lines;
 * NGHTTP2_FLAG_END_STREAM is passed unconditionally - confirm this is
 * intended. */
637 apr_status_t h2_stream_set_request_rec(h2_stream *stream,
638 request_rec *r, int eos)
643 ap_assert(stream->request == NULL);
644 ap_assert(stream->rtmp == NULL);
645 if (stream->rst_error) {
646 return APR_ECONNRESET;
648 status = h2_request_rcreate(&req, stream->pool, r);
649 if (status == APR_SUCCESS) {
650 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r,
651 H2_STRM_LOG(APLOGNO(03058), stream,
652 "set_request_rec %s host=%s://%s%s"),
653 req->method, req->scheme, req->authority, req->path);
655 /* simulate the frames that led to this */
656 return h2_stream_recv_frame(stream, NGHTTP2_HEADERS,
657 NGHTTP2_FLAG_END_STREAM);
/* Store a clone of r, allocated from the stream pool, as the stream's
 * pending request (rtmp). Asserts that neither a request nor a pending
 * request has been set before. */
662 void h2_stream_set_request(h2_stream *stream, const h2_request *r)
664 ap_assert(stream->request == NULL);
665 ap_assert(stream->rtmp == NULL);
666 stream->rtmp = h2_request_clone(stream->pool, r);
/* Unless the stream already has a response (h2_stream_is_ready()), place an
 * error response for http_status at the front of out_buffer. */
669 static void set_error_response(h2_stream *stream, int http_status)
671 if (!h2_stream_is_ready(stream)) {
672 conn_rec *c = stream->session->c;
674 h2_headers *response;
676 response = h2_headers_die(http_status, stream->request, stream->pool);
/* both buckets go to the head: EOS first, then the headers in front of it,
 * so the brigade starts with the headers bucket followed by EOS */
678 b = apr_bucket_eos_create(c->bucket_alloc);
679 APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
680 b = h2_bucket_headers_create(c->bucket_alloc, response);
681 APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
/* Record one trailer field for the stream. An empty name or a name starting
 * with ':' is logged as a pseudo header in a trailer; names matched by
 * h2_req_ignore_trailer() are handled separately. An accepted field is
 * copied into the stream pool, its name is camel-cased, and the value is
 * merged into stream->trailers (created on first use). */
685 static apr_status_t add_trailer(h2_stream *stream,
686 const char *name, size_t nlen,
687 const char *value, size_t vlen)
689 conn_rec *c = stream->session->c;
690 char *hname, *hvalue;
692 if (nlen == 0 || name[0] == ':') {
693 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, c,
694 H2_STRM_LOG(APLOGNO(03060), stream,
695 "pseudo header in trailer"));
698 if (h2_req_ignore_trailer(name, nlen)) {
701 if (!stream->trailers) {
702 stream->trailers = apr_table_make(stream->pool, 5);
704 hname = apr_pstrndup(stream->pool, name, nlen);
705 hvalue = apr_pstrndup(stream->pool, value, vlen);
706 h2_util_camel_case_header(hname, nlen);
707 apr_table_mergen(stream->trailers, hname, hvalue);
/* Process one header name/value pair received for the stream.
 *
 * Limits come from the server configuration: the value of a pseudo header
 * (name starting with ':') is checked against limit_req_line, any other field
 * (name + 2 + value) against limit_req_fieldsize, and the number of fields
 * seen so far against limit_req_fields + 4. Exceeding the field count by a
 * further 100 resets the stream with H2_ERR_ENHANCE_YOUR_CALM and returns
 * APR_ECONNRESET. Other violations record an HTTP error status that is
 * passed to set_error_response().
 *
 * In the IDLE state the field is added to the pending request (rtmp); in the
 * OPEN state it is treated as a trailer. A non-success status is traced and
 * cancels the stream via H2_SEV_CANCELLED.
 *
 * NOTE(review): name[0] is read and name is logged with %s on these lines;
 * this relies on callers passing a non-empty, NUL-terminated name - confirm. */
712 apr_status_t h2_stream_add_header(h2_stream *stream,
713 const char *name, size_t nlen,
714 const char *value, size_t vlen)
716 h2_session *session = stream->session;
720 if (stream->has_response) {
723 ++stream->request_headers_added;
724 if (name[0] == ':') {
725 if ((vlen) > session->s->limit_req_line) {
726 /* pseudo header: approximation of request line size check */
727 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
728 H2_STRM_MSG(stream, "pseudo %s too long"), name);
729 error = HTTP_REQUEST_URI_TOO_LARGE;
732 else if ((nlen + 2 + vlen) > session->s->limit_req_fieldsize) {
733 /* header too long */
734 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
735 H2_STRM_MSG(stream, "header %s too long"), name);
736 error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
739 if (stream->request_headers_added > session->s->limit_req_fields + 4) {
740 /* too many header lines, include 4 pseudo headers */
741 if (stream->request_headers_added
742 > session->s->limit_req_fields + 4 + 100) {
744 h2_stream_rst(stream, H2_ERR_ENHANCE_YOUR_CALM);
745 return APR_ECONNRESET;
747 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
748 H2_STRM_MSG(stream, "too many header lines"));
749 error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
753 set_error_response(stream, error);
756 else if (H2_SS_IDLE == stream->state) {
758 stream->rtmp = h2_req_create(stream->id, stream->pool,
759 NULL, NULL, NULL, NULL, NULL, 0);
761 status = h2_request_add_header(stream->rtmp, stream->pool,
762 name, nlen, value, vlen);
764 else if (H2_SS_OPEN == stream->state) {
765 status = add_trailer(stream, name, nlen, value, vlen);
771 if (status != APR_SUCCESS) {
772 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
773 H2_STRM_MSG(stream, "header %s not accepted"), name);
774 h2_stream_dispatch(stream, H2_SEV_CANCELLED);
/* Walk the brigade bb from its first bucket towards the sentinel, looking
 * for a bucket for which H2_BUCKET_IS_HEADERS() holds. */
779 static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb)
782 apr_bucket *b = APR_BRIGADE_FIRST(bb);
783 while (b != APR_BRIGADE_SENTINEL(bb)) {
784 if (H2_BUCKET_IS_HEADERS(b)) {
787 b = APR_BUCKET_NEXT(b);
/* Scan out_buffer from the front and account for what it holds. FLUSH
 * buckets and zero-length data buckets are removed while scanning. A headers
 * bucket is taken out of the brigade and its headers are stored in
 * *pheaders. Data buckets must have a known length (asserted). The check
 * against requested bounds how much data is counted into *plen. */
793 static apr_status_t add_buffered_data(h2_stream *stream, apr_off_t requested,
794 apr_off_t *plen, int *peos, int *is_all,
795 h2_headers **pheaders)
806 H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "add_buffered_data");
807 b = APR_BRIGADE_FIRST(stream->out_buffer);
808 while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
/* remember the successor first: b may be removed and destroyed below */
809 e = APR_BUCKET_NEXT(b);
810 if (APR_BUCKET_IS_METADATA(b)) {
811 if (APR_BUCKET_IS_FLUSH(b)) {
812 APR_BUCKET_REMOVE(b);
813 apr_bucket_destroy(b);
815 else if (APR_BUCKET_IS_EOS(b)) {
819 else if (H2_BUCKET_IS_HEADERS(b)) {
821 /* data before the response, can only return up to here */
825 *pheaders = h2_bucket_headers_get(b);
826 APR_BUCKET_REMOVE(b);
827 apr_bucket_destroy(b);
828 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
829 H2_STRM_MSG(stream, "prep, -> response %d"),
830 (*pheaders)->status);
838 else if (b->length == 0) {
839 APR_BUCKET_REMOVE(b);
840 apr_bucket_destroy(b);
843 ap_assert(b->length != (apr_size_t)-1);
845 if (*plen >= requested) {
/* Determine what can be sent next on the stream. A positive *plen on entry
 * is the caller's limit; the amount is capped at the chunk size, which is
 * H2_DATA_CHUNK_SIZE or, when the session has a write_size set, that size
 * minus 9. Buffered output is counted first via add_buffered_data(). If the
 * buffer was counted completely without reaching EOS and data is still
 * missing, more is read from the output beam without blocking. A stream with
 * a reset error yields APR_ECONNRESET. */
856 apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
857 int *peos, h2_headers **pheaders)
859 apr_status_t status = APR_SUCCESS;
860 apr_off_t requested, missing, max_chunk = H2_DATA_CHUNK_SIZE;
866 if (stream->rst_error) {
869 return APR_ECONNRESET;
872 c = stream->session->c;
875 /* determine how much we'd like to send. We cannot send more than
876 * is requested. But we can reduce the size in case the master
877 * connection operates in smaller chunks. (TLS warmup) */
878 if (stream->session->io.write_size > 0) {
879 max_chunk = stream->session->io.write_size - 9; /* header bits */
881 requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
883 /* count the buffered data until eos or a headers bucket */
884 status = add_buffered_data(stream, requested, plen, peos, &complete, pheaders);
886 if (status == APR_EAGAIN) {
887 /* TODO: ugly, someone needs to retrieve the response first */
888 h2_mplx_keep_active(stream->session->mplx, stream);
889 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
890 H2_STRM_MSG(stream, "prep, response eagain"));
893 else if (status != APR_SUCCESS) {
897 if (pheaders && *pheaders) {
901 /* If we do not have enough buffered data to satisfy the requested
902 * length *and* we counted the _complete_ buffer (and did not stop in the middle
903 * because of meta data there), let's see if we can read more from the
905 missing = H2MIN(requested, stream->max_mem) - *plen;
906 if (complete && !*peos && missing > 0) {
907 apr_status_t rv = APR_EOF;
909 if (stream->output) {
910 H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
911 rv = h2_beam_receive(stream->output, stream->out_buffer,
912 APR_NONBLOCK_READ, stream->max_mem - *plen);
913 H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
916 if (rv == APR_SUCCESS) {
917 /* count the buffer again, now that we have read output */
918 status = add_buffered_data(stream, requested, plen, peos, &complete, pheaders);
920 else if (APR_STATUS_IS_EOF(rv)) {
921 apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
922 APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
925 else if (APR_STATUS_IS_EAGAIN(rv)) {
926 /* we set this as the status of this call only if there
927 * is no buffered data, see check below */
930 /* real error reading. Give this back directly, even though
931 * we may have something buffered. */
936 if (status == APR_SUCCESS) {
937 if (*peos || *plen) {
938 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
939 H2_STRM_MSG(stream, "prepare, len=%ld eos=%d"),
944 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
945 H2_STRM_MSG(stream, "prepare, no data"));
/* Bucket predicate: non-zero for every bucket that is not a headers bucket.
 * Passed to h2_append_brigade() by h2_stream_read_to(). */
951 static int is_not_headers(apr_bucket *b)
953 return !H2_BUCKET_IS_HEADERS(b);
/* Append output from out_buffer to bb via h2_append_brigade(), with
 * is_not_headers as the bucket predicate; *plen and *peos are passed through
 * to that call. A stream with a reset error yields APR_ECONNRESET. The case
 * of success with neither data nor EOS is checked separately. */
956 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
957 apr_off_t *plen, int *peos)
959 conn_rec *c = stream->session->c;
960 apr_status_t status = APR_SUCCESS;
962 if (stream->rst_error) {
963 return APR_ECONNRESET;
965 status = h2_append_brigade(bb, stream->out_buffer, plen, peos, is_not_headers);
966 if (status == APR_SUCCESS && !*peos && !*plen) {
969 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
970 H2_STRM_MSG(stream, "read_to, len=%ld eos=%d"),
/* Collect the push candidates for the stream's request and the given
 * response, and hand each one to h2_session_push().
 * NOTE(review): status is set to APR_ECONNRESET inside the loop, presumably
 * when h2_session_push() yields no stream - confirm against the full body. */
976 apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
978 apr_status_t status = APR_SUCCESS;
979 apr_array_header_t *pushes;
982 pushes = h2_push_collect_update(stream, stream->request, response);
983 if (pushes && !apr_is_empty_array(pushes)) {
984 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
985 H2_STRM_MSG(stream, "found %d push candidates"),
987 for (i = 0; i < pushes->nelts; ++i) {
988 h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
989 h2_stream *s = h2_session_push(stream->session, stream, push);
991 status = APR_ECONNRESET;
999 apr_table_t *h2_stream_get_trailers(h2_stream *stream)
/* For a stream that has initiated_on set and a response at hand, look up the
 * priority configured for the response's content-type header in the session
 * configuration. */
1004 const h2_priority *h2_stream_get_priority(h2_stream *stream,
1005 h2_headers *response)
1007 if (response && stream->initiated_on) {
1008 const char *ctype = apr_table_get(response->headers, "content-type");
1010 /* FIXME: Not good enough, config needs to come from request->server */
1011 return h2_config_get_priority(stream->session->config, ctype);
/* Checks the two conditions under which a response is available for the
 * stream: has_response is set, or a headers bucket is waiting in
 * out_buffer. */
1017 int h2_stream_is_ready(h2_stream *stream)
1019 if (stream->has_response) {
1022 else if (stream->out_buffer && get_first_headers_bucket(stream->out_buffer)) {
/* Decides based on the stream's current state alone (switch on
 * stream->state). */
1028 int h2_stream_was_closed(const h2_stream *stream)
1030 switch (stream->state) {
1039 apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount)
1041 h2_session *session = stream->session;
1044 apr_off_t consumed = amount;
1046 while (consumed > 0) {
1047 int len = (consumed > INT_MAX)? INT_MAX : (int)consumed;
1048 nghttp2_session_consume(session->ngh2, stream->id, len);
1052 #ifdef H2_NG2_LOCAL_WIN_SIZE
1054 int cur_size = nghttp2_session_get_stream_local_window_size(
1055 session->ngh2, stream->id);
1056 int win = stream->in_window_size;
1057 int thigh = win * 8/10;
1058 int tlow = win * 2/10;
1059 const int win_max = 2*1024*1024;
1060 const int win_min = 32*1024;
1062 /* Work in progress, probably should add directives for these
1063 * values once this stabilizes somewhat. The general idea is
1064 * to adapt stream window sizes if the input window changes
1065 * a) very quickly (< good RTT) from full to empty
1066 * b) only a little bit (> bad RTT)
1067 * where in a) it grows and in b) it shrinks again.
1069 if (cur_size > thigh && amount > thigh && win < win_max) {
1070 /* almost empty again with one reported consumption, how
1071 * long did this take? */
1072 long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
1074 win = H2MIN(win_max, win + (64*1024));
1077 else if (cur_size < tlow && amount < tlow && win > win_min) {
1078 /* staying full, for how long already? */
1079 long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
1081 win = H2MAX(win_min, win - (32*1024));
1085 if (win != stream->in_window_size) {
1086 stream->in_window_size = win;
1087 nghttp2_session_set_local_window_size(session->ngh2,
1088 NGHTTP2_FLAG_NONE, stream->id, win);
1090 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
1091 "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d",
1092 session->id, stream->id, (long)amount,
1093 cur_size, stream->in_window_size);