1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
20 #include <http_core.h>
21 #include <http_connection.h>
24 #include <nghttp2/nghttp2.h>
26 #include "h2_private.h"
31 #include "h2_request.h"
32 #include "h2_response.h"
33 #include "h2_session.h"
34 #include "h2_stream.h"
37 #include "h2_task_input.h"
42 #define H2_STREAM_OUT(lvl,s,msg) \
44 if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
45 h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbout); \
47 #define H2_STREAM_IN(lvl,s,msg) \
49 if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
50 h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbin); \
54 static int state_transition[][7] = {
55 /* ID OP RL RR CI CO CL */
56 /*ID*/{ 1, 0, 0, 0, 0, 0, 0 },
57 /*OP*/{ 1, 1, 0, 0, 0, 0, 0 },
58 /*RL*/{ 0, 0, 1, 0, 0, 0, 0 },
59 /*RR*/{ 0, 0, 0, 1, 0, 0, 0 },
60 /*CI*/{ 1, 1, 0, 0, 1, 0, 0 },
61 /*CO*/{ 1, 1, 0, 0, 0, 1, 0 },
62 /*CL*/{ 1, 1, 0, 0, 1, 1, 1 },
65 static int set_state(h2_stream *stream, h2_stream_state_t state)
67 int allowed = state_transition[state][stream->state];
69 stream->state = state;
73 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
74 "h2_stream(%ld-%d): invalid state transition from %d to %d",
75 stream->session->id, stream->id, stream->state, state);
79 static int close_input(h2_stream *stream)
81 switch (stream->state) {
82 case H2_STREAM_ST_CLOSED_INPUT:
83 case H2_STREAM_ST_CLOSED:
84 return 0; /* ignore, idempotent */
85 case H2_STREAM_ST_CLOSED_OUTPUT:
87 set_state(stream, H2_STREAM_ST_CLOSED);
90 /* everything else we jump to here */
91 set_state(stream, H2_STREAM_ST_CLOSED_INPUT);
97 static int input_closed(h2_stream *stream)
99 switch (stream->state) {
100 case H2_STREAM_ST_OPEN:
101 case H2_STREAM_ST_CLOSED_OUTPUT:
108 static int close_output(h2_stream *stream)
110 switch (stream->state) {
111 case H2_STREAM_ST_CLOSED_OUTPUT:
112 case H2_STREAM_ST_CLOSED:
113 return 0; /* ignore, idempotent */
114 case H2_STREAM_ST_CLOSED_INPUT:
115 /* both closed now */
116 set_state(stream, H2_STREAM_ST_CLOSED);
119 /* everything else we jump to here */
120 set_state(stream, H2_STREAM_ST_CLOSED_OUTPUT);
126 static int input_open(h2_stream *stream)
128 switch (stream->state) {
129 case H2_STREAM_ST_OPEN:
130 case H2_STREAM_ST_CLOSED_OUTPUT:
137 static int output_open(h2_stream *stream)
139 switch (stream->state) {
140 case H2_STREAM_ST_OPEN:
141 case H2_STREAM_ST_CLOSED_INPUT:
148 h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session)
150 h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
152 stream->state = H2_STREAM_ST_IDLE;
154 stream->session = session;
158 h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
160 h2_stream *stream = h2_stream_create(id, pool, session);
161 set_state(stream, H2_STREAM_ST_OPEN);
162 stream->request = h2_request_create(id, pool);
163 stream->bbout = apr_brigade_create(stream->pool,
164 stream->session->c->bucket_alloc);
166 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
167 "h2_stream(%ld-%d): opened", session->id, stream->id);
171 apr_status_t h2_stream_destroy(h2_stream *stream)
173 AP_DEBUG_ASSERT(stream);
174 if (stream->request) {
175 h2_request_destroy(stream->request);
176 stream->request = NULL;
180 apr_pool_destroy(stream->pool);
185 void h2_stream_cleanup(h2_stream *stream)
187 h2_session_stream_destroy(stream->session, stream);
191 apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
193 apr_pool_t *pool = stream->pool;
198 void h2_stream_rst(h2_stream *stream, int error_code)
200 stream->rst_error = error_code;
202 close_output(stream);
203 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
204 "h2_stream(%ld-%d): reset, error=%d",
205 stream->session->id, stream->id, error_code);
208 apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
209 apr_bucket_brigade *bb)
211 apr_status_t status = APR_SUCCESS;
212 if (!output_open(stream)) {
213 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
214 "h2_stream(%ld-%d): output closed",
215 stream->session->id, stream->id);
216 return APR_ECONNRESET;
219 stream->response = response;
220 if (bb && !APR_BRIGADE_EMPTY(bb)) {
221 int move_all = INT_MAX;
222 /* we can move file handles from h2_mplx into this h2_stream as many
223 * as we want, since the lifetimes are the same and we are not freeing
224 * the ones in h2_mplx->io before this stream is done. */
225 H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream set_response_pre");
226 status = h2_util_move(stream->bbout, bb, 16 * 1024, &move_all,
227 "h2_stream_set_response");
228 H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream set_response_post");
231 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->c,
232 "h2_stream(%ld-%d): set_response(%d)",
233 stream->session->id, stream->id, response->http_status);
237 apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
240 AP_DEBUG_ASSERT(stream);
241 if (stream->rst_error) {
242 return APR_ECONNRESET;
244 set_state(stream, H2_STREAM_ST_OPEN);
245 status = h2_request_rwrite(stream->request, r);
249 void h2_stream_set_h2_request(h2_stream *stream, int initiated_on,
250 const h2_request *req)
252 h2_request_copy(stream->pool, stream->request, req);
253 stream->initiated_on = initiated_on;
254 stream->request->eoh = 0;
257 apr_status_t h2_stream_add_header(h2_stream *stream,
258 const char *name, size_t nlen,
259 const char *value, size_t vlen)
261 AP_DEBUG_ASSERT(stream);
262 if (h2_stream_is_scheduled(stream)) {
263 return h2_request_add_trailer(stream->request, stream->pool,
264 name, nlen, value, vlen);
267 if (!input_open(stream)) {
268 return APR_ECONNRESET;
270 return h2_request_add_header(stream->request, stream->pool,
271 name, nlen, value, vlen);
275 apr_status_t h2_stream_schedule(h2_stream *stream, int eos,
276 h2_stream_pri_cmp *cmp, void *ctx)
279 AP_DEBUG_ASSERT(stream);
280 AP_DEBUG_ASSERT(stream->session);
281 AP_DEBUG_ASSERT(stream->session->mplx);
283 if (!output_open(stream)) {
284 return APR_ECONNRESET;
286 if (stream->scheduled) {
293 /* Seeing the end-of-headers, we have everything we need to
294 * start processing it.
296 status = h2_request_end_headers(stream->request, stream->pool, eos);
297 if (status == APR_SUCCESS) {
299 stream->bbin = apr_brigade_create(stream->pool,
300 stream->session->c->bucket_alloc);
302 stream->input_remaining = stream->request->content_length;
304 status = h2_mplx_process(stream->session->mplx, stream->id,
305 stream->request, eos, cmp, ctx);
306 stream->scheduled = 1;
308 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
309 "h2_stream(%ld-%d): scheduled %s %s://%s%s",
310 stream->session->id, stream->id,
311 stream->request->method, stream->request->scheme,
312 stream->request->authority, stream->request->path);
315 h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
316 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
317 "h2_stream(%ld-%d): RST=2 (internal err) %s %s://%s%s",
318 stream->session->id, stream->id,
319 stream->request->method, stream->request->scheme,
320 stream->request->authority, stream->request->path);
326 int h2_stream_is_scheduled(h2_stream *stream)
328 return stream->scheduled;
331 static apr_status_t h2_stream_input_flush(h2_stream *stream)
333 apr_status_t status = APR_SUCCESS;
334 if (stream->bbin && !APR_BRIGADE_EMPTY(stream->bbin)) {
336 status = h2_mplx_in_write(stream->session->mplx, stream->id, stream->bbin);
337 if (status != APR_SUCCESS) {
338 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->mplx->c,
339 "h2_stream(%ld-%d): flushing input data",
340 stream->session->id, stream->id);
346 static apr_status_t input_flush(apr_bucket_brigade *bb, void *ctx)
349 return h2_stream_input_flush(ctx);
352 static apr_status_t input_add_data(h2_stream *stream,
353 const char *data, size_t len, int chunked)
355 apr_status_t status = APR_SUCCESS;
358 status = apr_brigade_printf(stream->bbin, input_flush, stream,
359 "%lx\r\n", (unsigned long)len);
360 if (status == APR_SUCCESS) {
361 status = apr_brigade_write(stream->bbin, input_flush, stream, data, len);
362 if (status == APR_SUCCESS) {
363 status = apr_brigade_puts(stream->bbin, input_flush, stream, "\r\n");
368 status = apr_brigade_write(stream->bbin, input_flush, stream, data, len);
373 static int input_add_header(void *str, const char *key, const char *value)
375 h2_stream *stream = str;
376 apr_status_t status = input_add_data(stream, key, strlen(key), 0);
377 if (status == APR_SUCCESS) {
378 status = input_add_data(stream, ": ", 2, 0);
379 if (status == APR_SUCCESS) {
380 status = input_add_data(stream, value, strlen(value), 0);
381 if (status == APR_SUCCESS) {
382 status = input_add_data(stream, "\r\n", 2, 0);
386 return (status == APR_SUCCESS);
389 apr_status_t h2_stream_close_input(h2_stream *stream)
391 apr_status_t status = APR_SUCCESS;
393 AP_DEBUG_ASSERT(stream);
394 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
395 "h2_stream(%ld-%d): closing input",
396 stream->session->id, stream->id);
398 if (stream->rst_error) {
399 return APR_ECONNRESET;
402 H2_STREAM_IN(APLOG_TRACE2, stream, "close_pre");
403 if (close_input(stream) && stream->bbin) {
404 if (stream->request->chunked) {
405 apr_table_t *trailers = stream->request->trailers;
406 if (trailers && !apr_is_empty_table(trailers)) {
407 status = input_add_data(stream, "0\r\n", 3, 0);
408 apr_table_do(input_add_header, stream, trailers, NULL);
409 status = input_add_data(stream, "\r\n", 2, 0);
412 status = input_add_data(stream, "0\r\n\r\n", 5, 0);
416 if (status == APR_SUCCESS) {
417 status = h2_stream_input_flush(stream);
419 if (status == APR_SUCCESS) {
420 status = h2_mplx_in_close(stream->session->mplx, stream->id);
423 H2_STREAM_IN(APLOG_TRACE2, stream, "close_post");
427 apr_status_t h2_stream_write_data(h2_stream *stream,
428 const char *data, size_t len)
430 apr_status_t status = APR_SUCCESS;
432 AP_DEBUG_ASSERT(stream);
433 if (input_closed(stream) || !stream->request->eoh || !stream->bbin) {
434 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
435 "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d, bbin=%d",
436 stream->session->id, stream->id, input_closed(stream),
437 stream->request->eoh, !!stream->bbin);
441 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
442 "h2_stream(%ld-%d): add %ld input bytes",
443 stream->session->id, stream->id, (long)len);
445 H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_pre");
446 if (stream->request->chunked) {
447 /* if input may have a body and we have not seen any
448 * content-length header, we need to chunk the input data.
450 status = input_add_data(stream, data, len, 1);
453 stream->input_remaining -= len;
454 if (stream->input_remaining < 0) {
455 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
457 "h2_stream(%ld-%d): got %ld more content bytes than announced "
458 "in content-length header: %ld",
459 stream->session->id, stream->id,
460 (long)stream->request->content_length,
461 -(long)stream->input_remaining);
462 h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
463 return APR_ECONNABORTED;
465 status = input_add_data(stream, data, len, 0);
467 if (status == APR_SUCCESS) {
468 status = h2_stream_input_flush(stream);
470 H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_post");
474 apr_status_t h2_stream_prep_read(h2_stream *stream,
475 apr_off_t *plen, int *peos)
477 apr_status_t status = APR_SUCCESS;
479 apr_table_t *trailers = NULL;
480 int test_read = (*plen == 0);
482 if (stream->rst_error) {
483 return APR_ECONNRESET;
486 H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream prep_read_pre");
487 if (!APR_BRIGADE_EMPTY(stream->bbout)) {
489 status = h2_util_bb_avail(stream->bbout, plen, peos);
490 if (!test_read && status == APR_SUCCESS && !*peos && !*plen) {
491 apr_brigade_cleanup(stream->bbout);
492 return h2_stream_prep_read(stream, plen, peos);
494 trailers = stream->response? stream->response->trailers : NULL;
498 status = h2_mplx_out_readx(stream->session->mplx, stream->id,
499 NULL, NULL, plen, peos, &trailers);
500 if (trailers && stream->response) {
501 h2_response_set_trailers(stream->response, trailers);
505 if (!test_read && status == APR_SUCCESS && !*peos && !*plen) {
509 H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream prep_read_post");
510 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
511 "h2_stream(%ld-%d): prep_read %s, len=%ld eos=%d, trailers=%s",
512 stream->session->id, stream->id, src, (long)*plen, *peos,
513 trailers? "yes" : "no");
517 apr_status_t h2_stream_readx(h2_stream *stream,
518 h2_io_data_cb *cb, void *ctx,
519 apr_off_t *plen, int *peos)
521 apr_status_t status = APR_SUCCESS;
522 apr_table_t *trailers = NULL;
525 H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream readx_pre");
526 if (stream->rst_error) {
527 return APR_ECONNRESET;
530 if (!APR_BRIGADE_EMPTY(stream->bbout)) {
531 apr_off_t origlen = *plen;
534 status = h2_util_bb_readx(stream->bbout, cb, ctx, plen, peos);
535 if (status == APR_SUCCESS && !*peos && !*plen) {
536 apr_brigade_cleanup(stream->bbout);
538 return h2_stream_readx(stream, cb, ctx, plen, peos);
543 status = h2_mplx_out_readx(stream->session->mplx, stream->id,
544 cb, ctx, plen, peos, &trailers);
547 if (trailers && stream->response) {
548 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
549 "h2_stream(%ld-%d): readx, saving trailers",
550 stream->session->id, stream->id);
551 h2_response_set_trailers(stream->response, trailers);
554 if (status == APR_SUCCESS && !*peos && !*plen) {
558 H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream readx_post");
559 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
560 "h2_stream(%ld-%d): readx %s, len=%ld eos=%d",
561 stream->session->id, stream->id, src, (long)*plen, *peos);
562 H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream readx_post");
567 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
568 apr_off_t *plen, int *peos)
570 apr_status_t status = APR_SUCCESS;
571 apr_table_t *trailers = NULL;
573 H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream read_to_pre");
574 if (stream->rst_error) {
575 return APR_ECONNRESET;
578 if (APR_BRIGADE_EMPTY(stream->bbout)) {
579 apr_off_t tlen = *plen;
581 status = h2_mplx_out_read_to(stream->session->mplx, stream->id,
582 stream->bbout, &tlen, &eos, &trailers);
585 if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->bbout)) {
586 status = h2_transfer_brigade(bb, stream->bbout, stream->pool,
594 if (trailers && stream->response) {
595 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
596 "h2_stream(%ld-%d): read_to, saving trailers",
597 stream->session->id, stream->id);
598 h2_response_set_trailers(stream->response, trailers);
601 if (status == APR_SUCCESS && !*peos && !*plen) {
604 H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream read_to_post");
605 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
606 "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
607 stream->session->id, stream->id, (long)*plen, *peos);
611 void h2_stream_set_suspended(h2_stream *stream, int suspended)
613 AP_DEBUG_ASSERT(stream);
614 stream->suspended = !!suspended;
615 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
616 "h2_stream(%ld-%d): suspended=%d",
617 stream->session->id, stream->id, stream->suspended);
620 int h2_stream_is_suspended(h2_stream *stream)
622 AP_DEBUG_ASSERT(stream);
623 return stream->suspended;
626 int h2_stream_input_is_open(h2_stream *stream)
628 return input_open(stream);
631 int h2_stream_needs_submit(h2_stream *stream)
633 switch (stream->state) {
634 case H2_STREAM_ST_OPEN:
635 case H2_STREAM_ST_CLOSED_INPUT:
636 case H2_STREAM_ST_CLOSED_OUTPUT:
637 case H2_STREAM_ST_CLOSED:
638 return !stream->submitted;
644 apr_status_t h2_stream_submit_pushes(h2_stream *stream)
646 apr_status_t status = APR_SUCCESS;
647 apr_array_header_t *pushes;
650 pushes = h2_push_collect(stream->pool, stream->request, stream->response);
651 if (pushes && !apr_is_empty_array(pushes)) {
652 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
653 "h2_stream(%ld-%d): found %d push candidates",
654 stream->session->id, stream->id, pushes->nelts);
655 for (i = 0; i < pushes->nelts; ++i) {
656 h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
657 h2_stream *s = h2_session_push(stream->session, stream, push);
659 status = APR_ECONNRESET;
667 apr_table_t *h2_stream_get_trailers(h2_stream *stream)
669 return stream->response? stream->response->trailers : NULL;
672 void h2_stream_set_priority(h2_stream *stream, h2_priority *prio)
674 stream->prio = apr_pcalloc(stream->pool, sizeof(*prio));
675 memcpy(stream->prio, prio, sizeof(*prio));
678 h2_priority *h2_stream_get_priority(h2_stream *stream)