--- /dev/null
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stddef.h>
+#include <apr_strings.h>
+#include <nghttp2/nghttp2.h>
+#include <mpm_common.h>
+#include <httpd.h>
+#include <mod_proxy.h>
+#include "mod_http2.h"
+#include "h2.h"
+#include "h2_int_queue.h"
+#include "h2_request.h"
+#include "h2_util.h"
+#include "h2_proxy_session.h"
+typedef struct h2_proxy_stream {
+ int id;
+ apr_pool_t *pool;
+ h2_proxy_session *session;
+ const char *url;
+ request_rec *r;
+ h2_request *req;
+ h2_stream_state_t state;
+ unsigned int suspended : 1;
+ unsigned int data_received : 1;
+ apr_bucket_brigade *input;
+ apr_bucket_brigade *output;
+ apr_table_t *saves;
+} h2_proxy_stream;
+static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
+ int arg, const char *msg);
+static apr_status_t proxy_session_pre_close(void *theconn)
+ proxy_conn_rec *p_conn = (proxy_conn_rec *)theconn;
+ h2_proxy_session *session = p_conn->data;
+ if (session && session->ngh2) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "proxy_session(%s): pool cleanup, state=%d, streams=%d",
+ session->id, session->state,
+ (int)h2_ihash_count(session->streams));
+ session->aborted = 1;
+ dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
+ nghttp2_session_del(session->ngh2);
+ session->ngh2 = NULL;
+ p_conn->data = NULL;
+ }
+ return APR_SUCCESS;
+static int proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc,
+ proxy_conn_rec *p_conn,
+ conn_rec *origin, apr_bucket_brigade *bb,
+ int flush)
+ apr_status_t status;
+ apr_off_t transferred;
+ if (flush) {
+ apr_bucket *e = apr_bucket_flush_create(bucket_alloc);
+ }
+ apr_brigade_length(bb, 0, &transferred);
+ if (transferred != -1)
+ p_conn->worker->s->transferred += transferred;
+ status = ap_pass_brigade(origin->output_filters, bb);
+ /* Cleanup the brigade now to avoid buckets lifetime
+ * issues in case of error returned below. */
+ apr_brigade_cleanup(bb);
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, status, origin, APLOGNO(03357)
+ "pass output failed to %pI (%s)",
+ p_conn->addr, p_conn->hostname);
+ }
+ return status;
+static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
+ size_t length, int flags, void *user_data)
+ h2_proxy_session *session = user_data;
+ apr_bucket *b;
+ apr_status_t status;
+ int flush = 1;
+ if (data) {
+ b = apr_bucket_transient_create((const char*)data, length,
+ session->c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(session->output, b);
+ }
+ status = proxy_pass_brigade(session->c->bucket_alloc,
+ session->p_conn, session->c,
+ session->output, flush);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
+ "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d",
+ session->id, (int)length, flush);
+ if (status != APR_SUCCESS) {
+ }
+ return length;
+static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
+ void *user_data)
+ h2_proxy_session *session = user_data;
+ int n;
+ if (APLOGcdebug(session->c)) {
+ char buffer[256];
+ h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03341)
+ "h2_proxy_session(%s): recv FRAME[%s]",
+ session->id, buffer);
+ }
+ switch (frame->hd.type) {
+ break;
+ break;
+ if (frame->settings.niv > 0) {
+ n = nghttp2_session_get_remote_settings(ngh2, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
+ if (n > 0) {
+ session->remote_max_concurrent = n;
+ }
+ }
+ break;
+ /* we expect the remote server to tell us the highest stream id
+ * that it has started processing. */
+ session->last_stream_id = frame->goaway.last_stream_id;
+ dispatch_event(session, H2_PROXYS_EV_REMOTE_GOAWAY, 0, NULL);
+ if (APLOGcinfo(session->c)) {
+ char buffer[256];
+ h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03342)
+ "h2_proxy_session(%s): recv FRAME[%s]",
+ session->id, buffer);
+ }
+ break;
+ default:
+ break;
+ }
+ return 0;
+static int before_frame_send(nghttp2_session *ngh2,
+ const nghttp2_frame *frame, void *user_data)
+ h2_proxy_session *session = user_data;
+ if (APLOGcdebug(session->c)) {
+ char buffer[256];
+ h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03343)
+ "h2_proxy_session(%s): sent FRAME[%s]",
+ session->id, buffer);
+ }
+ return 0;
+static int add_header(void *table, const char *n, const char *v)
+ apr_table_addn(table, n, v);
+ return 1;
+static void process_proxy_header(request_rec *r, const char *n, const char *v)
+ static const struct {
+ const char *name;
+ ap_proxy_header_reverse_map_fn func;
+ } transform_hdrs[] = {
+ { "Location", ap_proxy_location_reverse_map },
+ { "Content-Location", ap_proxy_location_reverse_map },
+ { "URI", ap_proxy_location_reverse_map },
+ { "Destination", ap_proxy_location_reverse_map },
+ { "Set-Cookie", ap_proxy_cookie_reverse_map },
+ { NULL, NULL }
+ };
+ proxy_dir_conf *dconf;
+ int i;
+ for (i = 0; transform_hdrs[i].name; ++i) {
+ if (!strcasecmp(transform_hdrs[i].name, n)) {
+ dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
+ apr_table_add(r->headers_out, n,
+ (*transform_hdrs[i].func)(r, dconf, v));
+ return;
+ }
+ }
+ apr_table_add(r->headers_out, n, v);
+static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
+ const char *n, apr_size_t nlen,
+ const char *v, apr_size_t vlen)
+ if (n[0] == ':') {
+ if (!stream->data_received && !strncmp(":status", n, nlen)) {
+ char *s = apr_pstrndup(stream->r->pool, v, vlen);
+ apr_table_setn(stream->r->notes, "proxy-status", s);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ "h2_proxy_stream(%s-%d): got status %s",
+ stream->session->id, stream->id, s);
+ stream->r->status = (int)apr_atoi64(s);
+ if (stream->r->status <= 0) {
+ stream->r->status = 500;
+ return APR_EGENERAL;
+ }
+ }
+ return APR_SUCCESS;
+ }
+ if (!h2_proxy_res_ignore_header(n, nlen)) {
+ char *hname, *hvalue;
+ hname = apr_pstrndup(stream->pool, n, nlen);
+ h2_util_camel_case_header(hname, nlen);
+ hvalue = apr_pstrndup(stream->pool, v, vlen);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ "h2_proxy_stream(%s-%d): got header %s: %s",
+ stream->session->id, stream->id, hname, hvalue);
+ process_proxy_header(stream->r, hname, hvalue);
+ }
+ return APR_SUCCESS;
+static int log_header(void *ctx, const char *key, const char *value)
+ h2_proxy_stream *stream = ctx;
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
+ "h2_proxy_stream(%s-%d), header_out %s: %s",
+ stream->session->id, stream->id, key, value);
+ return 1;
+static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream)
+ h2_proxy_session *session = stream->session;
+ request_rec *r = stream->r;
+ apr_pool_t *p = r->pool;
+ /* Now, add in the cookies from the response to the ones already saved */
+ apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL);
+ /* and now load 'em all in */
+ if (!apr_is_empty_table(stream->saves)) {
+ apr_table_unset(r->headers_out, "Set-Cookie");
+ r->headers_out = apr_table_overlay(p, r->headers_out, stream->saves);
+ }
+ /* handle Via header in response */
+ if (session->conf->viaopt != via_off
+ && session->conf->viaopt != via_block) {
+ const char *server_name = ap_get_server_name(stream->r);
+ apr_port_t port = ap_get_server_port(stream->r);
+ char portstr[32];
+ /* If USE_CANONICAL_NAME_OFF was configured for the proxy virtual host,
+ * then the server name returned by ap_get_server_name() is the
+ * origin server name (which does make too much sense with Via: headers)
+ * so we use the proxy vhost's name instead.
+ */
+ if (server_name == stream->r->hostname) {
+ server_name = stream->r->server->server_hostname;
+ }
+ if (ap_is_default_port(port, stream->r)) {
+ portstr[0] = '\0';
+ }
+ else {
+ apr_snprintf(portstr, sizeof(portstr), ":%d", port);
+ }
+ /* create a "Via:" response header entry and merge it */
+ apr_table_addn(r->headers_out, "Via",
+ (session->conf->viaopt == via_full)
+ ? apr_psprintf(p, "%d.%d %s%s (%s)",
+ HTTP_VERSION_MAJOR(r->proto_num),
+ HTTP_VERSION_MINOR(r->proto_num),
+ server_name, portstr,
+ : apr_psprintf(p, "%d.%d %s%s",
+ HTTP_VERSION_MAJOR(r->proto_num),
+ HTTP_VERSION_MINOR(r->proto_num),
+ server_name, portstr)
+ );
+ }
+ if (APLOGrtrace2(stream->r)) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
+ "h2_proxy_stream(%s-%d), header_out after merging",
+ stream->session->id, stream->id);
+ apr_table_do(log_header, stream, stream->r->headers_out, NULL);
+ }
+static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
+ int32_t stream_id, const uint8_t *data,
+ size_t len, void *user_data)
+ h2_proxy_session *session = user_data;
+ h2_proxy_stream *stream;
+ apr_bucket *b;
+ apr_status_t status;
+ stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
+ if (!stream) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, APLOGNO(03358)
+ "h2_proxy_session(%s): recv data chunk for "
+ "unknown stream %d, ignored",
+ session->id, stream_id);
+ return 0;
+ }
+ if (!stream->data_received) {
+ /* last chance to manipulate response headers.
+ * after this, only trailers */
+ h2_proxy_stream_end_headers_out(stream);
+ stream->data_received = 1;
+ }
+ b = apr_bucket_transient_create((const char*)data, len,
+ stream->r->connection->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->output, b);
+ if (flags & NGHTTP2_DATA_FLAG_EOF) {
+ b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->output, b);
+ }
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, APLOGNO(03359)
+ "h2_proxy_session(%s): pass response data for "
+ "stream %d, %d bytes", session->id, stream_id, (int)len);
+ status = ap_pass_brigade(stream->r->output_filters, stream->output);
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03344)
+ "h2_proxy_session(%s): passing output on stream %d",
+ session->id, stream->id);
+ nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
+ stream_id, NGHTTP2_STREAM_CLOSED);
+ }
+ return 0;
+static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
+ uint32_t error_code, void *user_data)
+ h2_proxy_session *session = user_data;
+ if (!session->aborted) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03360)
+ "h2_proxy_session(%s): stream=%d, closed, err=%d",
+ session->id, stream_id, error_code);
+ dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
+ }
+ return 0;
+static int on_header(nghttp2_session *ngh2, const nghttp2_frame *frame,
+ const uint8_t *namearg, size_t nlen,
+ const uint8_t *valuearg, size_t vlen, uint8_t flags,
+ void *user_data)
+ h2_proxy_session *session = user_data;
+ h2_proxy_stream *stream;
+ const char *n = (const char*)namearg;
+ const char *v = (const char*)valuearg;
+ (void)session;
+ if (frame->hd.type == NGHTTP2_HEADERS && nlen) {
+ stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id);
+ if (stream) {
+ if (h2_proxy_stream_add_header_out(stream, n, nlen, v, vlen)) {
+ }
+ }
+ }
+ else if (frame->hd.type == NGHTTP2_PUSH_PROMISE) {
+ }
+ return 0;
+static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
+ uint8_t *buf, size_t length,
+ uint32_t *data_flags,
+ nghttp2_data_source *source, void *user_data)
+ h2_proxy_stream *stream;
+ apr_status_t status = APR_SUCCESS;
+ *data_flags = 0;
+ stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
+ if (!stream) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, APLOGNO(03361)
+ "h2_proxy_stream(%s): data_read, stream %d not found",
+ stream->session->id, stream_id);
+ }
+ if (APR_BRIGADE_EMPTY(stream->input)) {
+ status = ap_get_brigade(stream->r->input_filters, stream->input,
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
+ "h2_proxy_stream(%s-%d): request body read",
+ stream->session->id, stream->id);
+ }
+ if (status == APR_SUCCESS) {
+ ssize_t readlen = 0;
+ while (status == APR_SUCCESS
+ && (readlen < length)
+ && !APR_BRIGADE_EMPTY(stream->input)) {
+ apr_bucket* b = APR_BRIGADE_FIRST(stream->input);
+ if (APR_BUCKET_IS_EOS(b)) {
+ *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+ }
+ else {
+ /* we do nothing more regarding any meta here */
+ }
+ }
+ else {
+ const char *bdata = NULL;
+ apr_size_t blen = 0;
+ status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
+ if (status == APR_SUCCESS && blen > 0) {
+ ssize_t copylen = H2MIN(length - readlen, blen);
+ memcpy(buf, bdata, copylen);
+ buf += copylen;
+ readlen += copylen;
+ if (copylen < blen) {
+ /* We have data left in the bucket. Split it. */
+ status = apr_bucket_split(b, copylen);
+ }
+ }
+ }
+ apr_bucket_delete(b);
+ }
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
+ "h2_proxy_stream(%d): request body read %ld bytes, flags=%d",
+ stream->id, (long)readlen, (int)*data_flags);
+ return readlen;
+ }
+ else if (APR_STATUS_IS_EAGAIN(status)) {
+ /* suspended stream, needs to be re-awakened */
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
+ "h2_proxy_stream(%s-%d): suspending",
+ stream->session->id, stream_id);
+ stream->suspended = 1;
+ h2_iq_add(stream->session->suspended, stream->id, NULL, NULL);
+ }
+ else {
+ nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
+ stream_id, NGHTTP2_STREAM_CLOSED);
+ }
+h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
+ proxy_server_conf *conf,
+ unsigned char window_bits_connection,
+ unsigned char window_bits_stream,
+ h2_proxy_request_done *done)
+ if (!p_conn->data) {
+ apr_pool_t *pool = p_conn->scpool;
+ h2_proxy_session *session;
+ nghttp2_session_callbacks *cbs;
+ nghttp2_option *option;
+ session = apr_pcalloc(pool, sizeof(*session));
+ apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
+ p_conn->data = session;
+ session->id = apr_pstrdup(p_conn->scpool, id);
+ session->c = p_conn->connection;
+ session->p_conn = p_conn;
+ session->conf = conf;
+ session->pool = p_conn->scpool;
+ session->state = H2_PROXYS_ST_INIT;
+ session->window_bits_stream = window_bits_stream;
+ session->window_bits_connection = window_bits_connection;
+ session->streams = h2_ihash_create(pool, offsetof(h2_proxy_stream, id));
+ session->suspended = h2_iq_create(pool, 5);
+ session->done = done;
+ session->input = apr_brigade_create(session->pool, session->c->bucket_alloc);
+ session->output = apr_brigade_create(session->pool, session->c->bucket_alloc);
+ nghttp2_session_callbacks_new(&cbs);
+ nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, on_frame_recv);
+ nghttp2_session_callbacks_set_on_data_chunk_recv_callback(cbs, on_data_chunk_recv);
+ nghttp2_session_callbacks_set_on_stream_close_callback(cbs, on_stream_close);
+ nghttp2_session_callbacks_set_on_header_callback(cbs, on_header);
+ nghttp2_session_callbacks_set_before_frame_send_callback(cbs, before_frame_send);
+ nghttp2_session_callbacks_set_send_callback(cbs, raw_send);
+ nghttp2_option_new(&option);
+ nghttp2_option_set_peer_max_concurrent_streams(option, 100);
+ nghttp2_option_set_no_auto_window_update(option, 1);
+ nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
+ nghttp2_option_del(option);
+ nghttp2_session_callbacks_del(cbs);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03362)
+ "setup session for %s", p_conn->hostname);
+ }
+ return p_conn->data;
+static apr_status_t session_start(h2_proxy_session *session)
+ nghttp2_settings_entry settings[2];
+ int rv, add_conn_window;
+ apr_socket_t *s;
+ s = ap_get_conn_socket(session->c);
+ if (s) {
+ ap_sock_disable_nagle(s);
+ }
+ settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
+ settings[0].value = 0;
+ settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
+ settings[1].value = (1 << session->window_bits_stream) - 1;
+ rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings,
+ H2_ALEN(settings));
+ /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */
+ add_conn_window = ((1 << session->window_bits_connection) - 1 -
+ if (!rv && add_conn_window != 0) {
+ rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window);
+ }
+static apr_status_t open_stream(h2_proxy_session *session, const char *url,
+ request_rec *r, h2_proxy_stream **pstream)
+ h2_proxy_stream *stream;
+ apr_uri_t puri;
+ const char *authority, *scheme, *path;
+ stream = apr_pcalloc(r->pool, sizeof(*stream));
+ stream->pool = r->pool;
+ stream->url = url;
+ stream->r = r;
+ stream->session = session;
+ stream->state = H2_STREAM_ST_IDLE;
+ stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+ stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+ stream->req = h2_request_create(1, stream->pool, 0);
+ apr_uri_parse(stream->pool, url, &puri);
+ scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
+ authority = puri.hostname;
+ if (!ap_strchr_c(authority, ':') && puri.port
+ && apr_uri_port_of_scheme(scheme) != puri.port) {
+ /* port info missing and port is not default for scheme: append */
+ authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
+ }
+ path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);
+ h2_request_make(stream->req, stream->pool, r->method, scheme,
+ authority, path, r->headers_in);
+ /* Tuck away all already existing cookies */
+ stream->saves = apr_table_make(r->pool, 2);
+ apr_table_do(add_header, stream->saves, r->headers_out,"Set-Cookie", NULL);
+ *pstream = stream;
+ return APR_SUCCESS;
+static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *stream)
+ h2_ngheader *hd;
+ nghttp2_data_provider *pp = NULL;
+ nghttp2_data_provider provider;
+ int rv;
+ apr_status_t status;
+ hd = h2_util_ngheader_make_req(stream->pool, stream->req);
+ status = ap_get_brigade(stream->r->input_filters, stream->input,
+ if ((status == APR_SUCCESS && !APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(stream->input)))
+ || APR_STATUS_IS_EAGAIN(status)) {
+ /* there might be data coming */
+ provider.source.fd = 0;
+ provider.source.ptr = NULL;
+ provider.read_callback = stream_data_read;
+ pp = &provider;
+ }
+ rv = nghttp2_submit_request(session->ngh2, NULL,
+ hd->nv, hd->nvlen, pp, stream);
+ if (APLOGcdebug(session->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03363)
+ "h2_proxy_session(%s): submit %s%s -> %d",
+ session->id, stream->req->authority, stream->req->path,
+ rv);
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_proxy_session(%s-%d): submit %s%s",
+ session->id, rv, stream->req->authority, stream->req->path);
+ }
+ if (rv > 0) {
+ stream->id = rv;
+ stream->state = H2_STREAM_ST_OPEN;
+ h2_ihash_add(session->streams, stream);
+ dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL);
+ return APR_SUCCESS;
+ }
+ return APR_EGENERAL;
+static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
+ apr_status_t status = APR_SUCCESS;
+ apr_size_t readlen = 0;
+ ssize_t n;
+ while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
+ apr_bucket* b = APR_BRIGADE_FIRST(bb);
+ /* nop */
+ }
+ else {
+ const char *bdata = NULL;
+ apr_size_t blen = 0;
+ status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
+ if (status == APR_SUCCESS && blen > 0) {
+ n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_proxy_session(%s): feeding %ld bytes -> %ld",
+ session->id, (long)blen, (long)n);
+ if (n < 0) {
+ if (nghttp2_is_fatal((int)n)) {
+ status = APR_EGENERAL;
+ }
+ }
+ else {
+ readlen += n;
+ if (n < blen) {
+ apr_bucket_split(b, n);
+ }
+ }
+ }
+ }
+ apr_bucket_delete(b);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
+ "h2_proxy_session(%s): fed %ld bytes of input to session",
+ session->id, (long)readlen);
+ if (readlen == 0 && status == APR_SUCCESS) {
+ return APR_EAGAIN;
+ }
+ return status;
+static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
+ apr_interval_time_t timeout)
+ apr_status_t status = APR_SUCCESS;
+ if (APR_BRIGADE_EMPTY(session->input)) {
+ apr_socket_t *socket = NULL;
+ apr_time_t save_timeout = -1;
+ if (block) {
+ socket = ap_get_conn_socket(session->c);
+ if (socket) {
+ apr_socket_timeout_get(socket, &save_timeout);
+ apr_socket_timeout_set(socket, timeout);
+ }
+ else {
+ /* cannot block on timeout */
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, APLOGNO(03379)
+ "h2_proxy_session(%s): unable to get conn socket",
+ session->id);
+ return APR_ENOTIMPL;
+ }
+ }
+ status = ap_get_brigade(session->c->input_filters, session->input,
+ 64 * 1024);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
+ "h2_proxy_session(%s): read from conn", session->id);
+ if (socket && save_timeout != -1) {
+ apr_socket_timeout_set(socket, save_timeout);
+ }
+ }
+ if (status == APR_SUCCESS) {
+ status = feed_brigade(session, session->input);
+ }
+ else if (APR_STATUS_IS_TIMEUP(status)) {
+ /* nop */
+ }
+ else if (!APR_STATUS_IS_EAGAIN(status)) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03380)
+ "h2_proxy_session(%s): read error", session->id);
+ dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
+ }
+ return status;
+apr_status_t h2_proxy_session_submit(h2_proxy_session *session,
+ const char *url, request_rec *r)
+ h2_proxy_stream *stream;
+ apr_status_t status;
+ status = open_stream(session, url, r, &stream);
+ if (status == OK) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(03381)
+ "process stream(%d): %s %s%s, original: %s",
+ stream->id, stream->req->method,
+ stream->req->authority, stream->req->path,
+ r->the_request);
+ status = submit_stream(session, stream);
+ }
+ return status;
+static apr_status_t check_suspended(h2_proxy_session *session)
+ h2_proxy_stream *stream;
+ int i, stream_id;
+ apr_status_t status;
+ for (i = 0; i < session->suspended->nelts; ++i) {
+ stream_id = session->suspended->elts[i];
+ stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+ if (stream) {
+ status = ap_get_brigade(stream->r->input_filters, stream->input,
+ if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->input)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
+ "h2_proxy_stream(%s-%d): resuming",
+ session->id, stream_id);
+ stream->suspended = 0;
+ h2_iq_remove(session->suspended, stream_id);
+ nghttp2_session_resume_data(session->ngh2, stream_id);
+ dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
+ check_suspended(session);
+ return APR_SUCCESS;
+ }
+ else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, session->c,
+ APLOGNO(03382) "h2_proxy_stream(%s-%d): check input",
+ session->id, stream_id);
+ h2_iq_remove(session->suspended, stream_id);
+ dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
+ check_suspended(session);
+ return APR_SUCCESS;
+ }
+ }
+ else {
+ /* gone? */
+ h2_iq_remove(session->suspended, stream_id);
+ check_suspended(session);
+ return APR_SUCCESS;
+ }
+ }
+ return APR_EAGAIN;
+static apr_status_t session_shutdown(h2_proxy_session *session, int reason,
+ const char *msg)
+ apr_status_t status = APR_SUCCESS;
+ const char *err = msg;
+ AP_DEBUG_ASSERT(session);
+ if (!err && reason) {
+ err = nghttp2_strerror(reason);
+ }
+ nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0,
+ reason, (uint8_t*)err, err? strlen(err):0);
+ status = nghttp2_session_send(session->ngh2);
+ dispatch_event(session, H2_PROXYS_EV_LOCAL_GOAWAY, reason, err);
+ return status;
+static const char *StateNames[] = {
+static const char *state_name(h2_proxys_state state)
+ if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
+ return "unknown";
+ }
+ return StateNames[state];
+static int is_accepting_streams(h2_proxy_session *session)
+ switch (session->state) {
+ return 1;
+ default:
+ return 0;
+ }
+static void transit(h2_proxy_session *session, const char *action,
+ h2_proxys_state nstate)
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03345)
+ "h2_proxy_session(%s): transit [%s] -- %s --> [%s]", session->id,
+ state_name(session->state), action, state_name(nstate));
+ session->state = nstate;
+static void ev_init(h2_proxy_session *session, int arg, const char *msg)
+ switch (session->state) {
+ if (h2_ihash_is_empty(session->streams)) {
+ transit(session, "init", H2_PROXYS_ST_IDLE);
+ }
+ else {
+ transit(session, "init", H2_PROXYS_ST_BUSY);
+ }
+ break;
+ default:
+ /* nop */
+ break;
+ }
+static void ev_local_goaway(h2_proxy_session *session, int arg, const char *msg)
+ switch (session->state) {
+ /* already did that? */
+ break;
+ /* all done */
+ transit(session, "local goaway", H2_PROXYS_ST_DONE);
+ break;
+ default:
+ transit(session, "local goaway", H2_PROXYS_ST_LOCAL_SHUTDOWN);
+ break;
+ }
+static void ev_remote_goaway(h2_proxy_session *session, int arg, const char *msg)
+ switch (session->state) {
+ /* already received that? */
+ break;
+ /* all done */
+ transit(session, "remote goaway", H2_PROXYS_ST_DONE);
+ break;
+ default:
+ transit(session, "remote goaway", H2_PROXYS_ST_REMOTE_SHUTDOWN);
+ break;
+ }
+static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg)
+ switch (session->state) {
+ /* just leave */
+ transit(session, "conn error", H2_PROXYS_ST_DONE);
+ break;
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, arg, session->c,
+ "h2_proxy_session(%s): conn error -> shutdown", session->id);
+ session_shutdown(session, arg, msg);
+ break;
+ }
+static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg)
+ switch (session->state) {
+ /* just leave */
+ transit(session, "proto error", H2_PROXYS_ST_DONE);
+ break;
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_proxy_session(%s): proto error -> shutdown", session->id);
+ session_shutdown(session, arg, msg);
+ break;
+ }
+static void ev_conn_timeout(h2_proxy_session *session, int arg, const char *msg)
+ switch (session->state) {
+ transit(session, "conn timeout", H2_PROXYS_ST_DONE);
+ break;
+ default:
+ session_shutdown(session, arg, msg);
+ transit(session, "conn timeout", H2_PROXYS_ST_DONE);
+ break;
+ }
+static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
+ switch (session->state) {
+ /* nothing for input and output to do. If we remain
+ * in this state, we go into a tight loop and suck up
+ * CPU cycles. Ideally, we'd like to do a blocking read, but that
+ * is not possible if we have scheduled tasks and wait
+ * for them to produce something. */
+ if (h2_ihash_is_empty(session->streams)) {
+ if (!is_accepting_streams(session)) {
+ /* We are no longer accepting new streams and have
+ * finished processing existing ones. Time to leave. */
+ session_shutdown(session, arg, msg);
+ transit(session, "no io", H2_PROXYS_ST_DONE);
+ }
+ else {
+ /* When we have no streams, no task events are possible,
+ * switch to blocking reads */
+ transit(session, "no io", H2_PROXYS_ST_IDLE);
+ }
+ }
+ else {
+ /* Unable to do blocking reads, as we wait on events from
+ * task processing in other threads. Do a busy wait with
+ * backoff timer. */
+ transit(session, "no io", H2_PROXYS_ST_WAIT);
+ }
+ break;
+ default:
+ /* nop */
+ break;
+ }
+static void ev_stream_submitted(h2_proxy_session *session, int stream_id,
+ const char *msg)
+ switch (session->state) {
+ transit(session, "stream submitted", H2_PROXYS_ST_BUSY);
+ break;
+ default:
+ /* nop */
+ break;
+ }
+static void ev_stream_done(h2_proxy_session *session, int stream_id,
+ const char *msg)
+ h2_proxy_stream *stream;
+ stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+ if (stream) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03364)
+ "h2_proxy_sesssion(%s): stream(%d) closed",
+ session->id, stream_id);
+ if (!stream->data_received) {
+ apr_bucket *b;
+ /* if the response had no body, this is the time to flush
+ * an empty brigade which will also "write" the resonse
+ * headers */
+ h2_proxy_stream_end_headers_out(stream);
+ stream->data_received = 1;
+ b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->output, b);
+ b = apr_bucket_eos_create(stream->r->connection->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->output, b);
+ ap_pass_brigade(stream->r->output_filters, stream->output);
+ }
+ stream->state = H2_STREAM_ST_CLOSED;
+ h2_ihash_remove(session->streams, stream_id);
+ h2_iq_remove(session->suspended, stream_id);
+ if (session->done) {
+ session->done(session, stream->r, 1, 1);
+ }
+ }
+ switch (session->state) {
+ default:
+ /* nop */
+ break;
+ }
+static void ev_stream_resumed(h2_proxy_session *session, int arg, const char *msg)
+ switch (session->state) {
+ transit(session, "stream resumed", H2_PROXYS_ST_BUSY);
+ break;
+ default:
+ /* nop */
+ break;
+ }
+static void ev_data_read(h2_proxy_session *session, int arg, const char *msg)
+ switch (session->state) {
+ transit(session, "data read", H2_PROXYS_ST_BUSY);
+ break;
+ /* fall through */
+ default:
+ /* nop */
+ break;
+ }
+static void ev_ngh2_done(h2_proxy_session *session, int arg, const char *msg)
+ switch (session->state) {
+ /* nop */
+ break;
+ default:
+ transit(session, "nghttp2 done", H2_PROXYS_ST_DONE);
+ break;
+ }
+static void ev_pre_close(h2_proxy_session *session, int arg, const char *msg)
+ switch (session->state) {
+ /* nop */
+ break;
+ default:
+ session_shutdown(session, arg, msg);
+ break;
+ }
+static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
+ int arg, const char *msg)
+ switch (ev) {
+ ev_init(session, arg, msg);
+ break;
+ ev_local_goaway(session, arg, msg);
+ break;
+ ev_remote_goaway(session, arg, msg);
+ break;
+ ev_conn_error(session, arg, msg);
+ break;
+ ev_proto_error(session, arg, msg);
+ break;
+ ev_conn_timeout(session, arg, msg);
+ break;
+ case H2_PROXYS_EV_NO_IO:
+ ev_no_io(session, arg, msg);
+ break;
+ ev_stream_submitted(session, arg, msg);
+ break;
+ ev_stream_done(session, arg, msg);
+ break;
+ ev_stream_resumed(session, arg, msg);
+ break;
+ ev_data_read(session, arg, msg);
+ break;
+ ev_ngh2_done(session, arg, msg);
+ break;
+ ev_pre_close(session, arg, msg);
+ break;
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_proxy_session(%s): unknown event %d",
+ session->id, ev);
+ break;
+ }
+apr_status_t h2_proxy_session_process(h2_proxy_session *session)
+ apr_status_t status;
+ int have_written = 0, have_read = 0;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_proxy_session(%s): process", session->id);
+ switch (session->state) {
+ status = session_start(session);
+ if (status == APR_SUCCESS) {
+ dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
+ goto run_loop;
+ }
+ else {
+ dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
+ }
+ break;
+ while (nghttp2_session_want_write(session->ngh2)) {
+ int rv = nghttp2_session_send(session->ngh2);
+ if (rv < 0 && nghttp2_is_fatal(rv)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_proxy_session(%s): write, rv=%d", session->id, rv);
+ dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL);
+ break;
+ }
+ have_written = 1;
+ }
+ if (nghttp2_session_want_read(session->ngh2)) {
+ status = h2_proxy_session_read(session, 0, 0);
+ if (status == APR_SUCCESS) {
+ have_read = 1;
+ }
+ }
+ if (!have_written && !have_read
+ && !nghttp2_session_want_write(session->ngh2)) {
+ dispatch_event(session, H2_PROXYS_EV_NO_IO, 0, NULL);
+ goto run_loop;
+ }
+ break;
+ if (check_suspended(session) == APR_EAGAIN) {
+ /* no stream has become resumed. Do a blocking read with
+ * ever increasing timeouts... */
+ if (session->wait_timeout < 25) {
+ session->wait_timeout = 25;
+ }
+ else {
+ session->wait_timeout = H2MIN(apr_time_from_msec(100),
+ 2*session->wait_timeout);
+ }
+ status = h2_proxy_session_read(session, 1, session->wait_timeout);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
+ APLOGNO(03365)
+ "h2_proxy_session(%s): WAIT read, timeout=%fms",
+ session->id, (float)session->wait_timeout/1000.0);
+ if (status == APR_SUCCESS) {
+ have_read = 1;
+ dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
+ }
+ else if (APR_STATUS_IS_TIMEUP(status)
+ || APR_STATUS_IS_EAGAIN(status)) {
+ /* go back to checking all inputs again */
+ transit(session, "wait cycle", H2_PROXYS_ST_BUSY);
+ }
+ }
+ break;
+ break;
+ case H2_PROXYS_ST_DONE: /* done, session terminated */
+ return APR_EOF;
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
+ APLOGNO(03346)"h2_proxy_session(%s): unknown state %d",
+ session->id, session->state);
+ dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL);
+ break;
+ }
+ if (have_read || have_written) {
+ session->wait_timeout = 0;
+ }
+ if (!nghttp2_session_want_read(session->ngh2)
+ && !nghttp2_session_want_write(session->ngh2)) {
+ dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);
+ }
+ return APR_SUCCESS; /* needs to be called again */
+typedef struct {
+ h2_proxy_session *session;
+ h2_proxy_request_done *done;
+} cleanup_iter_ctx;
+static int done_iter(void *udata, void *val)
+ cleanup_iter_ctx *ctx = udata;
+ h2_proxy_stream *stream = val;
+ int touched = (!ctx->session->last_stream_id ||
+ stream->id <= ctx->session->last_stream_id);
+ ctx->done(ctx->session, stream->r, 0, touched);
+ return 1;
+void h2_proxy_session_cleanup(h2_proxy_session *session,
+ h2_proxy_request_done *done)
+ if (session->streams && !h2_ihash_is_empty(session->streams)) {
+ cleanup_iter_ctx ctx;
+ ctx.session = session;
+ ctx.done = done;
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03366)
+ "h2_proxy_session(%s): terminated, %d streams unfinished",
+ session->id, (int)h2_ihash_count(session->streams));
+ h2_ihash_iter(session->streams, done_iter, &ctx);
+ h2_ihash_clear(session->streams);
+ }
+typedef struct {
+ h2_proxy_session *session;
+ conn_rec *c;
+ apr_off_t bytes;
+ int updated;
+} win_update_ctx;
+static int win_update_iter(void *udata, void *val)
+ win_update_ctx *ctx = udata;
+ h2_proxy_stream *stream = val;
+ if (stream->r && stream->r->connection == ctx->c) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, ctx->session->c,
+ "h2_proxy_session(%s-%d): win_update %ld bytes",
+ ctx->session->id, (int)stream->id, (long)ctx->bytes);
+ nghttp2_session_consume(ctx->session->ngh2, stream->id, ctx->bytes);
+ ctx->updated = 1;
+ return 0;
+ }
+ return 1;
+void h2_proxy_session_update_window(h2_proxy_session *session,
+ conn_rec *c, apr_off_t bytes)
+ if (session->streams && !h2_ihash_is_empty(session->streams)) {
+ win_update_ctx ctx;
+ ctx.session = session;
+ ctx.c = c;
+ ctx.bytes = bytes;
+ ctx.updated = 0;
+ h2_ihash_iter(session->streams, win_update_iter, &ctx);
+ if (!ctx.updated) {
+ /* could not find the stream any more, possibly closed, update
+ * the connection window at least */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_proxy_session(%s): win_update conn %ld bytes",
+ session->id, (long)bytes);
+ nghttp2_session_consume_connection(session->ngh2, (size_t)bytes);
+ }
+ }
--- /dev/null
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <nghttp2/nghttp2.h>
+#include <httpd.h>
+#include <mod_proxy.h>
+#include "mod_http2.h"
+#include "mod_proxy_http2.h"
+#include "h2_int_queue.h"
+#include "h2_request.h"
+#include "h2_util.h"
+#include "h2_version.h"
+#include "h2_proxy_session.h"
+static void register_hook(apr_pool_t *p);
+AP_DECLARE_MODULE(proxy_http2) = {
+ NULL, /* create per-directory config structure */
+ NULL, /* merge per-directory config structures */
+ NULL, /* create per-server config structure */
+ NULL, /* merge per-server config structures */
+ NULL, /* command apr_table_t */
+ register_hook /* register hooks */
+/* Optional functions from mod_http2 */
+static int (*is_h2)(conn_rec *c);
+static apr_status_t (*req_engine_push)(const char *name, request_rec *r,
+ http2_req_engine_init *einit);
+static apr_status_t (*req_engine_pull)(h2_req_engine *engine,
+ apr_read_type_e block,
+ apr_uint32_t capacity,
+ request_rec **pr);
+static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn);
+typedef struct h2_proxy_ctx {
+ conn_rec *owner;
+ apr_pool_t *pool;
+ request_rec *rbase;
+ server_rec *server;
+ const char *proxy_func;
+ char server_portstr[32];
+ proxy_conn_rec *p_conn;
+ proxy_worker *worker;
+ proxy_server_conf *conf;
+ h2_req_engine *engine;
+ const char *engine_id;
+ const char *engine_type;
+ apr_pool_t *engine_pool;
+ apr_uint32_t req_buffer_size;
+ request_rec *next;
+ apr_size_t capacity;
+ unsigned standalone : 1;
+ unsigned is_ssl : 1;
+ unsigned flushall : 1;
+ apr_status_t r_status; /* status of our first request work */
+ h2_proxy_session *session; /* current http2 session against backend */
+} h2_proxy_ctx;
+static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog,
+ apr_pool_t *ptemp, server_rec *s)
+ void *data = NULL;
+ const char *init_key = "mod_proxy_http2_init_counter";
+ nghttp2_info *ngh2;
+ apr_status_t status = APR_SUCCESS;
+ (void)plog;(void)ptemp;
+ apr_pool_userdata_get(&data, init_key, s->process->pool);
+ if ( data == NULL ) {
+ apr_pool_userdata_set((const void *)1, init_key,
+ apr_pool_cleanup_null, s->process->pool);
+ return APR_SUCCESS;
+ }
+ ngh2 = nghttp2_version(0);
+ ap_log_error( APLOG_MARK, APLOG_INFO, 0, s, APLOGNO(03349)
+ "mod_proxy_http2 (v%s, nghttp2 %s), initializing...",
+ MOD_HTTP2_VERSION, ngh2? ngh2->version_str : "unknown");
+ is_h2 = APR_RETRIEVE_OPTIONAL_FN(http2_is_h2);
+ req_engine_push = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_push);
+ req_engine_pull = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_pull);
+ req_engine_done = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_done);
+ /* we need all of them */
+ if (!req_engine_push || !req_engine_pull || !req_engine_done) {
+ req_engine_push = NULL;
+ req_engine_pull = NULL;
+ req_engine_done = NULL;
+ }
+ return status;
+ * canonicalize the url into the request, if it is meant for us.
+ * slightly modified copy from mod_http
+ */
+static int proxy_http2_canon(request_rec *r, char *url)
+ char *host, *path, sport[7];
+ char *search = NULL;
+ const char *err;
+ const char *scheme;
+ const char *http_scheme;
+ apr_port_t port, def_port;
+ /* ap_port_of_scheme() */
+ if (strncasecmp(url, "h2c:", 4) == 0) {
+ url += 4;
+ scheme = "h2c";
+ http_scheme = "http";
+ }
+ else if (strncasecmp(url, "h2:", 3) == 0) {
+ url += 3;
+ scheme = "h2";
+ http_scheme = "https";
+ }
+ else {
+ return DECLINED;
+ }
+ port = def_port = ap_proxy_port_of_scheme(http_scheme);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r,
+ "HTTP2: canonicalising URL %s", url);
+ /* do syntatic check.
+ * We break the URL into host, port, path, search
+ */
+ err = ap_proxy_canon_netloc(r->pool, &url, NULL, NULL, &host, &port);
+ if (err) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(03350)
+ "error parsing URL %s: %s", url, err);
+ }
+ /*
+ * now parse path/search args, according to rfc1738:
+ * process the path.
+ *
+ * In a reverse proxy, our URL has been processed, so canonicalise
+ * unless proxy-nocanon is set to say it's raw
+ * In a forward proxy, we have and MUST NOT MANGLE the original.
+ */
+ switch (r->proxyreq) {
+ default: /* wtf are we doing here? */
+ if (apr_table_get(r->notes, "proxy-nocanon")) {
+ path = url; /* this is the raw path */
+ }
+ else {
+ path = ap_proxy_canonenc(r->pool, url, strlen(url),
+ enc_path, 0, r->proxyreq);
+ search = r->args;
+ }
+ break;
+ path = url;
+ break;
+ }
+ if (path == NULL) {
+ }
+ if (port != def_port) {
+ apr_snprintf(sport, sizeof(sport), ":%d", port);
+ }
+ else {
+ sport[0] = '\0';
+ }
+ if (ap_strchr_c(host, ':')) { /* if literal IPv6 address */
+ host = apr_pstrcat(r->pool, "[", host, "]", NULL);
+ }
+ r->filename = apr_pstrcat(r->pool, "proxy:", scheme, "://", host, sport,
+ "/", path, (search) ? "?" : "", (search) ? search : "", NULL);
+ return OK;
+static void out_consumed(void *baton, conn_rec *c, apr_off_t bytes)
+ h2_proxy_ctx *ctx = baton;
+ if (ctx->session) {
+ h2_proxy_session_update_window(ctx->session, c, bytes);
+ }
+static apr_status_t proxy_engine_init(h2_req_engine *engine,
+ const char *id,
+ const char *type,
+ apr_pool_t *pool,
+ apr_uint32_t req_buffer_size,
+ request_rec *r,
+ http2_output_consumed **pconsumed,
+ void **pctx)
+ h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config,
+ &proxy_http2_module);
+ if (ctx) {
+ conn_rec *c = ctx->owner;
+ h2_proxy_ctx *nctx;
+ /* we need another lifetime for this. If we do not host
+ * an engine, the context lives in r->pool. Since we expect
+ * to server more than r, we need to live longer */
+ nctx = apr_pcalloc(pool, sizeof(*nctx));
+ if (nctx == NULL) {
+ return APR_ENOMEM;
+ }
+ memcpy(nctx, ctx, sizeof(*nctx));
+ ctx = nctx;
+ ctx->pool = pool;
+ ctx->engine = engine;
+ ctx->engine_id = id;
+ ctx->engine_type = type;
+ ctx->engine_pool = pool;
+ ctx->req_buffer_size = req_buffer_size;
+ ctx->capacity = 100;
+ ap_set_module_config(c->conn_config, &proxy_http2_module, ctx);
+ *pconsumed = out_consumed;
+ *pctx = ctx;
+ return APR_SUCCESS;
+ }
+ ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, APLOGNO(03368)
+ "h2_proxy_session, engine init, no ctx found");
+ return APR_ENOTIMPL;
+static apr_status_t add_request(h2_proxy_session *session, request_rec *r)
+ h2_proxy_ctx *ctx = session->user_data;
+ const char *url;
+ apr_status_t status;
+ url = apr_table_get(r->notes, H2_PROXY_REQ_URL_NOTE);
+ apr_table_setn(r->notes, "proxy-source-port", apr_psprintf(r->pool, "%hu",
+ ctx->p_conn->connection->local_addr->port));
+ status = h2_proxy_session_submit(session, url, r);
+ if (status != OK) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, status, r->connection, APLOGNO(03351)
+ "pass request body failed to %pI (%s) from %s (%s)",
+ ctx->p_conn->addr, ctx->p_conn->hostname ?
+ ctx->p_conn->hostname: "", session->c->client_ip,
+ session->c->remote_host ? session->c->remote_host: "");
+ }
+ return status;
+static void request_done(h2_proxy_session *session, request_rec *r,
+ int complete, int touched)
+ h2_proxy_ctx *ctx = session->user_data;
+ const char *task_id = apr_table_get(r->connection->notes, H2_TASK_ID_NOTE);
+ if (!complete && !touched) {
+ /* untouched request, need rescheduling */
+ if (req_engine_push && is_h2 && is_h2(ctx->owner)) {
+ if (req_engine_push(ctx->engine_type, r, NULL) == APR_SUCCESS) {
+ /* push to engine */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
+ APLOGNO(03369)
+ "h2_proxy_session(%s): rescheduled request %s",
+ ctx->engine_id, task_id);
+ return;
+ }
+ }
+ }
+ if (r == ctx->rbase && complete) {
+ ctx->r_status = APR_SUCCESS;
+ }
+ if (complete) {
+ if (req_engine_done && ctx->engine) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
+ APLOGNO(03370)
+ "h2_proxy_session(%s): finished request %s",
+ ctx->engine_id, task_id);
+ req_engine_done(ctx->engine, r->connection);
+ }
+ }
+ else {
+ if (req_engine_done && ctx->engine) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
+ APLOGNO(03371)
+ "h2_proxy_session(%s): failed request %s",
+ ctx->engine_id, task_id);
+ req_engine_done(ctx->engine, r->connection);
+ }
+ }
+static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave)
+ if (ctx->next) {
+ return APR_SUCCESS;
+ }
+ else if (req_engine_pull && ctx->engine) {
+ apr_status_t status;
+ status = req_engine_pull(ctx->engine, before_leave?
+ ctx->capacity, &ctx->next);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, ctx->owner,
+ "h2_proxy_engine(%s): pulled request %s",
+ ctx->engine_id,
+ (ctx->next? ctx->next->the_request : "NULL"));
+ return APR_STATUS_IS_EAGAIN(status)? APR_SUCCESS : status;
+ }
+ return APR_EOF;
+static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
+ apr_status_t status = OK;
+ /* Step Four: Send the Request in a new HTTP/2 stream and
+ * loop until we got the response or encounter errors.
+ */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner,
+ "eng(%s): setup session", ctx->engine_id);
+ ctx->session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf,
+ 30, h2_log2(ctx->req_buffer_size),
+ request_done);
+ if (!ctx->session) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner,
+ APLOGNO(03372) "session unavailable");
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(03373)
+ "eng(%s): run session %s", ctx->engine_id, ctx->session->id);
+ ctx->session->user_data = ctx;
+ while (1) {
+ if (ctx->next) {
+ add_request(ctx->session, ctx->next);
+ ctx->next = NULL;
+ }
+ status = h2_proxy_session_process(ctx->session);
+ if (status == APR_SUCCESS) {
+ apr_status_t s2;
+ /* ongoing processing, call again */
+ if (ctx->session->remote_max_concurrent > 0
+ && ctx->session->remote_max_concurrent != ctx->capacity) {
+ ctx->capacity = ctx->session->remote_max_concurrent;
+ }
+ s2 = next_request(ctx, 0);
+ if (s2 == APR_ECONNABORTED) {
+ /* master connection gone */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, s2, ctx->owner,
+ APLOGNO(03374) "eng(%s): pull request",
+ ctx->engine_id);
+ status = s2;
+ break;
+ }
+ if (!ctx->next && h2_ihash_is_empty(ctx->session->streams)) {
+ break;
+ }
+ }
+ else {
+ /* end of processing, maybe error */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner,
+ APLOGNO(03375) "eng(%s): end of session run",
+ ctx->engine_id);
+ /*
+ * Any open stream of that session needs to
+ * a) be reopened on the new session iff safe to do so
+ * b) reported as done (failed) otherwise
+ */
+ h2_proxy_session_cleanup(ctx->session, request_done);
+ break;
+ }
+ }
+ ctx->session->user_data = NULL;
+ ctx->session = NULL;
+ return status;
+static h2_proxy_ctx *push_request_somewhere(h2_proxy_ctx *ctx)
+ conn_rec *c = ctx->owner;
+ const char *engine_type, *hostname;
+ hostname = (ctx->p_conn->ssl_hostname?
+ ctx->p_conn->ssl_hostname : ctx->p_conn->hostname);
+ engine_type = apr_psprintf(ctx->pool, "proxy_http2 %s%s", hostname,
+ ctx->server_portstr);
+ if (c->master && req_engine_push && ctx->next && is_h2 && is_h2(c)) {
+ /* If we are have req_engine capabilities, push the handling of this
+ * request (e.g. slave connection) to a proxy_http2 engine which
+ * uses the same backend. We may be called to create an engine
+ * ourself. */
+ if (req_engine_push(engine_type, ctx->next, proxy_engine_init)
+ /* to renew the lifetime, we might have set a new ctx */
+ ctx = ap_get_module_config(c->conn_config, &proxy_http2_module);
+ if (ctx->engine == NULL) {
+ /* Another engine instance has taken over processing of this
+ * request. */
+ ctx->r_status = SUSPENDED;
+ ctx->next = NULL;
+ return ctx;
+ }
+ }
+ }
+ if (!ctx->engine) {
+ /* No engine was available or has been initialized, handle this
+ * request just by ourself. */
+ ctx->engine_id = apr_psprintf(ctx->pool, "eng-proxy-%ld", c->id);
+ ctx->engine_type = engine_type;
+ ctx->engine_pool = ctx->pool;
+ ctx->req_buffer_size = (32*1024);
+ ctx->standalone = 1;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "h2_proxy_http2(%ld): setup standalone engine for type %s",
+ c->id, engine_type);
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "H2: hosting engine %s", ctx->engine_id);
+ }
+ return ctx;
+static int proxy_http2_handler(request_rec *r,
+ proxy_worker *worker,
+ proxy_server_conf *conf,
+ char *url,
+ const char *proxyname,
+ apr_port_t proxyport)
+ const char *proxy_func;
+ char *locurl = url, *u;
+ apr_size_t slen;
+ int is_ssl = 0;
+ apr_status_t status;
+ h2_proxy_ctx *ctx;
+ apr_uri_t uri;
+ int reconnected = 0;
+ /* find the scheme */
+ if ((url[0] != 'h' && url[0] != 'H') || url[1] != '2') {
+ return DECLINED;
+ }
+ u = strchr(url, ':');
+ if (u == NULL || u[1] != '/' || u[2] != '/' || u[3] == '\0') {
+ return DECLINED;
+ }
+ slen = (u - url);
+ switch(slen) {
+ case 2:
+ proxy_func = "H2";
+ is_ssl = 1;
+ break;
+ case 3:
+ if (url[2] != 'c' && url[2] != 'C') {
+ return DECLINED;
+ }
+ proxy_func = "H2C";
+ break;
+ default:
+ return DECLINED;
+ }
+ ctx = apr_pcalloc(r->pool, sizeof(*ctx));
+ ctx->owner = r->connection;
+ ctx->pool = r->pool;
+ ctx->rbase = r;
+ ctx->server = r->server;
+ ctx->proxy_func = proxy_func;
+ ctx->is_ssl = is_ssl;
+ ctx->worker = worker;
+ ctx->conf = conf;
+ ctx->flushall = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0;
+ ctx->next = r;
+ r = NULL;
+ ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, ctx);
+ /* scheme says, this is for us. */
+ apr_table_setn(ctx->rbase->notes, H2_PROXY_REQ_URL_NOTE, url);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->rbase,
+ "H2: serving URL %s", url);
+ /* Get a proxy_conn_rec from the worker, might be a new one, might
+ * be one still open from another request, or it might fail if the
+ * worker is stopped or in error. */
+ if ((status = ap_proxy_acquire_connection(ctx->proxy_func, &ctx->p_conn,
+ ctx->worker, ctx->server)) != OK) {
+ goto cleanup;
+ }
+ ctx->p_conn->is_ssl = ctx->is_ssl;
+ if (ctx->is_ssl) {
+ /* If there is still some data on an existing ssl connection, now
+ * would be a good timne to get rid of it. */
+ ap_proxy_ssl_connection_cleanup(ctx->p_conn, ctx->rbase);
+ }
+ /* Step One: Determine the URL to connect to (might be a proxy),
+ * initialize the backend accordingly and determine the server
+ * port string we can expect in responses. */
+ if ((status = ap_proxy_determine_connection(ctx->pool, ctx->rbase, conf, worker,
+ ctx->p_conn, &uri, &locurl,
+ proxyname, proxyport,
+ ctx->server_portstr,
+ sizeof(ctx->server_portstr))) != OK) {
+ goto cleanup;
+ }
+ /* If we are not already hosting an engine, try to push the request
+ * to an already existing engine or host a new engine here. */
+ if (!ctx->engine) {
+ ctx = push_request_somewhere(ctx);
+ if (ctx->r_status == SUSPENDED) {
+ /* request was pushed to another engine */
+ goto cleanup;
+ }
+ }
+ /* Step Two: Make the Connection (or check that an already existing
+ * socket is still usable). On success, we have a socket connected to
+ * backend->hostname. */
+ if (ap_proxy_connect_backend(ctx->proxy_func, ctx->p_conn, ctx->worker,
+ ctx->server)) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, ctx->owner, APLOGNO(03352)
+ "H2: failed to make connection to backend: %s",
+ ctx->p_conn->hostname);
+ goto cleanup;
+ }
+ /* Step Three: Create conn_rec for the socket we have open now. */
+ if (!ctx->p_conn->connection) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, APLOGNO(03353)
+ "setup new connection: is_ssl=%d %s %s %s",
+ ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname,
+ locurl, ctx->p_conn->hostname);
+ if ((status = ap_proxy_connection_create(ctx->proxy_func, ctx->p_conn,
+ ctx->owner,
+ ctx->server)) != OK) {
+ goto cleanup;
+ }
+ /*
+ * On SSL connections set a note on the connection what CN is
+ * requested, such that mod_ssl can check if it is requested to do
+ * so.
+ */
+ if (ctx->p_conn->ssl_hostname) {
+ apr_table_setn(ctx->p_conn->connection->notes,
+ "proxy-request-hostname", ctx->p_conn->ssl_hostname);
+ }
+ if (ctx->is_ssl) {
+ apr_table_setn(ctx->p_conn->connection->notes,
+ "proxy-request-alpn-protos", "h2");
+ }
+ }
+ status = proxy_engine_run(ctx);
+ if (status == APR_SUCCESS) {
+ /* session and connection still ok */
+ if (next_request(ctx, 1) == APR_SUCCESS) {
+ /* more requests, run again */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(03376)
+ "run_session, again");
+ goto run_session;
+ }
+ /* done */
+ ctx->engine = NULL;
+ }
+ if (!reconnected && ctx->engine && next_request(ctx, 1) == APR_SUCCESS) {
+ /* Still more to do, tear down old conn and start over */
+ if (ctx->p_conn) {
+ ctx->p_conn->close = 1;
+ /*proxy_run_detach_backend(r, ctx->p_conn);*/
+ ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
+ ctx->p_conn = NULL;
+ }
+ reconnected = 1; /* we do this only once, then fail */
+ goto run_connect;
+ }
+ if (ctx->p_conn) {
+ if (status != APR_SUCCESS) {
+ /* close socket when errors happened or session shut down (EOF) */
+ ctx->p_conn->close = 1;
+ }
+/* proxy_run_detach_backend(ctx->rbase, ctx->p_conn);*/
+ ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
+ ctx->p_conn = NULL;
+ }
+ ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, NULL);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner,
+ APLOGNO(03377) "leaving handler");
+ return ctx->r_status;
+static void register_hook(apr_pool_t *p)
+ ap_hook_post_config(h2_proxy_post_config, NULL, NULL, APR_HOOK_MIDDLE);
+ proxy_hook_scheme_handler(proxy_http2_handler, NULL, NULL, APR_HOOK_FIRST);
+ proxy_hook_canon_handler(proxy_http2_canon, NULL, NULL, APR_HOOK_FIRST);