1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
20 #include <http_core.h>
21 #include <http_connection.h>
24 #include <nghttp2/nghttp2.h>
26 #include "h2_private.h"
28 #include "h2_config.h"
30 #include "h2_filter.h"
33 #include "h2_request.h"
34 #include "h2_response.h"
35 #include "h2_session.h"
36 #include "h2_stream.h"
39 #include "h2_task_input.h"
/* Debug helper: if the stream's connection logs at level `lvl`, dump the
 * stream's input brigade (`bbin`) through h2_util_bb_log, labelled `msg`.
 * The argument `s` is expanded several times, so pass a plain expression. */
44 #define H2_STREAM_IN(lvl,s,msg) \
46 if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
47 h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbin); \
/* Table of permitted state changes.  set_state() reads it as
 * state_transition[new_state][current_state]: each row is a target state,
 * each column a current state, and a nonzero entry permits the change.
 * NOTE(review): the two-letter labels presumably stand for idle, open,
 * reserved-local, reserved-remote, closed-input, closed-output and closed;
 * confirm against the order of the h2_stream_state_t enum. */
51 static int state_transition[][7] = {
52 /* ID OP RL RR CI CO CL */
53 /*ID*/{ 1, 0, 0, 0, 0, 0, 0 },
54 /*OP*/{ 1, 1, 0, 0, 0, 0, 0 },
55 /*RL*/{ 0, 0, 1, 0, 0, 0, 0 },
56 /*RR*/{ 0, 0, 0, 1, 0, 0, 0 },
57 /*CI*/{ 1, 1, 0, 0, 1, 0, 0 },
58 /*CO*/{ 1, 1, 0, 0, 0, 1, 0 },
59 /*CL*/{ 1, 1, 0, 0, 1, 1, 1 },
/* Try to switch `stream` to `state`.  The permission is looked up in
 * state_transition[state][stream->state].  The visible code stores the new
 * state and contains a warning (APLOGNO 03081) that names the current and
 * the requested state for an invalid transition.
 * NOTE(review): the branch on `allowed` and the return statements are not
 * part of this excerpt; presumably the warning is only reached when the
 * transition is refused and a nonzero result means it was applied. */
62 static int set_state(h2_stream *stream, h2_stream_state_t state)
64 int allowed = state_transition[state][stream->state];
66 stream->state = state;
70 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, APLOGNO(03081)
71 "h2_stream(%ld-%d): invalid state transition from %d to %d",
72 stream->session->id, stream->id, stream->state, state);
/* Close the input side of the stream's state machine.
 *  - CLOSED_INPUT or CLOSED: nothing to do, returns 0.
 *  - CLOSED_OUTPUT: the state is moved to CLOSED.
 *  - any other state: the state is moved to CLOSED_INPUT.
 * NOTE(review): the return value on the state-changing paths is not visible
 * here; h2_stream_close_input() flushes and closes the mplx input only when
 * this function returns nonzero. */
76 static int close_input(h2_stream *stream)
78 switch (stream->state) {
79 case H2_STREAM_ST_CLOSED_INPUT:
80 case H2_STREAM_ST_CLOSED:
81 return 0; /* ignore, idempotent */
82 case H2_STREAM_ST_CLOSED_OUTPUT:
84 set_state(stream, H2_STREAM_ST_CLOSED);
87 /* everything else we jump to here */
88 set_state(stream, H2_STREAM_ST_CLOSED_INPUT);
/* Predicate on stream->state that singles out OPEN and CLOSED_OUTPUT.
 * NOTE(review): the returned values are not visible in this excerpt; going
 * by the name and by its use in h2_stream_write_data(), it presumably
 * returns 0 for these two states and nonzero for all others -- confirm. */
94 static int input_closed(h2_stream *stream)
96 switch (stream->state) {
97 case H2_STREAM_ST_OPEN:
98 case H2_STREAM_ST_CLOSED_OUTPUT:
/* Close the output side of the stream's state machine; mirror image of
 * close_input().
 *  - CLOSED_OUTPUT or CLOSED: nothing to do, returns 0.
 *  - CLOSED_INPUT: the state is moved to CLOSED.
 *  - any other state: the state is moved to CLOSED_OUTPUT.
 * NOTE(review): the return value on the state-changing paths is not visible
 * in this excerpt. */
105 static int close_output(h2_stream *stream)
107 switch (stream->state) {
108 case H2_STREAM_ST_CLOSED_OUTPUT:
109 case H2_STREAM_ST_CLOSED:
110 return 0; /* ignore, idempotent */
111 case H2_STREAM_ST_CLOSED_INPUT:
112 /* both closed now */
113 set_state(stream, H2_STREAM_ST_CLOSED);
116 /* everything else we jump to here */
117 set_state(stream, H2_STREAM_ST_CLOSED_OUTPUT);
/* Predicate on stream->state that singles out OPEN and CLOSED_OUTPUT.
 * NOTE(review): the returned values are not visible in this excerpt;
 * callers (h2_stream_add_header, h2_stream_input_is_open) treat a nonzero
 * result as "input may still arrive", so these two states presumably
 * yield nonzero -- confirm. */
123 static int input_open(const h2_stream *stream)
125 switch (stream->state) {
126 case H2_STREAM_ST_OPEN:
127 case H2_STREAM_ST_CLOSED_INPUT:
/* Predicate on stream->state that singles out OPEN and CLOSED_INPUT.
 * NOTE(review): the returned values are not visible in this excerpt;
 * callers (h2_stream_set_response, h2_stream_schedule) bail out with
 * APR_ECONNRESET when the result is 0, so these two states presumably
 * yield nonzero -- confirm. */
134 static int output_open(h2_stream *stream)
136 switch (stream->state) {
137 case H2_STREAM_ST_OPEN:
138 case H2_STREAM_ST_CLOSED_INPUT:
/* Forward declaration: the definition sits at the end of this file, but
 * h2_stream_set_response() already needs it. */
145 static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response);
/* Allocate a zero-initialised h2_stream from `pool`, start it in the IDLE
 * state and attach it to `session`.
 * NOTE(review): the lines that store `id` and `pool` in the stream and the
 * return statement are not visible in this excerpt. */
147 h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session)
149 h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
151 stream->state = H2_STREAM_ST_IDLE;
153 stream->session = session;
/* Create a stream via h2_stream_create(), move it to the OPEN state and
 * give it a fresh request object.  The request is created with the
 * session's H2_CONF_SER_HEADERS setting as its third argument.  Logs
 * "opened" at debug level (APLOGNO 03082). */
157 h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
159 h2_stream *stream = h2_stream_create(id, pool, session);
160 set_state(stream, H2_STREAM_ST_OPEN);
161 stream->request = h2_request_create(id, pool,
162 h2_config_geti(session->config, H2_CONF_SER_HEADERS));
164 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03082)
165 "h2_stream(%ld-%d): opened", session->id, stream->id);
/* Destroy the stream by destroying its pool.  `stream` must not be NULL
 * (checked with AP_DEBUG_ASSERT only).
 * NOTE(review): any guard around the pool destruction and the returned
 * status are not visible in this excerpt. */
169 apr_status_t h2_stream_destroy(h2_stream *stream)
171 AP_DEBUG_ASSERT(stream);
173 apr_pool_destroy(stream->pool);
/* Hand the stream back to its session for destruction via
 * h2_session_stream_destroy(). */
178 void h2_stream_cleanup(h2_stream *stream)
180 h2_session_stream_destroy(stream->session, stream);
/* Take the stream's pool into a local variable.
 * NOTE(review): the rest of the body is not visible here; judging by the
 * name, the pool pointer is presumably cleared in the stream and the saved
 * pool returned to the caller -- confirm. */
184 apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
186 apr_pool_t *pool = stream->pool;
/* Mark the stream as reset: remember `error_code` in stream->rst_error,
 * close the output side of the state machine and trace the event.
 * Several functions in this file return APR_ECONNRESET once rst_error is
 * nonzero. */
191 void h2_stream_rst(h2_stream *stream, int error_code)
193 stream->rst_error = error_code;
195 close_output(stream);
196 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
197 "h2_stream(%ld-%d): reset, error=%d",
198 stream->session->id, stream->id, error_code);
/* Return the response held by the stream's output source (sos), or NULL
 * when no sos has been set yet. */
201 struct h2_response *h2_stream_get_response(h2_stream *stream)
203 return stream->sos? stream->sos->response : NULL;
/* Attach `response` to the stream and buffer the initial output in `bb`.
 *
 * Returns APR_ECONNRESET (after a trace message) when the output side is
 * no longer open.  Otherwise an mplx-backed sos is created for the
 * response; if the response names an sos_filter, the sos is wrapped by
 * h2_filter_sos_create().  `bb` is then passed to the sos' buffer()
 * callback and the resulting status is traced together with the HTTP
 * status of the response.
 * NOTE(review): the declaration of `sos`, the assignment to stream->sos and
 * the final return are not visible in this excerpt. */
206 apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
207 apr_bucket_brigade *bb)
209 apr_status_t status = APR_SUCCESS;
212 if (!output_open(stream)) {
213 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
214 "h2_stream(%ld-%d): output closed",
215 stream->session->id, stream->id);
216 return APR_ECONNRESET;
219 sos = h2_sos_mplx_create(stream, response);
220 if (sos->response->sos_filter) {
221 sos = h2_filter_sos_create(sos->response->sos_filter, sos);
225 status = stream->sos->buffer(stream->sos, bb);
226 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
227 "h2_stream(%ld-%d): set_response(%d)",
228 stream->session->id, stream->id, stream->sos->response->http_status);
/* Initialise the stream's request from an existing request_rec.
 *
 * Returns APR_ECONNRESET if the stream has already been reset.  Otherwise
 * the stream is moved to OPEN, `r` is written into stream->request with
 * h2_request_rwrite(), and the request's `serialize` flag is taken from the
 * H2_CONF_SER_HEADERS setting of the configuration that belongs to `r`.
 * NOTE(review): the declaration of `status` and the return statement are
 * not visible in this excerpt. */
232 apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
235 AP_DEBUG_ASSERT(stream);
236 if (stream->rst_error) {
237 return APR_ECONNRESET;
239 set_state(stream, H2_STREAM_ST_OPEN);
240 status = h2_request_rwrite(stream->request, r);
241 stream->request->serialize = h2_config_geti(h2_config_rget(r),
242 H2_CONF_SER_HEADERS);
/* Copy `req` into the stream's own request (allocating from the stream's
 * pool), remember `initiated_on`, and clear the request's end-of-headers
 * flag (`eoh`).
 * NOTE(review): `initiated_on` is presumably the id of the stream that
 * triggered this one (see its use in h2_stream_get_priority) -- confirm. */
247 void h2_stream_set_h2_request(h2_stream *stream, int initiated_on,
248 const h2_request *req)
250 h2_request_copy(stream->pool, stream->request, req);
251 stream->initiated_on = initiated_on;
252 stream->request->eoh = 0;
/* Add one header field (`name`/`value` with explicit lengths) to the
 * stream's request.
 *
 * Once the stream has been scheduled, the field is stored as a trailer.
 * Before that it is stored as a regular request header, unless the input
 * side is no longer open, in which case APR_ECONNRESET is returned. */
255 apr_status_t h2_stream_add_header(h2_stream *stream,
256 const char *name, size_t nlen,
257 const char *value, size_t vlen)
259 AP_DEBUG_ASSERT(stream);
260 if (h2_stream_is_scheduled(stream)) {
261 return h2_request_add_trailer(stream->request, stream->pool,
262 name, nlen, value, vlen);
265 if (!input_open(stream)) {
266 return APR_ECONNRESET;
268 return h2_request_add_header(stream->request, stream->pool,
269 name, nlen, value, vlen);
/* Finish the request headers and hand the stream to the multiplexer for
 * processing.
 *
 * Returns APR_ECONNRESET when the output side is no longer open.  The
 * headers are completed with h2_request_end_headers(); on success the
 * visible code marks the request as having a body, creates the input
 * brigade `bbin`, initialises input_remaining from the request's
 * content_length, submits the request through h2_mplx_process() with the
 * priority comparator `cmp`/`ctx`, sets the `scheduled` flag and traces the
 * request line.  The second trace message belongs to a path on which the
 * stream is reset with H2_ERR_INTERNAL_ERROR.
 * NOTE(review): this excerpt omits several lines, among them the handling
 * of an already scheduled stream, every use of `eos` and `push_enabled`,
 * the conditions guarding the body/bbin setup and the final return; do not
 * rely on the summary above for those details. */
273 apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
274 h2_stream_pri_cmp *cmp, void *ctx)
277 AP_DEBUG_ASSERT(stream);
278 AP_DEBUG_ASSERT(stream->session);
279 AP_DEBUG_ASSERT(stream->session->mplx);
281 if (!output_open(stream)) {
282 return APR_ECONNRESET;
284 if (stream->scheduled) {
291 /* Seeing the end-of-headers, we have everything we need to
292 * start processing it.
294 status = h2_request_end_headers(stream->request, stream->pool,
296 if (status == APR_SUCCESS) {
298 stream->request->body = 1;
299 stream->bbin = apr_brigade_create(stream->pool,
300 stream->session->c->bucket_alloc);
302 stream->input_remaining = stream->request->content_length;
304 status = h2_mplx_process(stream->session->mplx, stream->id,
305 stream->request, cmp, ctx);
306 stream->scheduled = 1;
308 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
309 "h2_stream(%ld-%d): scheduled %s %s://%s%s",
310 stream->session->id, stream->id,
311 stream->request->method, stream->request->scheme,
312 stream->request->authority, stream->request->path);
315 h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
316 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
317 "h2_stream(%ld-%d): RST=2 (internal err) %s %s://%s%s",
318 stream->session->id, stream->id,
319 stream->request->method, stream->request->scheme,
320 stream->request->authority, stream->request->path);
/* Return the stream's `scheduled` flag, which h2_stream_schedule() sets
 * after submitting the request to the multiplexer. */
326 int h2_stream_is_scheduled(const h2_stream *stream)
328 return stream->scheduled;
/* Push buffered request body data to the multiplexer.  Does nothing when
 * there is no input brigade or it is empty; otherwise the brigade is
 * written with h2_mplx_in_write() and a failure is traced.
 * NOTE(review): the failure trace logs against stream->session->mplx->c
 * while the rest of this file logs against stream->session->c; check
 * whether the difference is intended.  The return statement is not visible
 * in this excerpt. */
331 static apr_status_t h2_stream_input_flush(h2_stream *stream)
333 apr_status_t status = APR_SUCCESS;
334 if (stream->bbin && !APR_BRIGADE_EMPTY(stream->bbin)) {
336 status = h2_mplx_in_write(stream->session->mplx, stream->id, stream->bbin);
337 if (status != APR_SUCCESS) {
338 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->mplx->c,
339 "h2_stream(%ld-%d): flushing input data",
340 stream->session->id, stream->id);
/* Flush callback handed to apr_brigade_write() by input_add_data().  `ctx`
 * is the h2_stream; the work is delegated to h2_stream_input_flush(), which
 * operates on the stream's own brigade rather than on `bb`. */
346 static apr_status_t input_flush(apr_bucket_brigade *bb, void *ctx)
349 return h2_stream_input_flush(ctx);
/* Append `len` bytes from `data` to the stream's input brigade with
 * apr_brigade_write(), registering input_flush() (with the stream as its
 * context) as the flush callback. */
352 static apr_status_t input_add_data(h2_stream *stream,
353 const char *data, size_t len)
355 return apr_brigade_write(stream->bbin, input_flush, stream, data, len);
/* Signal that no more request body data will arrive on this stream.
 *
 * Returns APR_ECONNRESET if the stream has been reset.  Otherwise the state
 * machine's input side is closed; when close_input() reports nonzero and an
 * input brigade exists, pending data is flushed and, if that succeeded, the
 * multiplexer input is closed with h2_mplx_in_close().  The input brigade
 * is dumped at TRACE2 before and after.
 * NOTE(review): the final return is not visible in this excerpt;
 * presumably it returns `status`. */
358 apr_status_t h2_stream_close_input(h2_stream *stream)
360 apr_status_t status = APR_SUCCESS;
362 AP_DEBUG_ASSERT(stream);
363 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
364 "h2_stream(%ld-%d): closing input",
365 stream->session->id, stream->id);
367 if (stream->rst_error) {
368 return APR_ECONNRESET;
371 H2_STREAM_IN(APLOG_TRACE2, stream, "close_pre");
372 if (close_input(stream) && stream->bbin) {
373 status = h2_stream_input_flush(stream);
374 if (status == APR_SUCCESS) {
375 status = h2_mplx_in_close(stream->session->mplx, stream->id);
378 H2_STREAM_IN(APLOG_TRACE2, stream, "close_post");
/* Accept `len` bytes of request body data for the stream.
 *
 * The write is refused (with a trace message) when the input side is
 * closed, the request headers are not complete (`eoh` unset) or no input
 * brigade exists.  For requests that are not chunked, `len` is subtracted
 * from input_remaining; if that drops below zero a warning is logged, the
 * stream is reset with H2_ERR_PROTOCOL_ERROR and APR_ECONNABORTED is
 * returned.  Otherwise the data is appended to the input brigade and
 * flushed to the multiplexer.
 * NOTE(review): the status returned on the "writing denied" path and the
 * final return are not visible in this excerpt. */
382 apr_status_t h2_stream_write_data(h2_stream *stream,
383 const char *data, size_t len)
385 apr_status_t status = APR_SUCCESS;
387 AP_DEBUG_ASSERT(stream);
388 if (input_closed(stream) || !stream->request->eoh || !stream->bbin) {
389 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
390 "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d, bbin=%d",
391 stream->session->id, stream->id, input_closed(stream),
392 stream->request->eoh, !!stream->bbin);
396 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
397 "h2_stream(%ld-%d): add %ld input bytes",
398 stream->session->id, stream->id, (long)len);
400 H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_pre");
401 if (!stream->request->chunked) {
402 stream->input_remaining -= len;
403 if (stream->input_remaining < 0) {
/* NOTE(review): the message reads "got <excess> more content bytes than
 * announced in content-length header: <length>", but the arguments are
 * passed as (content_length, -input_remaining), i.e. in the opposite
 * order.  The two values look swapped in the log output -- confirm. */
404 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
406 "h2_stream(%ld-%d): got %ld more content bytes than announced "
407 "in content-length header: %ld",
408 stream->session->id, stream->id,
409 (long)stream->request->content_length,
410 -(long)stream->input_remaining);
411 h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
412 return APR_ECONNABORTED;
416 status = input_add_data(stream, data, len);
417 if (status == APR_SUCCESS) {
418 status = h2_stream_input_flush(stream);
420 H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_post");
/* Set the stream's `suspended` flag.  Any nonzero `suspended` is
 * normalised to 1 before it is stored; the new value is traced at
 * TRACE2. */
424 void h2_stream_set_suspended(h2_stream *stream, int suspended)
426 AP_DEBUG_ASSERT(stream);
427 stream->suspended = !!suspended;
428 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
429 "h2_stream(%ld-%d): suspended=%d",
430 stream->session->id, stream->id, stream->suspended);
/* Return the flag stored by h2_stream_set_suspended() (0 or 1). */
433 int h2_stream_is_suspended(const h2_stream *stream)
435 AP_DEBUG_ASSERT(stream);
436 return stream->suspended;
/* Ask the stream's output source how much response data is available.
 * Returns APR_ECONNRESET if the stream has been reset; otherwise delegates
 * to the sos' prep_read() callback with `plen` and `peos`.
 * NOTE(review): lines between the reset check and the delegation are not
 * visible in this excerpt (possibly a check for a missing sos). */
439 apr_status_t h2_stream_prep_read(h2_stream *stream,
440 apr_off_t *plen, int *peos)
442 if (stream->rst_error) {
443 return APR_ECONNRESET;
449 return stream->sos->prep_read(stream->sos, plen, peos);
/* Read response data through the callback `cb` (called with `ctx`).
 * Returns APR_ECONNRESET if the stream has been reset; otherwise delegates
 * to the sos' readx() callback with `plen` and `peos`.
 * NOTE(review): lines between the reset check and the delegation are not
 * visible in this excerpt (possibly a check for a missing sos). */
452 apr_status_t h2_stream_readx(h2_stream *stream,
453 h2_io_data_cb *cb, void *ctx,
454 apr_off_t *plen, int *peos)
456 if (stream->rst_error) {
457 return APR_ECONNRESET;
462 return stream->sos->readx(stream->sos, cb, ctx, plen, peos);
/* Move response data into the brigade `bb`.
 * Returns APR_ECONNRESET if the stream has been reset; otherwise delegates
 * to the sos' read_to() callback with `plen` and `peos`.
 * NOTE(review): lines between the reset check and the delegation are not
 * visible in this excerpt (possibly a check for a missing sos). */
465 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
466 apr_off_t *plen, int *peos)
468 if (stream->rst_error) {
469 return APR_ECONNRESET;
474 return stream->sos->read_to(stream->sos, bb, plen, peos);
/* Public wrapper around the file-local input_open() predicate. */
477 int h2_stream_input_is_open(const h2_stream *stream)
479 return input_open(stream);
/* Tell whether the stream's response still has to be submitted.  In the
 * OPEN, CLOSED_INPUT, CLOSED_OUTPUT and CLOSED states the answer is the
 * negation of the `submitted` flag.
 * NOTE(review): the result for the remaining states is not visible in this
 * excerpt. */
482 int h2_stream_needs_submit(const h2_stream *stream)
484 switch (stream->state) {
485 case H2_STREAM_ST_OPEN:
486 case H2_STREAM_ST_CLOSED_INPUT:
487 case H2_STREAM_ST_CLOSED_OUTPUT:
488 case H2_STREAM_ST_CLOSED:
489 return !stream->submitted;
/* Collect push candidates for the stream's request/response pair with
 * h2_push_collect_update() and submit each of them through
 * h2_session_push().  The number of candidates is traced.
 * NOTE(review): the declaration of `i`, the check on the result of
 * h2_session_push() and the final return are not visible in this excerpt;
 * `status` is presumably set to APR_ECONNRESET only when a push could not
 * be created -- confirm. */
495 apr_status_t h2_stream_submit_pushes(h2_stream *stream)
497 apr_status_t status = APR_SUCCESS;
498 apr_array_header_t *pushes;
501 pushes = h2_push_collect_update(stream, stream->request,
502 h2_stream_get_response(stream));
503 if (pushes && !apr_is_empty_array(pushes)) {
504 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
505 "h2_stream(%ld-%d): found %d push candidates",
506 stream->session->id, stream->id, pushes->nelts);
507 for (i = 0; i < pushes->nelts; ++i) {
508 h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
509 h2_stream *s = h2_session_push(stream->session, stream, push);
511 status = APR_ECONNRESET;
/* Return the trailers reported by the stream's output source through its
 * get_trailers() callback, or NULL when no sos has been set. */
519 apr_table_t *h2_stream_get_trailers(h2_stream *stream)
521 return stream->sos? stream->sos->get_trailers(stream->sos) : NULL;
/* Look up the configured priority for the stream's response.  When
 * `initiated_on` is nonzero and a response exists, the response's
 * "content-type" header is used as the key for h2_config_get_priority() on
 * the session configuration.
 * NOTE(review): the result for all other streams is not visible in this
 * excerpt. */
524 const h2_priority *h2_stream_get_priority(h2_stream *stream)
526 h2_response *response = h2_stream_get_response(stream);
528 if (stream->initiated_on && response) {
529 const char *ctype = apr_table_get(response->headers, "content-type");
531 /* FIXME: Not good enough, config needs to come from request->server */
532 return h2_config_get_priority(stream->session->config, ctype);
538 /*******************************************************************************
540 ******************************************************************************/
/* Private context of the mplx-backed output source created by
 * h2_sos_mplx_create(): `bb` buffers response data on the stream side and
 * `trailers` keeps trailers handed over by the multiplexer read calls.
 * NOTE(review): the functions below also use a member `m` (the h2_mplx);
 * its declaration is not visible in this excerpt. */
542 typedef struct h2_sos_mplx {
544 apr_bucket_brigade *bb;
545 apr_table_t *trailers;
/* Debug helper: if the mplx connection logs at level `lvl`, dump the
 * context's brigade (`bb`) through h2_util_bb_log, labelled `msg`.  Note
 * that the id passed to the logger is the mplx id, not a stream id. */
548 #define H2_SOS_MPLX_OUT(lvl,msos,msg) \
550 if (APLOG_C_IS_LEVEL((msos)->m->c,lvl)) \
551 h2_util_bb_log((msos)->m->c,(msos)->m->id,lvl,msg,(msos)->bb); \
/* read_to() callback of the mplx-backed sos: move response data into `bb`.
 *
 * When the local buffer is empty it is refilled from the multiplexer with
 * h2_mplx_out_read_to(), which may also deliver trailers.  Buffered data is
 * then transferred to `bb` with h2_transfer_brigade().  Trailers, when
 * present, are remembered in the context.  The brigade and the resulting
 * length/eos values are traced.
 * NOTE(review): the declaration of `eos`, the remaining arguments of the
 * transfer call, the body of the "no data, no eos" check and the final
 * return are not visible in this excerpt. */
555 static apr_status_t h2_sos_mplx_read_to(h2_sos *sos, apr_bucket_brigade *bb,
556 apr_off_t *plen, int *peos)
558 h2_sos_mplx *msos = sos->ctx;
559 apr_status_t status = APR_SUCCESS;
560 apr_table_t *trailers = NULL;
562 H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx read_to_pre");
564 if (APR_BRIGADE_EMPTY(msos->bb)) {
565 apr_off_t tlen = *plen;
567 status = h2_mplx_out_read_to(msos->m, sos->stream->id,
568 msos->bb, &tlen, &eos, &trailers);
571 if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(msos->bb)) {
572 status = h2_transfer_brigade(bb, msos->bb, sos->stream->pool,
581 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
582 "h2_stream(%ld-%d): read_to, saving trailers",
583 msos->m->id, sos->stream->id);
584 msos->trailers = trailers;
587 if (status == APR_SUCCESS && !*peos && !*plen) {
590 H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx read_to_post");
591 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
592 "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
593 msos->m->id, sos->stream->id, (long)*plen, *peos);
/* prep_read() callback of the mplx-backed sos: report how much response
 * data can be read and whether the end of stream has been reached.
 *
 * A call with *plen == 0 is remembered as `test_read`.  If the local buffer
 * holds buckets, availability is computed from it; should that yield
 * neither data nor eos on a non-test read, the buffer is cleaned up and the
 * function calls itself again so that the multiplexer is consulted.  With
 * an empty buffer the multiplexer is queried through h2_mplx_out_readx()
 * with NULL callback arguments, and trailers it reports are stored in the
 * context.
 * NOTE(review): the declaration of `src`, the body of the final "no data,
 * no eos" check and the return statement are not visible in this
 * excerpt. */
597 static apr_status_t h2_sos_mplx_prep_read(h2_sos *sos, apr_off_t *plen, int *peos)
599 h2_sos_mplx *msos = sos->ctx;
600 apr_status_t status = APR_SUCCESS;
602 apr_table_t *trailers = NULL;
603 int test_read = (*plen == 0);
605 H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prep_read_pre");
606 if (!APR_BRIGADE_EMPTY(msos->bb)) {
608 status = h2_util_bb_avail(msos->bb, plen, peos);
609 if (!test_read && status == APR_SUCCESS && !*peos && !*plen) {
610 apr_brigade_cleanup(msos->bb);
611 return h2_sos_mplx_prep_read(sos, plen, peos);
616 status = h2_mplx_out_readx(msos->m, sos->stream->id,
617 NULL, NULL, plen, peos, &trailers);
619 msos->trailers = trailers;
623 if (!test_read && status == APR_SUCCESS && !*peos && !*plen) {
627 H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prep_read_post");
628 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
629 "h2_stream(%ld-%d): prep_read %s, len=%ld eos=%d, trailers=%s",
630 msos->m->id, sos->stream->id, src, (long)*plen, *peos,
631 msos->trailers? "yes" : "no");
/* readx() callback of the mplx-backed sos: deliver response data to the
 * callback `cb` (called with `ctx`).
 *
 * Data in the local buffer is served first through h2_util_bb_readx(); if
 * that yields neither data nor eos, the buffer is cleaned up and the
 * function calls itself again so that the multiplexer is read.  With an
 * empty buffer the data comes straight from h2_mplx_out_readx(), and
 * trailers it reports are stored in the context.
 * NOTE(review): the declaration of `src`, the use of `origlen`, the body
 * of the final "no data, no eos" check and the return statement are not
 * visible in this excerpt. */
635 static apr_status_t h2_sos_mplx_readx(h2_sos *sos, h2_io_data_cb *cb, void *ctx,
636 apr_off_t *plen, int *peos)
638 h2_sos_mplx *msos = sos->ctx;
639 apr_status_t status = APR_SUCCESS;
640 apr_table_t *trailers = NULL;
643 H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx readx_pre");
645 if (!APR_BRIGADE_EMPTY(msos->bb)) {
646 apr_off_t origlen = *plen;
649 status = h2_util_bb_readx(msos->bb, cb, ctx, plen, peos);
650 if (status == APR_SUCCESS && !*peos && !*plen) {
651 apr_brigade_cleanup(msos->bb);
653 return h2_sos_mplx_readx(sos, cb, ctx, plen, peos);
658 status = h2_mplx_out_readx(msos->m, sos->stream->id,
659 cb, ctx, plen, peos, &trailers);
663 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
664 "h2_stream(%ld-%d): readx, saving trailers",
665 msos->m->id, sos->stream->id);
666 msos->trailers = trailers;
669 if (status == APR_SUCCESS && !*peos && !*plen) {
673 H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_stream readx_post");
674 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
675 "h2_stream(%ld-%d): readx %s, len=%ld eos=%d",
676 msos->m->id, sos->stream->id, src, (long)*plen, *peos);
/* get_trailers() callback of the mplx-backed sos: return the trailers
 * stored in the context by the read functions (NULL if none were saved). */
681 static apr_table_t *h2_sos_mplx_get_trailers(h2_sos *sos)
683 h2_sos_mplx *msos = sos->ctx;
685 return msos->trailers;
/* buffer() callback of the mplx-backed sos: take over the buckets in `bb`.
 * A NULL or empty brigade is left alone.  Otherwise the buckets are moved
 * into the context's brigade with h2_util_move(), passing 16 KiB as the
 * third argument and `move_all` (initialised to INT_MAX) by address.  The
 * local brigade is dumped at TRACE2 before and after the move.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably it returns `status`. */
688 static apr_status_t h2_sos_mplx_buffer(h2_sos *sos, apr_bucket_brigade *bb)
690 h2_sos_mplx *msos = sos->ctx;
691 apr_status_t status = APR_SUCCESS;
693 if (bb && !APR_BRIGADE_EMPTY(bb)) {
694 apr_size_t move_all = INT_MAX;
695 /* we can move file handles from h2_mplx into this h2_stream as many
696 * as we want, since the lifetimes are the same and we are not freeing
697 * the ones in h2_mplx->io before this stream is done. */
698 H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_pre");
699 status = h2_util_move(msos->bb, bb, 16 * 1024, &move_all,
700 "h2_stream_set_response");
701 H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_post");
/* Build the mplx-backed output source for `stream` and `response`.
 *
 * Both the private context (with a fresh, empty brigade using the mplx
 * connection's bucket allocator) and the h2_sos itself are allocated
 * zeroed from the stream's pool.  The sos is wired to the stream, the
 * response and the five h2_sos_mplx_* callbacks defined above.
 * NOTE(review): sos->response is assigned twice with the same value (once
 * before and once after the callbacks); one assignment looks redundant.
 * The local declarations, the link from the sos to its context and the
 * return statement are not visible in this excerpt. */
706 static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response)
711 msos = apr_pcalloc(stream->pool, sizeof(*msos));
712 msos->m = stream->session->mplx;
713 msos->bb = apr_brigade_create(stream->pool, msos->m->c->bucket_alloc);
715 sos = apr_pcalloc(stream->pool, sizeof(*sos));
716 sos->stream = stream;
717 sos->response = response;
720 sos->buffer = h2_sos_mplx_buffer;
721 sos->prep_read = h2_sos_mplx_prep_read;
722 sos->readx = h2_sos_mplx_readx;
723 sos->read_to = h2_sos_mplx_read_to;
724 sos->get_trailers = h2_sos_mplx_get_trailers;
726 sos->response = response;