/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 #include <apr_strings.h>
22 #include <http_core.h>
23 #include <http_connection.h>
26 #include <nghttp2/nghttp2.h>
28 #include "h2_private.h"
30 #include "h2_bucket_beam.h"
32 #include "h2_config.h"
36 #include "h2_request.h"
37 #include "h2_headers.h"
38 #include "h2_session.h"
39 #include "h2_stream.h"
/* Transition table entries that name a target stream state.
 * Each entry is the state's enum value plus one.
 * NOTE(review): the +1 offset presumably keeps 0 free for the "no change"
 * marker and is undone when the table is evaluated - confirm in on_map().
 *
 * S_IDL used to reference H2_SS_IDL, which is not the spelling used for the
 * idle state anywhere else in this file (H2_SS_IDLE); it would not compile
 * as soon as the macro was expanded. */
#define S_IDL     (H2_SS_IDLE + 1)
#define S_RS_L    (H2_SS_RSVD_L + 1)
#define S_RS_R    (H2_SS_RSVD_R + 1)
#define S_OPEN    (H2_SS_OPEN + 1)
#define S_CL_L    (H2_SS_CLOSED_L + 1)
#define S_CL_R    (H2_SS_CLOSED_R + 1)
#define S_CLS     (H2_SS_CLOSED + 1)
#define S_CLN     (H2_SS_CLEANUP + 1)
/* Translate a stream state into a constant, human-readable name; transit()
 * uses it when logging the state a stream moves to.
 * NOTE(review): only four of the return statements are visible in this view,
 * the selecting switch/case labels are elided - confirm the exact
 * state-to-name mapping against the full file. */
58 static const char *h2_ss_str(h2_stream_state_t state)
64 return "RESERVED_LOCAL";
66 return "RESERVED_REMOTE";
70 return "HALF_CLOSED_LOCAL";
72 return "HALF_CLOSED_REMOTE";
82 const char *h2_stream_state_str(h2_stream *stream)
84 return h2_ss_str(stream->state);
/* Stream state transitions triggered by frames this endpoint SENDS.
 * A row is selected by the frame type (on_frame() indexes the table with it,
 * the row comments name the frame), a column by the current stream state in
 * the order given in the header row below. Each entry is either a target
 * state (S_xxx), S_NOP or S_ERR.
 * NOTE(review): S_NOP and S_ERR are defined outside this view - presumably
 * "stay in the current state" and "frame not allowed in this state";
 * confirm. */
87 static int trans_on_send[][H2_SS_MAX] = {
88 /* S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
89 /* DATA, */ { S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },
90 /* HEADERS, */ { S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },
91 /* PRIORITY, */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
92 /* RST_STREAM, */ { S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },
93 /* SETTINGS, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
94 /* PUSH_PROMISE, */ { S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
95 /* PING, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
96 /* GOAWAY, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
97 /* WINDOW_UPDATE,*/ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
98 /* CONT */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
/* Stream state transitions triggered by frames this endpoint RECEIVES.
 * Same layout as the table for sent frames: one row per frame type (see the
 * row comments; on_frame() indexes with the frame type), one column per
 * current stream state in the order of the header row below. Each entry is
 * a target state (S_xxx), S_NOP or S_ERR.
 * NOTE(review): S_NOP and S_ERR are defined outside this view - presumably
 * "stay in the current state" and "frame not allowed in this state";
 * confirm. */
100 static int trans_on_recv[][H2_SS_MAX] = {
101 /* S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
102 /* DATA, */ { S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },
103 /* HEADERS, */ { S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },
104 /* PRIORITY, */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
105 /* RST_STREAM, */ { S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },
106 /* SETTINGS, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
107 /* PUSH_PROMISE, */ { S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
108 /* PING, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
109 /* GOAWAY, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
110 /* WINDOW_UPDATE,*/ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
111 /* CONT */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
/* Stream state transitions triggered by stream events instead of frames.
 * on_event() selects the row with the h2_stream_event_t value; the row
 * comments name the event each row belongs to. Columns are indexed by the
 * current stream state.
 * NOTE(review): there is no header row here - the column order is presumably
 * the same as in the frame tables; S_XXX, S_ERR and S_NOP are defined
 * outside this view. Confirm both. */
113 static int trans_on_event[][H2_SS_MAX] = {
114 /* H2_SEV_CLOSED_L*/{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },
115 /* H2_SEV_CLOSED_R*/{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },
116 /* H2_SEV_CANCELLED*/{S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },
117 /* H2_SEV_EOS_SENT*/{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },
/* Evaluate one row of a transition table for the given current state.
 * on_frame() and on_event() pass in the row and hand the result on as the
 * new state; callers treat negative results as invalid (see transit()).
 * NOTE(review): the body is not visible in this view - how table entries are
 * converted back into states must be confirmed in the full file. */
120 static int on_map(h2_stream_state_t state, int map[H2_SS_MAX])
134 static int on_frame(h2_stream_state_t state, int frame_type,
135 int frame_map[][H2_SS_MAX], apr_size_t maxlen)
137 ap_assert(frame_type >= 0);
138 ap_assert(state >= 0);
139 if (frame_type >= maxlen) {
140 return state; /* NOP */
142 return on_map(state, frame_map[frame_type]);
145 static int on_frame_send(h2_stream_state_t state, int frame_type)
147 return on_frame(state, frame_type, trans_on_send, H2_ALEN(trans_on_send));
150 static int on_frame_recv(h2_stream_state_t state, int frame_type)
152 return on_frame(state, frame_type, trans_on_recv, H2_ALEN(trans_on_recv));
155 static int on_event(h2_stream_state_t state, h2_stream_event_t ev)
157 return on_map(state, trans_on_event[ev]);
/* Trace helper: writes a printable rendering of the stream's out_buffer to
 * the connection log at level `lvl`. `tag` is passed on to
 * h2_util_bb_print(). */
160 static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
/* skip the formatting work entirely when this level is not being logged */
162 if (APLOG_C_IS_LEVEL(s->session->c, lvl)) {
163 conn_rec *c = s->session->c;
164 char buffer[4 * 1024];
165 apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
167 len = h2_util_bb_print(buffer, bmax, tag, "", s->out_buffer);
/* a zero length result is logged as "empty" instead of the buffer */
168 ap_log_cerror(APLOG_MARK, lvl, 0, c,
169 H2_STRM_MSG(s, "out-buffer(%s)"), len? buffer : "empty");
/* Finish the stream's input: pass any collected trailers and an EOS bucket
 * through the input beam, then close the beam.
 * Returns APR_ECONNRESET when the stream has been reset.
 * NOTE(review): some lines of this function (declarations, early return,
 * final return) are not visible in this view. */
173 static apr_status_t close_input(h2_stream *stream)
175 conn_rec *c = stream->session->c;
177 apr_bucket_brigade *tmp;
/* input already closed: handled up front (branch body not visible here) */
180 if (h2_beam_is_closed(stream->input)) {
184 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
185 H2_STRM_MSG(stream, "closing input"));
186 if (stream->rst_error) {
187 return APR_ECONNRESET;
/* temporary brigade carrying the final buckets into the beam */
190 tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
/* non-empty trailers travel as a headers bucket in front of the EOS */
191 if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
192 h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers,
194 b = h2_bucket_headers_create(c->bucket_alloc, r);
195 APR_BRIGADE_INSERT_TAIL(tmp, b);
/* the trailers have been handed over, forget them on the stream */
196 stream->trailers = NULL;
197 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
198 H2_STRM_MSG(stream, "added trailers"));
201 b = apr_bucket_eos_create(c->bucket_alloc);
202 APR_BRIGADE_INSERT_TAIL(tmp, b);
203 status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
204 apr_brigade_destroy(tmp);
205 h2_beam_close(stream->input);
/* Stop using the stream's output beam: logs the fact and returns the result
 * of h2_beam_leave() on it.
 * NOTE(review): the branch taken when the beam is already closed is not
 * visible in this view. */
209 static apr_status_t close_output(h2_stream *stream)
211 if (h2_beam_is_closed(stream->output)) {
214 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
215 H2_STRM_MSG(stream, "closing output"));
216 return h2_beam_leave(stream->output);
219 static void on_state_enter(h2_stream *stream)
221 if (stream->monitor && stream->monitor->on_state_enter) {
222 stream->monitor->on_state_enter(stream->monitor->ctx, stream);
226 static void on_state_event(h2_stream *stream, h2_stream_event_t ev)
228 if (stream->monitor && stream->monitor->on_state_event) {
229 stream->monitor->on_state_event(stream->monitor->ctx, stream, ev);
/* Handle a frame or event that is not valid in the stream's current state:
 * informs the monitor (if it has an on_state_invalid callback), logs the
 * incident and then reacts depending on the state.
 * NOTE(review): the case labels of the switch are not visible in this view;
 * only the branch resetting the stream with H2_ERR_INTERNAL_ERROR is. */
233 static void on_state_invalid(h2_stream *stream)
235 if (stream->monitor && stream->monitor->on_state_invalid) {
236 stream->monitor->on_state_invalid(stream->monitor->ctx, stream);
238 /* stream got an event/frame invalid in its state */
239 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
240 H2_STRM_MSG(stream, "invalid state event"));
241 switch (stream->state) {
247 h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
/* Move the stream into `new_state` and perform the work tied to entering it.
 * A negative `new_state` marks an invalid transition: it is logged as a
 * warning and handed to on_state_invalid(). Otherwise the state is stored,
 * state specific actions run and the monitor is notified through
 * on_state_enter().
 * NOTE(review): return statements and the switch/case labels selecting the
 * state specific actions are not visible in this view. */
254 static apr_status_t transit(h2_stream *stream, int new_state)
/* already in that state (branch body not visible here) */
256 if (new_state == stream->state) {
259 else if (new_state < 0) {
260 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
261 H2_STRM_LOG(APLOGNO(03081), stream, "invalid transition"));
262 on_state_invalid(stream);
266 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
267 H2_STRM_MSG(stream, "transit to [%s]"), h2_ss_str(new_state));
268 stream->state = new_state;
/* state specific actions: some states close the output ... */
280 close_output(stream);
/* ... and one additionally throws away buffered output */
287 close_output(stream);
288 if (stream->out_buffer) {
289 apr_brigade_cleanup(stream->out_buffer);
295 on_state_enter(stream);
299 void h2_stream_set_monitor(h2_stream *stream, h2_stream_monitor *monitor)
301 stream->monitor = monitor;
/* Apply stream event `ev` to the stream's state machine.
 * The new state is looked up with on_event(). Invalid events are logged as
 * warning and passed to on_state_invalid(); events that do not change the
 * state are only traced; all others are reported to the monitor via
 * on_state_event() before the stream transits to the new state.
 * NOTE(review): the condition guarding the "invalid event" branch is not
 * visible in this view. */
304 void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
308 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
309 H2_STRM_MSG(stream, "dispatch event %d"), ev);
310 new_state = on_event(stream->state, ev);
312 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
313 H2_STRM_LOG(APLOGNO(10002), stream, "invalid event %d"), ev);
314 on_state_invalid(stream);
/* debug builds additionally insist that the table entry was a defined one */
315 AP_DEBUG_ASSERT(new_state > S_XXX);
318 else if (new_state == stream->state) {
320 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
321 H2_STRM_MSG(stream, "ignored event %d"), ev);
/* the monitor hears about the event before the state actually changes */
325 on_state_event(stream, ev);
326 transit(stream, new_state);
/* Derive per-request settings for request `r` on this stream: stores the
 * push policy determined from the request headers on the stream and sets
 * r->serialize from the H2_CONF_SER_HEADERS configuration value.
 * NOTE(review): the last argument(s) of h2_push_policy_determine() are on a
 * line not visible in this view - presumably the `enabled` flag; confirm. */
330 static void set_policy_for(h2_stream *stream, h2_request *r)
332 int enabled = h2_session_push_enabled(stream->session);
333 stream->push_policy = h2_push_policy_determine(r->headers, stream->pool,
335 r->serialize = h2_config_geti(stream->session->config, H2_CONF_SER_HEADERS);
/* Update the stream for a frame of type `ftype` with `flags` being sent.
 * The resulting state comes from on_frame_send(). For PUSH_PROMISE the
 * prepared request (stream->rtmp) is completed and becomes stream->request.
 * When the frame carries END_STREAM, the H2_SEV_CLOSED_L event is applied
 * after the frame's own transition succeeded.
 * NOTE(review): the condition of the "invalid frame" branch, the switch
 * header, some case labels and the final return are not visible in this
 * view. */
338 apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags)
340 apr_status_t status = APR_SUCCESS;
341 int new_state, eos = 0;
343 new_state = on_frame_send(stream->state, ftype);
345 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
346 H2_STRM_MSG(stream, "invalid frame %d send"), ftype);
347 AP_DEBUG_ASSERT(new_state > S_XXX);
348 return transit(stream, new_state);
/* remember whether this frame ends the local side of the stream */
353 eos = (flags & NGHTTP2_FLAG_END_STREAM);
356 case NGHTTP2_HEADERS:
357 eos = (flags & NGHTTP2_FLAG_END_STREAM);
360 case NGHTTP2_PUSH_PROMISE:
361 /* start pushed stream */
362 ap_assert(stream->request == NULL);
363 ap_assert(stream->rtmp != NULL);
364 status = h2_request_end_headers(stream->rtmp, stream->pool, 0);
365 if (status != APR_SUCCESS) {
/* from here on the prepared request is the stream's request */
368 set_policy_for(stream, stream->rtmp);
369 stream->request = stream->rtmp;
376 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
377 H2_STRM_MSG(stream, "send frame %d, eos=%d"), ftype, eos);
378 status = transit(stream, new_state);
379 if (status == APR_SUCCESS && eos) {
380 status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_L));
/* Update the stream for a received frame of type `ftype` with `flags`.
 * The resulting state comes from on_frame_recv(). For HEADERS that start a
 * request, the prepared request (stream->rtmp) is completed and becomes
 * stream->request. When the frame carries END_STREAM, the H2_SEV_CLOSED_R
 * event is applied after the frame's own transition succeeded.
 * NOTE(review): the condition of the "invalid frame" branch, the switch
 * header, some case labels, the conditions inside the HEADERS/OPEN branch
 * and the final return are not visible in this view. */
385 apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags)
387 apr_status_t status = APR_SUCCESS;
388 int new_state, eos = 0;
390 new_state = on_frame_recv(stream->state, ftype);
392 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
393 H2_STRM_MSG(stream, "invalid frame %d recv"), ftype);
394 AP_DEBUG_ASSERT(new_state > S_XXX);
395 return transit(stream, new_state);
/* remember whether this frame ends the remote side of the stream */
400 eos = (flags & NGHTTP2_FLAG_END_STREAM);
403 case NGHTTP2_HEADERS:
404 eos = (flags & NGHTTP2_FLAG_END_STREAM);
/* HEADERS on an already open stream get special treatment; under a
 * condition not visible here the stream is reset with PROTOCOL_ERROR */
405 if (stream->state == H2_SS_OPEN) {
408 h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
413 ap_assert(stream->request == NULL);
414 ap_assert(stream->rtmp != NULL);
415 status = h2_request_end_headers(stream->rtmp, stream->pool, 0);
416 if (status != APR_SUCCESS) {
/* from here on the prepared request is the stream's request */
419 set_policy_for(stream, stream->rtmp);
420 stream->request = stream->rtmp;
428 status = transit(stream, new_state);
429 if (status == APR_SUCCESS && eos) {
430 status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_R));
/* Feed `len` bytes of received DATA payload into the stream's input beam
 * and update the stream's input counters (frames and octets).
 * `flags` is not referenced by the lines visible here.
 * NOTE(review): the handling of a missing input beam and the return
 * statements are not visible in this view. */
435 apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
436 const uint8_t *data, size_t len)
438 h2_session *session = stream->session;
439 apr_status_t status = APR_SUCCESS;
440 apr_bucket_brigade *tmp;
443 if (!stream->input) {
447 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
448 H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
/* copy the payload into a temporary brigade and send it via the beam */
450 tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc);
451 apr_brigade_write(tmp, NULL, NULL, (const char *)data, len);
452 status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
453 apr_brigade_destroy(tmp);
455 stream->in_data_frames++;
456 stream->in_data_octets += len;
460 static void prep_output(h2_stream *stream) {
461 conn_rec *c = stream->session->c;
462 if (!stream->out_buffer) {
463 stream->out_buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
/* Create a new stream, zero-initialised in `pool`.
 * The stream starts in state H2_SS_IDLE, remembers `session`, `monitor` and
 * `initiated_on`, takes its memory limit from the session and gets an
 * "input" and an "output" beam. The monitor is notified of the initial
 * state through on_state_enter().
 * NOTE(review): a few assignments (e.g. of id and pool) and the return
 * statement are not visible in this view. */
467 h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
468 h2_stream_monitor *monitor, int initiated_on)
470 h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
473 stream->initiated_on = initiated_on;
474 stream->created = apr_time_now();
475 stream->state = H2_SS_IDLE;
477 stream->session = session;
478 stream->monitor = monitor;
479 stream->max_mem = session->max_stream_mem;
/* input beam: this side is the sender (H2_BEAM_OWNER_SEND) ... */
481 h2_beam_create(&stream->input, pool, id, "input", H2_BEAM_OWNER_SEND, 0);
482 h2_beam_send_from(stream->input, stream->pool);
/* ... output beam: this side is the receiver (H2_BEAM_OWNER_RECV) */
483 h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0);
485 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
486 H2_STRM_LOG(APLOGNO(03082), stream, "created"));
487 on_state_enter(stream);
/* Release what the stream still holds before it goes away: empties the
 * out_buffer, aborts the input beam and waits until that beam is empty.
 * The wait is first tried non-blocking; only when that reports APR_EAGAIN
 * does the function block until the input has drained.
 * NOTE(review): declarations and a few lines at the start of the function
 * are not visible in this view. */
491 void h2_stream_cleanup(h2_stream *stream)
496 if (stream->out_buffer) {
497 /* remove any left over output buckets that may still have
498 * references into request pools */
499 apr_brigade_cleanup(stream->out_buffer);
501 h2_beam_abort(stream->input);
502 status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
503 if (status == APR_EAGAIN) {
504 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
505 H2_STRM_MSG(stream, "wait on input drain"));
506 status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
507 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
508 H2_STRM_MSG(stream, "input drain returned"));
/* Destroy the stream by destroying its pool; the event is traced first.
 * NOTE(review): lines between the trace and the pool destruction (e.g. a
 * guard on stream->pool) are not visible in this view. */
512 void h2_stream_destroy(h2_stream *stream)
515 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c,
516 H2_STRM_MSG(stream, "destroy"));
518 apr_pool_destroy(stream->pool);
/* Hand out the stream's pool to the caller.
 * NOTE(review): only the line reading stream->pool is visible in this view;
 * presumably the stream's own reference is cleared before the pool is
 * returned - confirm against the full file. */
523 apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
525 apr_pool_t *pool = stream->pool;
/* Reset the stream with the given HTTP/2 error code: records the code in
 * stream->rst_error, aborts the input beam, leaves the output beam and
 * dispatches H2_SEV_CANCELLED to the state machine. */
530 void h2_stream_rst(h2_stream *stream, int error_code)
/* recorded first: other functions in this file check rst_error and bail out */
532 stream->rst_error = error_code;
533 h2_beam_abort(stream->input);
534 h2_beam_leave(stream->output);
535 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
536 H2_STRM_MSG(stream, "reset, error=%d"), error_code);
537 h2_stream_dispatch(stream, H2_SEV_CANCELLED);
/* Initialise the stream's request from an existing request_rec.
 * The stream must not have a request or prepared request yet (asserted).
 * Returns APR_ECONNRESET when the stream has been reset. On success the
 * stream is treated as if HEADERS with END_STREAM had been received.
 * NOTE(review): `eos` is not referenced in the lines visible here and
 * END_STREAM is passed unconditionally; the store of `req` on the stream
 * and the failure return are not visible in this view. */
540 apr_status_t h2_stream_set_request_rec(h2_stream *stream,
541 request_rec *r, int eos)
546 ap_assert(stream->request == NULL);
547 ap_assert(stream->rtmp == NULL);
548 if (stream->rst_error) {
549 return APR_ECONNRESET;
551 status = h2_request_rcreate(&req, stream->pool, r);
552 if (status == APR_SUCCESS) {
553 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r,
554 H2_STRM_LOG(APLOGNO(03058), stream,
555 "set_request_rec %s host=%s://%s%s"),
556 req->method, req->scheme, req->authority, req->path);
558 /* simulate the frames that led to this */
559 return h2_stream_recv_frame(stream, NGHTTP2_HEADERS,
560 NGHTTP2_FLAG_END_STREAM);
/* Prepare the stream with a copy of request `r`: the clone is allocated
 * from the stream's pool and stored as stream->rtmp. The stream must have
 * neither a request nor a prepared request yet (asserted). */
565 void h2_stream_set_request(h2_stream *stream, const h2_request *r)
567 ap_assert(stream->request == NULL);
568 ap_assert(stream->rtmp == NULL);
569 stream->rtmp = h2_request_clone(stream->pool, r);
/* Queue an error response with `http_status` on the stream's out_buffer,
 * unless the stream already is ready (see h2_stream_is_ready()).
 * NOTE(review): one line between building the response and inserting the
 * buckets is not visible in this view - presumably it makes sure out_buffer
 * exists; confirm. */
572 static void set_error_response(h2_stream *stream, int http_status)
574 if (!h2_stream_is_ready(stream)) {
575 conn_rec *c = stream->session->c;
577 h2_headers *response;
579 response = h2_headers_die(http_status, stream->request, stream->pool);
/* both buckets go to the HEAD: EOS first, then the headers in front of it,
 * so the buffer starts with the response headers followed by EOS */
581 b = apr_bucket_eos_create(c->bucket_alloc);
582 APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
583 b = h2_bucket_headers_create(c->bucket_alloc, response);
584 APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
/* Record one trailer field on the stream.
 * `name`/`value` are given with explicit lengths and are copied into the
 * stream's pool. Empty names and names starting with ':' (pseudo headers)
 * are rejected with a debug log entry. The trailers table is created on
 * first use; values of repeated names are merged.
 * NOTE(review): the return statements, including what happens for names
 * matched by h2_req_ignore_trailer(), are not visible in this view. */
588 static apr_status_t add_trailer(h2_stream *stream,
589 const char *name, size_t nlen,
590 const char *value, size_t vlen)
592 conn_rec *c = stream->session->c;
593 char *hname, *hvalue;
595 if (nlen == 0 || name[0] == ':') {
596 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, c,
597 H2_STRM_LOG(APLOGNO(03060), stream,
598 "pseudo header in trailer"));
601 if (h2_req_ignore_trailer(name, nlen)) {
604 if (!stream->trailers) {
605 stream->trailers = apr_table_make(stream->pool, 5);
/* private, NUL-terminated copies; the name is normalised in place */
607 hname = apr_pstrndup(stream->pool, name, nlen);
608 hvalue = apr_pstrndup(stream->pool, value, vlen);
609 h2_util_camel_case_header(hname, nlen);
610 apr_table_mergen(stream->trailers, hname, hvalue);
/* Add one received header field to the stream.
 * Every call counts towards stream->request_headers_added. Fields exceeding
 * the server's request line / field size / field count limits cause an
 * error response to be set on the stream; exceeding the field count limit
 * by more than 100 resets the stream with H2_ERR_ENHANCE_YOUR_CALM and
 * returns APR_ECONNRESET. While the stream is in H2_SS_IDLE, fields are
 * added to the request being built (stream->rtmp), otherwise they are
 * treated as trailers. A field that is not accepted cancels the stream.
 * NOTE(review): declarations, several conditions/else keywords and the
 * return statements are not visible in this view. */
615 apr_status_t h2_stream_add_header(h2_stream *stream,
616 const char *name, size_t nlen,
617 const char *value, size_t vlen)
619 h2_session *session = stream->session;
/* streams that already have a response are handled up front
 * (branch body not visible here) */
623 if (stream->has_response) {
626 ++stream->request_headers_added;
627 if (name[0] == ':') {
628 if ((vlen) > session->s->limit_req_line) {
629 /* pseudo header: approximation of request line size check */
630 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
631 H2_STRM_MSG(stream, "pseudo %s too long"), name);
632 error = HTTP_REQUEST_URI_TOO_LARGE;
/* the +2 accounts for two extra characters per field
 * NOTE(review): presumably the ": " of a serialized header line - confirm */
635 else if ((nlen + 2 + vlen) > session->s->limit_req_fieldsize) {
636 /* header too long */
637 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
638 H2_STRM_MSG(stream, "header %s too long"), name);
639 error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
642 if (stream->request_headers_added > session->s->limit_req_fields + 4) {
643 /* too many header lines, include 4 pseudo headers */
/* far beyond the limit: stop processing and reset the stream */
644 if (stream->request_headers_added
645 > session->s->limit_req_fields + 4 + 100) {
647 h2_stream_rst(stream, H2_ERR_ENHANCE_YOUR_CALM);
648 return APR_ECONNRESET;
650 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
651 H2_STRM_MSG(stream, "too many header lines"));
652 error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
656 set_error_response(stream, error);
659 else if (H2_SS_IDLE == stream->state) {
661 stream->rtmp = h2_req_create(stream->id, stream->pool,
662 NULL, NULL, NULL, NULL, NULL, 0);
664 status = h2_request_add_header(stream->rtmp, stream->pool,
665 name, nlen, value, vlen);
668 status = add_trailer(stream, name, nlen, value, vlen);
670 if (status != APR_SUCCESS) {
671 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
672 H2_STRM_MSG(stream, "header %s not accepted"), name);
673 h2_stream_dispatch(stream, H2_SEV_CANCELLED);
/* Walk brigade `bb` from its first bucket and look for a headers bucket
 * (H2_BUCKET_IS_HEADERS).
 * NOTE(review): the return statements are not visible in this view;
 * h2_stream_is_ready() uses the result as a truth value, so presumably the
 * first headers bucket is returned, or NULL when there is none - confirm. */
678 static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb)
681 apr_bucket *b = APR_BRIGADE_FIRST(bb);
682 while (b != APR_BRIGADE_SENTINEL(bb)) {
683 if (H2_BUCKET_IS_HEADERS(b)) {
686 b = APR_BUCKET_NEXT(b);
/* Determine what the stream can send next.
 * On input *plen is the amount the caller would like to send (values <= 0
 * mean "one chunk"); on return *plen and *peos describe the data available
 * in the stream's out_buffer. A headers bucket at the front of the buffer
 * is removed and handed out through *presponse. Returns APR_ECONNRESET when
 * the stream has been reset.
 * NOTE(review): declarations, several conditions/else keywords, parts of
 * the bucket loops and the return statements are not visible in this view;
 * comments below describe only what the visible lines do. */
692 apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
693 int *peos, h2_headers **presponse)
695 apr_status_t status = APR_SUCCESS;
696 apr_off_t requested, max_chunk = H2_DATA_CHUNK_SIZE;
704 if (stream->rst_error) {
707 return APR_ECONNRESET;
710 c = stream->session->c;
713 /* determine how much we'd like to send. We cannot send more than
714 * is requested. But we can reduce the size in case the master
715 * connection operates in smaller chunks. (TSL warmup) */
716 if (stream->session->io.write_size > 0) {
717 max_chunk = stream->session->io.write_size - 9; /* header bits */
/* clamp the caller's wish to max_chunk; without a wish use max_chunk */
719 *plen = requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
721 h2_util_bb_avail(stream->out_buffer, plen, peos);
/* fetch more from the output beam only when no EOS is buffered yet, less
 * than requested is available and the memory limit is not reached */
722 if (!*peos && *plen < requested && *plen < stream->max_mem) {
723 H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
724 status = h2_beam_receive(stream->output, stream->out_buffer,
725 APR_NONBLOCK_READ, stream->max_mem - *plen);
/* end of beam: mark the end of the stream with an EOS bucket */
726 if (APR_STATUS_IS_EOF(status)) {
727 apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
728 APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
729 status = APR_SUCCESS;
/* nothing available right now is not an error either */
731 else if (status == APR_EAGAIN) {
732 status = APR_SUCCESS;
735 h2_util_bb_avail(stream->out_buffer, plen, peos);
736 H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
739 H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "ok");
/* drop FLUSH buckets and empty data buckets from the buffer; the successor
 * is fetched before a bucket may get removed */
742 b = APR_BRIGADE_FIRST(stream->out_buffer);
743 while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
744 e = APR_BUCKET_NEXT(b);
745 if (APR_BUCKET_IS_FLUSH(b)
746 || (!APR_BUCKET_IS_METADATA(b) && b->length == 0)) {
747 APR_BUCKET_REMOVE(b);
748 apr_bucket_destroy(b);
756 b = get_first_headers_bucket(stream->out_buffer);
758 /* there are HEADERS to submit */
/* headers right at the front are taken out and returned to the caller */
761 if (b == APR_BRIGADE_FIRST(stream->out_buffer)) {
763 *presponse = h2_bucket_headers_get(b);
764 APR_BUCKET_REMOVE(b);
765 apr_bucket_destroy(b);
766 status = APR_SUCCESS;
769 /* someone needs to retrieve the response first */
770 h2_mplx_keep_active(stream->session->mplx, stream->id);
/* walk the buffered buckets; (apr_size_t)-1 marks a bucket of not yet
 * known length (loop body only partly visible here) */
775 apr_bucket *e = APR_BRIGADE_FIRST(stream->out_buffer);
776 while (e != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
780 else if (e->length != (apr_size_t)-1) {
783 e = APR_BUCKET_NEXT(e);
/* trace what has been prepared */
788 if (status == APR_SUCCESS) {
789 if (presponse && *presponse) {
790 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
791 H2_STRM_MSG(stream, "prepare, response %d"),
792 (*presponse)->status);
794 else if (*peos || *plen) {
795 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
796 H2_STRM_MSG(stream, "prepare, len=%ld eos=%d"),
801 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
802 H2_STRM_MSG(stream, "prepare, no data"));
808 static int is_not_headers(apr_bucket *b)
810 return !H2_BUCKET_IS_HEADERS(b);
/* Move output data of the stream from its out_buffer to brigade `bb`.
 * *plen and *peos are passed to h2_append_brigade(), which is called with
 * is_not_headers() as bucket filter. Returns APR_ECONNRESET when the stream
 * has been reset.
 * NOTE(review): what happens when nothing was read and no EOS was seen, the
 * remaining log arguments and the final return are not visible in this
 * view. */
813 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
814 apr_off_t *plen, int *peos)
816 conn_rec *c = stream->session->c;
817 apr_status_t status = APR_SUCCESS;
819 if (stream->rst_error) {
820 return APR_ECONNRESET;
822 status = h2_append_brigade(bb, stream->out_buffer, plen, peos, is_not_headers);
/* success, but neither data nor EOS (branch body not visible here) */
823 if (status == APR_SUCCESS && !*peos && !*plen) {
826 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
827 H2_STRM_MSG(stream, "read_to, len=%ld eos=%d"),
/* Collect the push candidates for the stream's request and the given
 * response and submit each of them to the session via h2_session_push().
 * NOTE(review): the condition under which the status becomes
 * APR_ECONNRESET (presumably a failed push), the loop exit and the final
 * return are not visible in this view. */
833 apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
835 apr_status_t status = APR_SUCCESS;
836 apr_array_header_t *pushes;
839 pushes = h2_push_collect_update(stream, stream->request, response);
840 if (pushes && !apr_is_empty_array(pushes)) {
841 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
842 H2_STRM_MSG(stream, "found %d push candidates"),
844 for (i = 0; i < pushes->nelts; ++i) {
845 h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
846 h2_stream *s = h2_session_push(stream->session, stream, push);
848 status = APR_ECONNRESET;
/* Declared to return an apr_table_t * for the given stream.
 * NOTE(review): the body is not visible in this view, so what exactly is
 * returned cannot be stated here - TODO confirm against the full file. */
856 apr_table_t *h2_stream_get_trailers(h2_stream *stream)
/* Look up the configured priority for a response on this stream.
 * Only done when a response is given and stream->initiated_on is set; the
 * lookup uses the response's "content-type" header and the session's
 * configuration.
 * NOTE(review): a check on `ctype` and the fallback return are not visible
 * in this view. */
861 const h2_priority *h2_stream_get_priority(h2_stream *stream,
862 h2_headers *response)
864 if (response && stream->initiated_on) {
865 const char *ctype = apr_table_get(response->headers, "content-type");
867 /* FIXME: Not good enough, config needs to come from request->server */
868 return h2_config_get_priority(stream->session->config, ctype);
/* Tell whether the stream has a response available: checks the
 * has_response flag first and otherwise looks for a headers bucket in the
 * out_buffer.
 * NOTE(review): the return statements are not visible in this view;
 * presumably non-zero in both cases and 0 otherwise - confirm. */
874 int h2_stream_is_ready(h2_stream *stream)
876 if (stream->has_response) {
879 else if (stream->out_buffer && get_first_headers_bucket(stream->out_buffer)) {
885 int h2_stream_was_closed(const h2_stream *stream)
887 switch (stream->state) {