1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
17 #include <apr_strings.h>
18 #include <nghttp2/nghttp2.h>
20 #include <mpm_common.h>
22 #include <mod_proxy.h>
24 #include "mod_http2.h"
27 #include "h2_proxy_session.h"
29 APLOG_USE_MODULE(proxy_http2);
// Per-request state for one HTTP/2 stream on a proxy session: the owning
// session, the stream state, two one-bit flags and the request/response
// brigades.
// NOTE(review): other members used elsewhere in this file (id, pool, r, req,
// saves) are declared on lines not shown in this listing.
31 typedef struct h2_proxy_stream {
34 h2_proxy_session *session;
40 h2_stream_state_t state;
// set by stream_data_read when the body read would block; cleared again in
// check_suspended
41 unsigned int suspended : 1;
// set once the response headers were finalized, either on the first DATA
// chunk or when the stream closed without one
42 unsigned int data_received : 1;
// request body read from the client, waiting to be sent to the backend
44 apr_bucket_brigade *input;
// response body received from the backend, passed to the request's output
// filters
45 apr_bucket_brigade *output;
// Forward declaration: the event dispatcher is defined near the end of this
// file but is called by the callbacks that precede its definition.
51 static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
52 int arg, const char *msg);
// Pool pre-cleanup hook, registered for the proxy connection in
// h2_proxy_session_setup. theconn is the proxy_conn_rec whose data member
// holds the session. When a session with a live nghttp2 handle is found, its
// state is traced, H2_PROXYS_EV_PRE_CLOSE is dispatched and the nghttp2
// session is deleted.
55 static apr_status_t proxy_session_pre_close(void *theconn)
57 proxy_conn_rec *p_conn = (proxy_conn_rec *)theconn;
58 h2_proxy_session *session = p_conn->data;
60 if (session && session->ngh2) {
61 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
62 "proxy_session(%s): pool cleanup, state=%d, streams=%d",
63 session->id, session->state,
64 (int)h2_ihash_count(session->streams));
// let the state machine react while the nghttp2 handle is still valid
66 dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
67 nghttp2_session_del(session->ngh2);
// Passes brigade bb to the backend connection `origin`.
// The byte length of bb is added to the worker's `transferred` counter when
// apr_brigade_length can determine it, then bb goes down origin's output
// filters. bb is emptied afterwards whether or not the pass succeeded; a
// failure is logged at error level with the backend address and host name.
// NOTE(review): the trailing flush parameter (callers pass five arguments)
// and the guard around the FLUSH bucket below are on lines not shown in this
// listing; presumably the bucket is only appended when flush is set.
74 static int proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc,
75 proxy_conn_rec *p_conn,
76 conn_rec *origin, apr_bucket_brigade *bb,
80 apr_off_t transferred;
83 apr_bucket *e = apr_bucket_flush_create(bucket_alloc);
84 APR_BRIGADE_INSERT_TAIL(bb, e);
// read_all is 0 here, so a brigade of indeterminate length reports -1 and is
// not counted
86 apr_brigade_length(bb, 0, &transferred);
87 if (transferred != -1)
88 p_conn->worker->s->transferred += transferred;
89 status = ap_pass_brigade(origin->output_filters, bb);
90 /* Cleanup the brigade now to avoid buckets lifetime
91 * issues in case of error returned below. */
92 apr_brigade_cleanup(bb);
93 if (status != APR_SUCCESS) {
94 ap_log_cerror(APLOG_MARK, APLOG_ERR, status, origin, APLOGNO(03357)
95 "pass output failed to %pI (%s)",
96 p_conn->addr, p_conn->hostname);
// nghttp2 send callback (installed in h2_proxy_session_setup): hands the
// serialized bytes produced by nghttp2 to the backend connection.
// The bytes are wrapped in a transient bucket, appended to session->output
// and pushed out through proxy_pass_brigade, which also empties the brigade.
// Returns NGHTTP2_ERR_CALLBACK_FAILURE when the brigade could not be passed.
// NOTE(review): the trace message below spells its prefix
// "h2_proxy_sesssion"; other messages in this file use "h2_proxy_session".
101 static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
102 size_t length, int flags, void *user_data)
104 h2_proxy_session *session = user_data;
// transient bucket: refers to nghttp2's buffer instead of copying it
110 b = apr_bucket_transient_create((const char*)data, length,
111 session->c->bucket_alloc);
112 APR_BRIGADE_INSERT_TAIL(session->output, b);
115 status = proxy_pass_brigade(session->c->bucket_alloc,
116 session->p_conn, session->c,
117 session->output, flush);
118 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
119 "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d",
120 session->id, (int)length, flush);
121 if (status != APR_SUCCESS) {
122 return NGHTTP2_ERR_CALLBACK_FAILURE;
// nghttp2 callback for every frame received from the backend.
// Logs the frame at debug level, then reacts per frame type:
//  - SETTINGS with at least one entry: re-reads the peer's
//    MAX_CONCURRENT_STREAMS setting and stores it in
//    session->remote_max_concurrent;
//  - GOAWAY: records the last stream id the peer announced and dispatches
//    H2_PROXYS_EV_REMOTE_GOAWAY.
// HEADERS and PUSH_PROMISE have case labels with no visible handling.
// NOTE(review): the GOAWAY case label and the check applied to n before it is
// stored are on lines not shown in this listing.
127 static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
130 h2_proxy_session *session = user_data;
133 if (APLOGcdebug(session->c)) {
136 h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
137 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03341)
138 "h2_proxy_session(%s): recv FRAME[%s]",
139 session->id, buffer);
142 switch (frame->hd.type) {
143 case NGHTTP2_HEADERS:
145 case NGHTTP2_PUSH_PROMISE:
147 case NGHTTP2_SETTINGS:
148 if (frame->settings.niv > 0) {
149 n = nghttp2_session_get_remote_settings(ngh2, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
151 session->remote_max_concurrent = n;
156 /* we expect the remote server to tell us the highest stream id
157 * that it has started processing. */
158 session->last_stream_id = frame->goaway.last_stream_id;
159 dispatch_event(session, H2_PROXYS_EV_REMOTE_GOAWAY, 0, NULL);
// NOTE(review): the guard tests the INFO level but the message is emitted at
// APLOG_DEBUG, so it is still filtered unless debug logging is on - confirm
// which level was intended.
160 if (APLOGcinfo(session->c)) {
163 h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
164 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03342)
165 "h2_proxy_session(%s): recv FRAME[%s]",
166 session->id, buffer);
// nghttp2 callback invoked before a frame is sent to the backend. The visible
// code only logs a printable form of the frame when debug logging is enabled
// for the connection.
175 static int before_frame_send(nghttp2_session *ngh2,
176 const nghttp2_frame *frame, void *user_data)
178 h2_proxy_session *session = user_data;
179 if (APLOGcdebug(session->c)) {
182 h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
183 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03343)
184 "h2_proxy_session(%s): sent FRAME[%s]",
185 session->id, buffer);
// apr_table_do callback: appends the visited key/value pair to the table
// passed as callback context. apr_table_addn stores the given pointers
// without duplicating the strings, so they must outlive the target table.
190 static int add_header(void *table, const char *n, const char *v)
192 apr_table_addn(table, n, v);
// Adds one backend response header to r->headers_out.
// Headers listed in transform_hdrs (matched case-insensitively) are first
// rewritten by their reverse-mapping function, using the proxy per-directory
// configuration; all other headers are added unchanged.
// NOTE(review): the table terminator and the statement that leaves the loop
// after a match are on lines not shown in this listing; presumably the final
// apr_table_add only runs when no entry matched.
196 static void process_proxy_header(request_rec *r, const char *n, const char *v)
198 static const struct {
200 ap_proxy_header_reverse_map_fn func;
201 } transform_hdrs[] = {
202 { "Location", ap_proxy_location_reverse_map },
203 { "Content-Location", ap_proxy_location_reverse_map },
204 { "URI", ap_proxy_location_reverse_map },
205 { "Destination", ap_proxy_location_reverse_map },
206 { "Set-Cookie", ap_proxy_cookie_reverse_map },
209 proxy_dir_conf *dconf;
// the loop stops at the first entry whose name is NULL
212 for (i = 0; transform_hdrs[i].name; ++i) {
213 if (!ap_casecmpstr(transform_hdrs[i].name, n)) {
214 dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
215 apr_table_add(r->headers_out, n,
216 (*transform_hdrs[i].func)(r, dconf, v));
220 apr_table_add(r->headers_out, n, v);
// Handles one response header field (name n of nlen bytes, value v of vlen
// bytes) received for `stream`.
// Before any body data was seen, the ":status" pseudo header sets
// r->status (falling back to 500 when the value does not parse to a positive
// number) and is also stored in the "proxy-status" request note.
// Headers not rejected by h2_proxy_res_ignore_header are copied into the
// stream pool, run through h2_util_camel_case_header and added to the
// response via process_proxy_header.
// NOTE(review): strncmp(":status", n, nlen) compares only nlen bytes, so a
// name that is a shorter prefix of ":status" (for example ":s") also takes
// the status branch - confirm whether an exact-length match was intended.
223 static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
224 const char *n, apr_size_t nlen,
225 const char *v, apr_size_t vlen)
228 if (!stream->data_received && !strncmp(":status", n, nlen)) {
// copy into the request pool: the note table keeps the pointer (setn)
229 char *s = apr_pstrndup(stream->r->pool, v, vlen);
231 apr_table_setn(stream->r->notes, "proxy-status", s);
232 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
233 "h2_proxy_stream(%s-%d): got status %s",
234 stream->session->id, stream->id, s);
235 stream->r->status = (int)apr_atoi64(s);
236 if (stream->r->status <= 0) {
237 stream->r->status = 500;
244 if (!h2_proxy_res_ignore_header(n, nlen)) {
245 char *hname, *hvalue;
247 hname = apr_pstrndup(stream->pool, n, nlen);
248 h2_util_camel_case_header(hname, nlen);
249 hvalue = apr_pstrndup(stream->pool, v, vlen);
251 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
252 "h2_proxy_stream(%s-%d): got header %s: %s",
253 stream->session->id, stream->id, hname, hvalue);
254 process_proxy_header(stream->r, hname, hvalue);
// apr_table_do callback: traces one response header of the stream passed as
// ctx at TRACE2 level.
259 static int log_header(void *ctx, const char *key, const char *value)
261 h2_proxy_stream *stream = ctx;
262 ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
263 "h2_proxy_stream(%s-%d), header_out %s: %s",
264 stream->session->id, stream->id, key, value);
// Finalizes the response headers of `stream` before the first body byte is
// passed on (called from on_data_chunk_recv and ev_stream_done).
// Steps visible here:
//  1. merge the backend's Set-Cookie headers with the ones saved in
//     stream->saves when the stream was opened;
//  2. unless Via handling is off or blocked, append a "Via" header naming
//     this server, with the server version added when viaopt is via_full;
//  3. at TRACE2 level, log the resulting header table.
268 static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream)
270 h2_proxy_session *session = stream->session;
271 request_rec *r = stream->r;
272 apr_pool_t *p = r->pool;
274 /* Now, add in the cookies from the response to the ones already saved */
275 apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL);
277 /* and now load 'em all in */
278 if (!apr_is_empty_table(stream->saves)) {
279 apr_table_unset(r->headers_out, "Set-Cookie");
280 r->headers_out = apr_table_overlay(p, r->headers_out, stream->saves);
283 /* handle Via header in response */
284 if (session->conf->viaopt != via_off
285 && session->conf->viaopt != via_block) {
286 const char *server_name = ap_get_server_name(stream->r);
287 apr_port_t port = ap_get_server_port(stream->r);
290 /* If USE_CANONICAL_NAME_OFF was configured for the proxy virtual host,
291 * then the server name returned by ap_get_server_name() is the
292 * origin server name (which does not make much sense with Via: headers)
293 * so we use the proxy vhost's name instead.
295 if (server_name == stream->r->hostname) {
296 server_name = stream->r->server->server_hostname;
298 if (ap_is_default_port(port, stream->r)) {
302 apr_snprintf(portstr, sizeof(portstr), ":%d", port);
305 /* create a "Via:" response header entry and merge it */
306 apr_table_addn(r->headers_out, "Via",
307 (session->conf->viaopt == via_full)
308 ? apr_psprintf(p, "%d.%d %s%s (%s)",
309 HTTP_VERSION_MAJOR(r->proto_num),
310 HTTP_VERSION_MINOR(r->proto_num),
311 server_name, portstr,
312 AP_SERVER_BASEVERSION)
313 : apr_psprintf(p, "%d.%d %s%s",
314 HTTP_VERSION_MAJOR(r->proto_num),
315 HTTP_VERSION_MINOR(r->proto_num),
316 server_name, portstr)
320 if (APLOGrtrace2(stream->r)) {
321 ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
322 "h2_proxy_stream(%s-%d), header_out after merging",
323 stream->session->id, stream->id);
324 apr_table_do(log_header, stream, stream->r->headers_out, NULL);
// nghttp2 callback for a chunk of response body data on stream_id.
// Looks up the stream attached to the nghttp2 stream; a chunk for an unknown
// stream is logged and ignored. On the first chunk the response headers are
// finalized. The chunk is then wrapped in a transient bucket, followed by a
// FLUSH bucket, and passed down the request's output filters.
// If passing fails, RST_STREAM is submitted for the stream and
// NGHTTP2_ERR_STREAM_CLOSING is returned.
// NOTE(review): the NULL check guarding the "unknown stream" log is on a line
// not shown in this listing.
328 static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
329 int32_t stream_id, const uint8_t *data,
330 size_t len, void *user_data)
332 h2_proxy_session *session = user_data;
333 h2_proxy_stream *stream;
337 stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
339 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03358)
340 "h2_proxy_session(%s): recv data chunk for "
341 "unknown stream %d, ignored",
342 session->id, stream_id);
346 if (!stream->data_received) {
347 /* last chance to manipulate response headers.
348 * after this, only trailers */
349 h2_proxy_stream_end_headers_out(stream);
350 stream->data_received = 1;
// transient bucket: refers to nghttp2's buffer instead of copying it
353 b = apr_bucket_transient_create((const char*)data, len,
354 stream->r->connection->bucket_alloc);
355 APR_BRIGADE_INSERT_TAIL(stream->output, b);
356 /* always flush after a DATA frame, as we have no other indication
358 b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
359 APR_BRIGADE_INSERT_TAIL(stream->output, b);
361 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, APLOGNO(03359)
362 "h2_proxy_session(%s): pass response data for "
363 "stream %d, %d bytes", session->id, stream_id, (int)len);
364 status = ap_pass_brigade(stream->r->output_filters, stream->output);
365 if (status != APR_SUCCESS) {
366 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03344)
367 "h2_proxy_session(%s): passing output on stream %d",
368 session->id, stream->id);
369 nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
370 stream_id, NGHTTP2_STREAM_CLOSED);
371 return NGHTTP2_ERR_STREAM_CLOSING;
// nghttp2 callback for a closed stream. Unless the session is marked
// aborted, the close is logged with its error code and
// H2_PROXYS_EV_STREAM_DONE is dispatched with the stream id as argument.
376 static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
377 uint32_t error_code, void *user_data)
379 h2_proxy_session *session = user_data;
380 if (!session->aborted) {
381 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03360)
382 "h2_proxy_session(%s): stream=%d, closed, err=%d",
383 session->id, stream_id, error_code);
384 dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
// nghttp2 callback for each received header field.
// Fields of HEADERS frames with a non-empty name are forwarded to
// h2_proxy_stream_add_header_out for the stream the frame belongs to; a
// non-zero result from it is reported as NGHTTP2_ERR_CALLBACK_FAILURE.
// PUSH_PROMISE frames have their own branch, whose body is not shown here.
// NOTE(review): whatever validates `stream` between the lookup and its use is
// on a line not shown in this listing.
389 static int on_header(nghttp2_session *ngh2, const nghttp2_frame *frame,
390 const uint8_t *namearg, size_t nlen,
391 const uint8_t *valuearg, size_t vlen, uint8_t flags,
394 h2_proxy_session *session = user_data;
395 h2_proxy_stream *stream;
396 const char *n = (const char*)namearg;
397 const char *v = (const char*)valuearg;
400 if (frame->hd.type == NGHTTP2_HEADERS && nlen) {
401 stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id);
403 if (h2_proxy_stream_add_header_out(stream, n, nlen, v, vlen)) {
404 return NGHTTP2_ERR_CALLBACK_FAILURE;
408 else if (frame->hd.type == NGHTTP2_PUSH_PROMISE) {
// nghttp2 data-source read callback (set as provider.read_callback in
// submit_stream): copies request body bytes for stream_id into buf.
// When stream->input is empty it is refilled with a non-blocking read from
// the request's input filters. On success, buckets are consumed from the
// front of stream->input while fewer than `length` bytes were copied and the
// brigade is not empty: an EOS bucket sets NGHTTP2_DATA_FLAG_EOF in
// *data_flags, and a data bucket that does not fit completely is split so
// the remainder stays queued.
// On EAGAIN the stream is marked suspended, queued in session->suspended and
// NGHTTP2_ERR_DEFERRED is returned. The last visible path submits RST_STREAM
// and returns NGHTTP2_ERR_STREAM_CLOSING.
// NOTE(review): the success-path return value and the bookkeeping that
// advances buf and readlen after memcpy are on lines not shown here.
414 static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
415 uint8_t *buf, size_t length,
416 uint32_t *data_flags,
417 nghttp2_data_source *source, void *user_data)
419 h2_proxy_stream *stream;
420 apr_status_t status = APR_SUCCESS;
423 stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
// NOTE(review): this message reports that the stream was NOT found, yet it
// reads stream->session->id. If the elided guard is a NULL check on stream,
// this dereferences a NULL pointer - confirm and take the id from the
// session instead.
425 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03361)
426 "h2_proxy_stream(%s): data_read, stream %d not found",
427 stream->session->id, stream_id);
428 return NGHTTP2_ERR_CALLBACK_FAILURE;
431 if (APR_BRIGADE_EMPTY(stream->input)) {
432 status = ap_get_brigade(stream->r->input_filters, stream->input,
433 AP_MODE_READBYTES, APR_NONBLOCK_READ,
434 H2MAX(APR_BUCKET_BUFF_SIZE, length));
435 ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
436 "h2_proxy_stream(%s-%d): request body read",
437 stream->session->id, stream->id);
440 if (status == APR_SUCCESS) {
442 while (status == APR_SUCCESS
443 && (readlen < length)
444 && !APR_BRIGADE_EMPTY(stream->input)) {
445 apr_bucket* b = APR_BRIGADE_FIRST(stream->input);
446 if (APR_BUCKET_IS_METADATA(b)) {
447 if (APR_BUCKET_IS_EOS(b)) {
448 *data_flags |= NGHTTP2_DATA_FLAG_EOF;
451 /* we do nothing more regarding any meta here */
455 const char *bdata = NULL;
457 status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
459 if (status == APR_SUCCESS && blen > 0) {
// never copy more than the space left in nghttp2's buffer
460 ssize_t copylen = H2MIN(length - readlen, blen);
461 memcpy(buf, bdata, copylen);
464 if (copylen < blen) {
465 /* We have data left in the bucket. Split it. */
466 status = apr_bucket_split(b, copylen);
470 apr_bucket_delete(b);
473 ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
474 "h2_proxy_stream(%d): request body read %ld bytes, flags=%d",
475 stream->id, (long)readlen, (int)*data_flags);
478 else if (APR_STATUS_IS_EAGAIN(status)) {
479 /* suspended stream, needs to be re-awakened */
480 ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
481 "h2_proxy_stream(%s-%d): suspending",
482 stream->session->id, stream_id);
483 stream->suspended = 1;
484 h2_iq_add(stream->session->suspended, stream->id, NULL, NULL);
485 return NGHTTP2_ERR_DEFERRED;
488 nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
489 stream_id, NGHTTP2_STREAM_CLOSED);
490 return NGHTTP2_ERR_STREAM_CLOSING;
494 h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
495 proxy_server_conf *conf,
496 unsigned char window_bits_connection,
497 unsigned char window_bits_stream,
498 h2_proxy_request_done *done)
501 apr_pool_t *pool = p_conn->scpool;
502 h2_proxy_session *session;
503 nghttp2_session_callbacks *cbs;
504 nghttp2_option *option;
506 session = apr_pcalloc(pool, sizeof(*session));
507 apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
508 p_conn->data = session;
510 session->id = apr_pstrdup(p_conn->scpool, id);
511 session->c = p_conn->connection;
512 session->p_conn = p_conn;
513 session->conf = conf;
514 session->pool = p_conn->scpool;
515 session->state = H2_PROXYS_ST_INIT;
516 session->window_bits_stream = window_bits_stream;
517 session->window_bits_connection = window_bits_connection;
518 session->streams = h2_ihash_create(pool, offsetof(h2_proxy_stream, id));
519 session->suspended = h2_iq_create(pool, 5);
520 session->done = done;
522 session->input = apr_brigade_create(session->pool, session->c->bucket_alloc);
523 session->output = apr_brigade_create(session->pool, session->c->bucket_alloc);
525 nghttp2_session_callbacks_new(&cbs);
526 nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, on_frame_recv);
527 nghttp2_session_callbacks_set_on_data_chunk_recv_callback(cbs, on_data_chunk_recv);
528 nghttp2_session_callbacks_set_on_stream_close_callback(cbs, on_stream_close);
529 nghttp2_session_callbacks_set_on_header_callback(cbs, on_header);
530 nghttp2_session_callbacks_set_before_frame_send_callback(cbs, before_frame_send);
531 nghttp2_session_callbacks_set_send_callback(cbs, raw_send);
533 nghttp2_option_new(&option);
534 nghttp2_option_set_peer_max_concurrent_streams(option, 100);
535 nghttp2_option_set_no_auto_window_update(option, 1);
537 nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
539 nghttp2_option_del(option);
540 nghttp2_session_callbacks_del(cbs);
542 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03362)
543 "setup session for %s", p_conn->hostname);
// Sends the initial client-side protocol setup for the session.
// Nagle is disabled on the connection socket (on platforms selected by the
// preprocessor condition), then a SETTINGS frame is submitted that disables
// server push and sets the initial stream window to 2^window_bits_stream - 1.
// If the configured connection window differs from nghttp2's default, a
// connection-level WINDOW_UPDATE with the difference is submitted as well.
// Returns APR_SUCCESS, or APR_EGENERAL when an nghttp2 submit call failed.
// NOTE(review): `1 << bits` is evaluated as int, so this assumes both
// window_bits values are at most 30 - confirm the configuration limits them.
548 static apr_status_t session_start(h2_proxy_session *session)
550 nghttp2_settings_entry settings[2];
551 int rv, add_conn_window;
554 s = ap_get_conn_socket(session->c);
555 #if (!defined(WIN32) && !defined(NETWARE)) || defined(DOXYGEN)
557 ap_sock_disable_nagle(s);
561 settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
562 settings[0].value = 0;
563 settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
564 settings[1].value = (1 << session->window_bits_stream) - 1;
566 rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings,
// NOTE(review): the comment below says "larger", but the condition further
// down is `!= 0`, so a configured window smaller than the default also
// submits an update, with a negative increment.
569 /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */
570 add_conn_window = ((1 << session->window_bits_connection) - 1 -
571 NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE);
572 if (!rv && add_conn_window != 0) {
573 rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window);
575 return rv? APR_EGENERAL : APR_SUCCESS;
// Builds the per-request stream object for request r that will be sent to
// backend `url` (the result is presumably handed back through *pstream on a
// line not shown).
// The stream is allocated from r->pool and starts in H2_STREAM_ST_IDLE with
// fresh input/output brigades. The URL is parsed to derive the request
// pseudo headers: scheme "https" for an "h2" URL and "http" otherwise, the
// authority (with the port appended when it is not the scheme's default) and
// the path including everything after the site part.
// Set-Cookie headers already present on the response are stashed in
// stream->saves so they can be merged back in
// h2_proxy_stream_end_headers_out.
// NOTE(review): puri.scheme is passed to strcmp without a visible NULL check;
// this assumes `url` always carries a scheme - confirm against the caller.
578 static apr_status_t open_stream(h2_proxy_session *session, const char *url,
579 request_rec *r, h2_proxy_stream **pstream)
581 h2_proxy_stream *stream;
583 const char *authority, *scheme, *path;
586 stream = apr_pcalloc(r->pool, sizeof(*stream));
588 stream->pool = r->pool;
591 stream->session = session;
592 stream->state = H2_STREAM_ST_IDLE;
594 stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc);
595 stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc);
597 stream->req = h2_req_create(1, stream->pool, 0);
599 status = apr_uri_parse(stream->pool, url, &puri);
600 if (status != APR_SUCCESS)
// strcmp returns 0 on equality: "h2" maps to https, anything else to http
603 scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
604 authority = puri.hostname;
605 if (!ap_strchr_c(authority, ':') && puri.port
606 && apr_uri_port_of_scheme(scheme) != puri.port) {
607 /* port info missing and port is not default for scheme: append */
608 authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
610 path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);
611 h2_req_make(stream->req, stream->pool, r->method, scheme,
612 authority, path, r->headers_in);
614 /* Tuck away all already existing cookies */
615 stream->saves = apr_table_make(r->pool, 2);
616 apr_table_do(add_header, stream->saves, r->headers_out,"Set-Cookie", NULL);
// Submits the request prepared in `stream` to the backend via nghttp2.
// A first non-blocking read of the request body decides whether a data
// provider is attached: if data other than a leading EOS was read, or the
// read would block, stream_data_read is installed as read callback;
// otherwise pp keeps its initial NULL on the visible lines, i.e. the request
// has no body.
// The visible tail marks the stream open, adds it to session->streams and
// dispatches H2_PROXYS_EV_STREAM_SUBMITTED with the result of
// nghttp2_submit_request as argument.
// NOTE(review): the assignment of pp, the handling of a negative result from
// nghttp2_submit_request and the return statements are on lines not shown in
// this listing.
623 static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *stream)
626 nghttp2_data_provider *pp = NULL;
627 nghttp2_data_provider provider;
631 hd = h2_util_ngheader_make_req(stream->pool, stream->req);
633 status = ap_get_brigade(stream->r->input_filters, stream->input,
634 AP_MODE_READBYTES, APR_NONBLOCK_READ,
635 APR_BUCKET_BUFF_SIZE);
636 if ((status == APR_SUCCESS && !APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(stream->input)))
637 || APR_STATUS_IS_EAGAIN(status)) {
638 /* there might be data coming */
639 provider.source.fd = 0;
640 provider.source.ptr = NULL;
641 provider.read_callback = stream_data_read;
// the stream object is attached as nghttp2 stream user data; the callbacks
// retrieve it with nghttp2_session_get_stream_user_data
645 rv = nghttp2_submit_request(session->ngh2, NULL,
646 hd->nv, hd->nvlen, pp, stream);
648 if (APLOGcdebug(session->c)) {
649 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03363)
650 "h2_proxy_session(%s): submit %s%s -> %d",
651 session->id, stream->req->authority, stream->req->path,
657 stream->state = H2_STREAM_ST_OPEN;
658 h2_ihash_add(session->streams, stream);
659 dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL);
// Feeds the data buckets of bb, read from the backend connection, into
// nghttp2 and removes them from the brigade.
// Each data bucket is read and handed to nghttp2_session_mem_recv; a fatal
// nghttp2 result turns into APR_EGENERAL and ends the loop. The total is
// traced at TRACE1.
// NOTE(review): the handling of metadata buckets, the condition around the
// split below, the readlen accounting and the final return values are on
// lines not shown in this listing. The result of apr_bucket_split is not
// checked on the visible line.
666 static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
668 apr_status_t status = APR_SUCCESS;
669 apr_size_t readlen = 0;
672 while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
673 apr_bucket* b = APR_BRIGADE_FIRST(bb);
675 if (APR_BUCKET_IS_METADATA(b)) {
679 const char *bdata = NULL;
682 status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
683 if (status == APR_SUCCESS && blen > 0) {
// n is the number of bytes nghttp2 consumed, or a negative error code
684 n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen);
685 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
686 "h2_proxy_session(%s): feeding %ld bytes -> %ld",
687 session->id, (long)blen, (long)n);
689 if (nghttp2_is_fatal((int)n)) {
690 status = APR_EGENERAL;
// presumably reached only when fewer than blen bytes were consumed, so the
// unconsumed tail stays in the brigade - confirm against the elided guard
696 apr_bucket_split(b, n);
701 apr_bucket_delete(b);
704 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
705 "h2_proxy_session(%s): fed %ld bytes of input to session",
706 session->id, (long)readlen);
707 if (readlen == 0 && status == APR_SUCCESS) {
// Reads from the backend connection and feeds the result to nghttp2.
//   block    non-zero for a blocking read, zero for non-blocking
//   timeout  socket timeout applied around the read on the visible path
// When session->input is empty, the socket timeout is saved and replaced,
// one read is made through the connection's input filters, and the saved
// timeout is restored. Whatever session->input holds afterwards is passed to
// feed_brigade when the status is APR_SUCCESS. A timeout has its own branch;
// any other status except EAGAIN is logged and dispatched as
// H2_PROXYS_EV_CONN_ERROR.
// NOTE(review): the conditions selecting the timeout handling, the read mode
// and size arguments, the timeout branch body and the return statement are
// on lines not shown in this listing.
713 static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
714 apr_interval_time_t timeout)
716 apr_status_t status = APR_SUCCESS;
718 if (APR_BRIGADE_EMPTY(session->input)) {
719 apr_socket_t *socket = NULL;
// -1 means "nothing saved", checked again before the timeout is restored
720 apr_time_t save_timeout = -1;
723 socket = ap_get_conn_socket(session->c);
725 apr_socket_timeout_get(socket, &save_timeout);
726 apr_socket_timeout_set(socket, timeout);
729 /* cannot block on timeout */
730 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, APLOGNO(03379)
731 "h2_proxy_session(%s): unable to get conn socket",
737 status = ap_get_brigade(session->c->input_filters, session->input,
739 block? APR_BLOCK_READ : APR_NONBLOCK_READ,
741 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
742 "h2_proxy_session(%s): read from conn", session->id);
743 if (socket && save_timeout != -1) {
744 apr_socket_timeout_set(socket, save_timeout);
748 if (status == APR_SUCCESS) {
749 status = feed_brigade(session, session->input);
751 else if (APR_STATUS_IS_TIMEUP(status)) {
754 else if (!APR_STATUS_IS_EAGAIN(status)) {
755 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03380)
756 "h2_proxy_session(%s): read error", session->id);
757 dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
// Public entry point: opens a stream for request r towards `url` and, when
// that succeeded, logs the request line and submits the stream to the
// backend via submit_stream. The resulting status is presumably returned on
// a line not shown in this listing.
763 apr_status_t h2_proxy_session_submit(h2_proxy_session *session,
764 const char *url, request_rec *r)
766 h2_proxy_stream *stream;
769 status = open_stream(session, url, r, &stream);
770 if (status == APR_SUCCESS) {
771 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(03381)
772 "process stream(%d): %s %s%s, original: %s",
773 stream->id, stream->req->method,
774 stream->req->authority, stream->req->path,
776 status = submit_stream(session, stream);
// Walks the queue of suspended streams and tries a non-blocking read of each
// stream's request body.
//  - data arrived: the stream is un-suspended, removed from the queue,
//    resumed in nghttp2 and H2_PROXYS_EV_STREAM_RESUMED is dispatched;
//  - a read error other than EAGAIN: logged as a warning, the stream is
//    removed from the queue and the same event is dispatched;
//  - the last visible branch removes the id from the queue without further
//    action (presumably for ids whose stream no longer exists).
// After each removal the function calls itself again, since the queue it was
// iterating has just been modified.
// The caller h2_proxy_session_process compares the result against
// APR_EAGAIN; the return statements themselves are on lines not shown in
// this listing.
781 static apr_status_t check_suspended(h2_proxy_session *session)
783 h2_proxy_stream *stream;
787 for (i = 0; i < session->suspended->nelts; ++i) {
788 stream_id = session->suspended->elts[i];
789 stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
791 status = ap_get_brigade(stream->r->input_filters, stream->input,
792 AP_MODE_READBYTES, APR_NONBLOCK_READ,
793 APR_BUCKET_BUFF_SIZE);
794 if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->input)) {
795 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
796 "h2_proxy_stream(%s-%d): resuming",
797 session->id, stream_id);
798 stream->suspended = 0;
799 h2_iq_remove(session->suspended, stream_id);
800 nghttp2_session_resume_data(session->ngh2, stream_id);
801 dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
802 check_suspended(session);
805 else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
806 ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, session->c,
807 APLOGNO(03382) "h2_proxy_stream(%s-%d): check input",
808 session->id, stream_id);
809 h2_iq_remove(session->suspended, stream_id);
810 dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
811 check_suspended(session);
817 h2_iq_remove(session->suspended, stream_id);
818 check_suspended(session);
// Announces a local shutdown to the backend: submits a GOAWAY frame with
// error code `reason`, lets nghttp2 send pending frames and dispatches
// H2_PROXYS_EV_LOCAL_GOAWAY.
// The debug text sent with the frame is `msg`, or nghttp2's description of
// `reason` when msg is NULL and reason is non-zero; with neither, no text is
// sent.
// NOTE(review): the third parameter (msg) and the return statement are on
// lines not shown in this listing.
825 static apr_status_t session_shutdown(h2_proxy_session *session, int reason,
828 apr_status_t status = APR_SUCCESS;
829 const char *err = msg;
831 AP_DEBUG_ASSERT(session);
832 if (!err && reason) {
833 err = nghttp2_strerror(reason);
// the last-stream-id argument is 0
835 nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0,
836 reason, (uint8_t*)err, err? strlen(err):0);
837 status = nghttp2_session_send(session->ngh2);
838 dispatch_event(session, H2_PROXYS_EV_LOCAL_GOAWAY, reason, err);
// Printable names of the session states, indexed by the h2_proxys_state
// value (see state_name). The order must match the enum declaration named in
// the per-entry comments.
843 static const char *StateNames[] = {
844 "INIT", /* H2_PROXYS_ST_INIT */
845 "DONE", /* H2_PROXYS_ST_DONE */
846 "IDLE", /* H2_PROXYS_ST_IDLE */
847 "BUSY", /* H2_PROXYS_ST_BUSY */
848 "WAIT", /* H2_PROXYS_ST_WAIT */
849 "LSHUTDOWN", /* H2_PROXYS_ST_LOCAL_SHUTDOWN */
850 "RSHUTDOWN", /* H2_PROXYS_ST_REMOTE_SHUTDOWN */
// Returns the printable name for `state` from StateNames. Values beyond the
// end of the table take a separate branch whose result is on a line not
// shown in this listing.
853 static const char *state_name(h2_proxys_state state)
855 if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
858 return StateNames[state];
// Tells whether new streams may be submitted in the session's current state.
// IDLE, BUSY and WAIT share one branch - presumably the "accepting" one; the
// return values are on lines not shown in this listing.
861 static int is_accepting_streams(h2_proxy_session *session)
863 switch (session->state) {
864 case H2_PROXYS_ST_IDLE:
865 case H2_PROXYS_ST_BUSY:
866 case H2_PROXYS_ST_WAIT:
// Moves the session to state nstate and logs the transition, labelled with
// `action`, at debug level. Visible here is an unconditional log-and-assign;
// a possible "state unchanged" guard would be on lines not shown.
873 static void transit(h2_proxy_session *session, const char *action,
874 h2_proxys_state nstate)
876 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03345)
877 "h2_proxy_session(%s): transit [%s] -- %s --> [%s]", session->id,
878 state_name(session->state), action, state_name(nstate));
879 session->state = nstate;
// Event handler for H2_PROXYS_EV_INIT. From the INIT state the session goes
// to IDLE when it has no streams and to BUSY in the other visible branch.
// arg and msg are unused on the visible lines.
882 static void ev_init(h2_proxy_session *session, int arg, const char *msg)
884 switch (session->state) {
885 case H2_PROXYS_ST_INIT:
886 if (h2_ihash_empty(session->streams)) {
887 transit(session, "init", H2_PROXYS_ST_IDLE);
890 transit(session, "init", H2_PROXYS_ST_BUSY);
// Event handler for H2_PROXYS_EV_LOCAL_GOAWAY (we sent GOAWAY).
// Already in LOCAL_SHUTDOWN: nothing visible happens. IDLE and
// REMOTE_SHUTDOWN end the session (DONE); the remaining visible branch
// enters LOCAL_SHUTDOWN.
// NOTE(review): the break/default lines separating these branches are not
// shown in this listing.
900 static void ev_local_goaway(h2_proxy_session *session, int arg, const char *msg)
902 switch (session->state) {
903 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
904 /* already did that? */
906 case H2_PROXYS_ST_IDLE:
907 case H2_PROXYS_ST_REMOTE_SHUTDOWN:
909 transit(session, "local goaway", H2_PROXYS_ST_DONE);
912 transit(session, "local goaway", H2_PROXYS_ST_LOCAL_SHUTDOWN);
// Event handler for H2_PROXYS_EV_REMOTE_GOAWAY (the backend sent GOAWAY).
// Already in REMOTE_SHUTDOWN: nothing visible happens. IDLE and
// LOCAL_SHUTDOWN end the session (DONE); the remaining visible branch enters
// REMOTE_SHUTDOWN.
// NOTE(review): the break/default lines separating these branches are not
// shown in this listing.
917 static void ev_remote_goaway(h2_proxy_session *session, int arg, const char *msg)
919 switch (session->state) {
920 case H2_PROXYS_ST_REMOTE_SHUTDOWN:
921 /* already received that? */
923 case H2_PROXYS_ST_IDLE:
924 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
926 transit(session, "remote goaway", H2_PROXYS_ST_DONE);
929 transit(session, "remote goaway", H2_PROXYS_ST_REMOTE_SHUTDOWN);
// Event handler for H2_PROXYS_EV_CONN_ERROR; arg carries the error status.
// In INIT, DONE and LOCAL_SHUTDOWN the session goes straight to DONE. The
// other visible branch traces the error and calls session_shutdown with arg
// as the reason.
934 static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg)
936 switch (session->state) {
937 case H2_PROXYS_ST_INIT:
938 case H2_PROXYS_ST_DONE:
939 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
941 transit(session, "conn error", H2_PROXYS_ST_DONE);
945 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, arg, session->c,
946 "h2_proxy_session(%s): conn error -> shutdown", session->id);
947 session_shutdown(session, arg, msg);
// Event handler for H2_PROXYS_EV_PROTO_ERROR.
// In DONE and LOCAL_SHUTDOWN the session goes straight to DONE. The other
// visible branch traces the error and calls session_shutdown with arg as the
// reason.
952 static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg)
954 switch (session->state) {
955 case H2_PROXYS_ST_DONE:
956 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
958 transit(session, "proto error", H2_PROXYS_ST_DONE);
962 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
963 "h2_proxy_session(%s): proto error -> shutdown", session->id);
964 session_shutdown(session, arg, msg);
// Event handler for H2_PROXYS_EV_CONN_TIMEOUT.
// In LOCAL_SHUTDOWN the session simply becomes DONE. The other visible
// branch first calls session_shutdown and then moves to DONE.
969 static void ev_conn_timeout(h2_proxy_session *session, int arg, const char *msg)
971 switch (session->state) {
972 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
973 transit(session, "conn timeout", H2_PROXYS_ST_DONE);
976 session_shutdown(session, arg, msg);
977 transit(session, "conn timeout", H2_PROXYS_ST_DONE);
// Event handler for H2_PROXYS_EV_NO_IO: a processing round neither read nor
// wrote anything. Applies in BUSY, LOCAL_SHUTDOWN and REMOTE_SHUTDOWN:
//  - no streams left and no new ones accepted: shut down and go to DONE;
//  - no streams left but still accepting: go to IDLE;
//  - streams still open: go to WAIT.
982 static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
984 switch (session->state) {
985 case H2_PROXYS_ST_BUSY:
986 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
987 case H2_PROXYS_ST_REMOTE_SHUTDOWN:
988 /* nothing for input and output to do. If we remain
989 * in this state, we go into a tight loop and suck up
990 * CPU cycles. Ideally, we'd like to do a blocking read, but that
991 * is not possible if we have scheduled tasks and wait
992 * for them to produce something. */
993 if (h2_ihash_empty(session->streams)) {
994 if (!is_accepting_streams(session)) {
995 /* We are no longer accepting new streams and have
996 * finished processing existing ones. Time to leave. */
997 session_shutdown(session, arg, msg);
998 transit(session, "no io", H2_PROXYS_ST_DONE);
1001 /* When we have no streams, no task events are possible,
1002 * switch to blocking reads */
1003 transit(session, "no io", H2_PROXYS_ST_IDLE);
1007 /* Unable to do blocking reads, as we wait on events from
1008 * task processing in other threads. Do a busy wait with
1010 transit(session, "no io", H2_PROXYS_ST_WAIT);
// Event handler for H2_PROXYS_EV_STREAM_SUBMITTED: a session that was IDLE
// or in WAIT becomes BUSY, as there is now a request to process.
1019 static void ev_stream_submitted(h2_proxy_session *session, int stream_id,
1022 switch (session->state) {
1023 case H2_PROXYS_ST_IDLE:
1024 case H2_PROXYS_ST_WAIT:
1025 transit(session, "stream submitted", H2_PROXYS_ST_BUSY);
// Event handler for H2_PROXYS_EV_STREAM_DONE: stream_id was closed.
// If no body data was received, the response headers are finalized now and
// a FLUSH plus an EOS bucket are passed down the request's output filters.
// The stream is then marked closed, removed from the stream table and the
// suspended queue, and session->done, when set, is invoked for the request.
// NOTE(review): the check that `stream` was found is on a line not shown in
// this listing. The result of ap_pass_brigade is ignored on the visible
// line.
1033 static void ev_stream_done(h2_proxy_session *session, int stream_id,
1036 h2_proxy_stream *stream;
1038 stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
1040 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03364)
1041 "h2_proxy_sesssion(%s): stream(%d) closed",
1042 session->id, stream_id);
1044 if (!stream->data_received) {
1046 /* if the response had no body, this is the time to flush
1047 * an empty brigade which will also "write" the response
1049 h2_proxy_stream_end_headers_out(stream);
1050 stream->data_received = 1;
1051 b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
1052 APR_BRIGADE_INSERT_TAIL(stream->output, b);
1053 b = apr_bucket_eos_create(stream->r->connection->bucket_alloc);
1054 APR_BRIGADE_INSERT_TAIL(stream->output, b);
1055 ap_pass_brigade(stream->r->output_filters, stream->output);
1058 stream->state = H2_STREAM_ST_CLOSED;
1059 h2_ihash_remove(session->streams, stream_id);
1060 h2_iq_remove(session->suspended, stream_id);
1061 if (session->done) {
1062 session->done(session, stream->r, 1, 1);
1066 switch (session->state) {
// Event handler for H2_PROXYS_EV_STREAM_RESUMED: a session in WAIT becomes
// BUSY again.
1073 static void ev_stream_resumed(h2_proxy_session *session, int arg, const char *msg)
1075 switch (session->state) {
1076 case H2_PROXYS_ST_WAIT:
1077 transit(session, "stream resumed", H2_PROXYS_ST_BUSY);
// Event handler for H2_PROXYS_EV_DATA_READ: data arrived from the backend,
// so a session that was IDLE or in WAIT becomes BUSY.
1085 static void ev_data_read(h2_proxy_session *session, int arg, const char *msg)
1087 switch (session->state) {
1088 case H2_PROXYS_ST_IDLE:
1089 case H2_PROXYS_ST_WAIT:
1090 transit(session, "data read", H2_PROXYS_ST_BUSY);
// Event handler for H2_PROXYS_EV_NGH2_DONE: nghttp2 wants neither to read
// nor to write any more. DONE has its own case label; the other visible
// branch moves the session to DONE.
1098 static void ev_ngh2_done(h2_proxy_session *session, int arg, const char *msg)
1100 switch (session->state) {
1101 case H2_PROXYS_ST_DONE:
1105 transit(session, "nghttp2 done", H2_PROXYS_ST_DONE);
// Event handler for H2_PROXYS_EV_PRE_CLOSE, dispatched from the pool
// pre-cleanup. DONE and LOCAL_SHUTDOWN share a case with no visible action;
// the other visible branch calls session_shutdown.
1110 static void ev_pre_close(h2_proxy_session *session, int arg, const char *msg)
1112 switch (session->state) {
1113 case H2_PROXYS_ST_DONE:
1114 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1118 session_shutdown(session, arg, msg);
// Central dispatcher of the session state machine: routes event `ev` with
// its integer argument and optional message to the matching ev_* handler.
// The meaning of `arg` depends on the event: callers in this file pass a
// stream id, an error or status code, or 0. The last visible branch traces
// an "unknown event" message.
1123 static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
1124 int arg, const char *msg)
1127 case H2_PROXYS_EV_INIT:
1128 ev_init(session, arg, msg);
1130 case H2_PROXYS_EV_LOCAL_GOAWAY:
1131 ev_local_goaway(session, arg, msg);
1133 case H2_PROXYS_EV_REMOTE_GOAWAY:
1134 ev_remote_goaway(session, arg, msg);
1136 case H2_PROXYS_EV_CONN_ERROR:
1137 ev_conn_error(session, arg, msg);
1139 case H2_PROXYS_EV_PROTO_ERROR:
1140 ev_proto_error(session, arg, msg);
1142 case H2_PROXYS_EV_CONN_TIMEOUT:
1143 ev_conn_timeout(session, arg, msg);
1145 case H2_PROXYS_EV_NO_IO:
1146 ev_no_io(session, arg, msg);
1148 case H2_PROXYS_EV_STREAM_SUBMITTED:
1149 ev_stream_submitted(session, arg, msg);
1151 case H2_PROXYS_EV_STREAM_DONE:
1152 ev_stream_done(session, arg, msg);
1154 case H2_PROXYS_EV_STREAM_RESUMED:
1155 ev_stream_resumed(session, arg, msg);
1157 case H2_PROXYS_EV_DATA_READ:
1158 ev_data_read(session, arg, msg);
1160 case H2_PROXYS_EV_NGH2_DONE:
1161 ev_ngh2_done(session, arg, msg);
1163 case H2_PROXYS_EV_PRE_CLOSE:
1164 ev_pre_close(session, arg, msg);
1167 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
1168 "h2_proxy_session(%s): unknown event %d",
// Public entry point: runs one processing round of the session state
// machine.
//  - INIT: sends the initial protocol setup; success dispatches EV_INIT,
//    failure EV_CONN_ERROR.
//  - BUSY, LOCAL_SHUTDOWN, REMOTE_SHUTDOWN: sends everything nghttp2 wants
//    to write, then does a non-blocking read when nghttp2 wants input. If
//    nothing was read or written and nothing is pending, EV_NO_IO is
//    dispatched.
//  - WAIT: re-checks suspended streams; if none could be resumed, does a
//    blocking read with a timeout that doubles each round up to 100 ms.
//  - an unknown state is logged and dispatched as EV_PROTO_ERROR.
// After the switch, the wait timeout is reset when any I/O happened, and
// EV_NGH2_DONE is dispatched once nghttp2 wants neither to read nor write.
// The visible return is APR_SUCCESS, meaning "call again".
// NOTE(review): the bodies of the IDLE and DONE cases and any early returns
// are on lines not shown in this listing.
1174 apr_status_t h2_proxy_session_process(h2_proxy_session *session)
1176 apr_status_t status;
1177 int have_written = 0, have_read = 0;
1179 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
1180 "h2_proxy_session(%s): process", session->id);
1183 switch (session->state) {
1184 case H2_PROXYS_ST_INIT:
1185 status = session_start(session);
1186 if (status == APR_SUCCESS) {
1187 dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
1191 dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
1195 case H2_PROXYS_ST_BUSY:
1196 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1197 case H2_PROXYS_ST_REMOTE_SHUTDOWN:
1198 while (nghttp2_session_want_write(session->ngh2)) {
1199 int rv = nghttp2_session_send(session->ngh2);
1200 if (rv < 0 && nghttp2_is_fatal(rv)) {
1201 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
1202 "h2_proxy_session(%s): write, rv=%d", session->id, rv);
1203 dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL);
1209 if (nghttp2_session_want_read(session->ngh2)) {
1210 status = h2_proxy_session_read(session, 0, 0);
1211 if (status == APR_SUCCESS) {
1216 if (!have_written && !have_read
1217 && !nghttp2_session_want_write(session->ngh2)) {
1218 dispatch_event(session, H2_PROXYS_EV_NO_IO, 0, NULL);
1223 case H2_PROXYS_ST_WAIT:
1224 if (check_suspended(session) == APR_EAGAIN) {
1225 /* no stream has become resumed. Do a blocking read with
1226 * ever increasing timeouts... */
// NOTE(review): the floor of 25 is a raw apr time value, presumably
// microseconds, while the cap is built with apr_time_from_msec - confirm the
// intended starting timeout.
1227 if (session->wait_timeout < 25) {
1228 session->wait_timeout = 25;
1231 session->wait_timeout = H2MIN(apr_time_from_msec(100),
1232 2*session->wait_timeout);
1235 status = h2_proxy_session_read(session, 1, session->wait_timeout);
1236 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
1238 "h2_proxy_session(%s): WAIT read, timeout=%fms",
1239 session->id, (float)session->wait_timeout/1000.0);
1240 if (status == APR_SUCCESS) {
1242 dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
1244 else if (APR_STATUS_IS_TIMEUP(status)
1245 || APR_STATUS_IS_EAGAIN(status)) {
1246 /* go back to checking all inputs again */
1247 transit(session, "wait cycle", H2_PROXYS_ST_BUSY);
1252 case H2_PROXYS_ST_IDLE:
1255 case H2_PROXYS_ST_DONE: /* done, session terminated */
1259 ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
1260 APLOGNO(03346)"h2_proxy_session(%s): unknown state %d",
1261 session->id, session->state);
1262 dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL);
// any successful I/O restarts the WAIT back-off from its minimum
1267 if (have_read || have_written) {
1268 session->wait_timeout = 0;
1271 if (!nghttp2_session_want_read(session->ngh2)
1272 && !nghttp2_session_want_write(session->ngh2)) {
1273 dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);
1276 return APR_SUCCESS; /* needs to be called again */
// Members of the iteration context used by done_iter (type cleanup_iter_ctx;
// its opening and closing lines are not shown in this listing): the session
// being cleaned up and the callback to report each unfinished request to.
1280 h2_proxy_session *session;
1281 h2_proxy_request_done *done;
// h2_ihash_iter callback used by h2_proxy_session_cleanup: reports one
// unfinished stream's request through ctx->done with a third argument of 0.
// `touched` is true when no GOAWAY stream id was recorded (last_stream_id is
// 0) or when the stream's id does not exceed it, i.e. the backend may have
// started processing the request.
1284 static int done_iter(void *udata, void *val)
1286 cleanup_iter_ctx *ctx = udata;
1287 h2_proxy_stream *stream = val;
1288 int touched = (!ctx->session->last_stream_id ||
1289 stream->id <= ctx->session->last_stream_id);
1290 ctx->done(ctx->session, stream->r, 0, touched);
// Public entry point: reports every stream still registered with the
// session as unfinished (via done_iter) and then empties the stream table.
// Does nothing when the session has no streams.
// NOTE(review): the assignment of ctx.done is on a line not shown in this
// listing; presumably it receives the `done` argument.
1294 void h2_proxy_session_cleanup(h2_proxy_session *session,
1295 h2_proxy_request_done *done)
1297 if (session->streams && !h2_ihash_empty(session->streams)) {
1298 cleanup_iter_ctx ctx;
1299 ctx.session = session;
1301 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03366)
1302 "h2_proxy_session(%s): terminated, %d streams unfinished",
1303 session->id, (int)h2_ihash_count(session->streams));
1304 h2_ihash_iter(session->streams, done_iter, &ctx);
1305 h2_ihash_clear(session->streams);
// Member of the iteration context used by win_update_iter (type
// win_update_ctx; its other members c and bytes and its opening and closing
// lines are not shown in this listing).
1310 h2_proxy_session *session;
// h2_ihash_iter callback used by h2_proxy_session_update_window: when the
// visited stream's request runs on connection ctx->c, tells nghttp2 that
// ctx->bytes bytes of that stream were consumed, which lets nghttp2 extend
// the flow-control window again.
// NOTE(review): the return values, and whether a match is recorded in the
// context to stop the iteration, are on lines not shown in this listing.
1316 static int win_update_iter(void *udata, void *val)
1318 win_update_ctx *ctx = udata;
1319 h2_proxy_stream *stream = val;
1321 if (stream->r && stream->r->connection == ctx->c) {
1322 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, ctx->session->c,
1323 "h2_proxy_session(%s-%d): win_update %ld bytes",
1324 ctx->session->id, (int)stream->id, (long)ctx->bytes);
1325 nghttp2_session_consume(ctx->session->ngh2, stream->id, ctx->bytes);
// Public entry point: reports that `bytes` bytes of response data were
// consumed on client connection c, so flow-control credit can be returned to
// the backend.
// The stream whose request runs on c is looked up via win_update_iter and
// credited. If no such stream is found any more, only the connection-level
// window is credited.
// NOTE(review): the remaining ctx assignments and the condition that selects
// the connection-level fallback are on lines not shown in this listing.
1333 void h2_proxy_session_update_window(h2_proxy_session *session,
1334 conn_rec *c, apr_off_t bytes)
1336 if (session->streams && !h2_ihash_empty(session->streams)) {
1338 ctx.session = session;
1342 h2_ihash_iter(session->streams, win_update_iter, &ctx);
1345 /* could not find the stream any more, possibly closed, update
1346 * the connection window at least */
1347 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
1348 "h2_proxy_session(%s): win_update conn %ld bytes",
1349 session->id, (long)bytes);
1350 nghttp2_session_consume_connection(session->ngh2, (size_t)bytes);