]> granicus.if.org Git - apache/blob - modules/http2/h2_proxy_session.c
0312ff83ad6895b363e30fcf99180b66294dfb64
[apache] / modules / http2 / h2_proxy_session.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16  
17 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
18  *
19  * Licensed under the Apache License, Version 2.0 (the "License");
20  * you may not use this file except in compliance with the License.
21  * You may obtain a copy of the License at
22  *
23  * http://www.apache.org/licenses/LICENSE-2.0
24  
25  * Unless required by applicable law or agreed to in writing, software
26  * distributed under the License is distributed on an "AS IS" BASIS,
27  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28  * See the License for the specific language governing permissions and
29  * limitations under the License.
30  */
31
32 #include <stddef.h>
33 #include <apr_strings.h>
34 #include <nghttp2/nghttp2.h>
35
36 #include <mpm_common.h>
37 #include <httpd.h>
38 #include <mod_proxy.h>
39
40 #include "mod_http2.h"
41 #include "h2.h"
42 #include "h2_proxy_util.h"
43 #include "h2_proxy_session.h"
44
45 APLOG_USE_MODULE(proxy_http2);
46
47 typedef struct h2_proxy_stream {
48     int id;
49     apr_pool_t *pool;
50     h2_proxy_session *session;
51
52     const char *url;
53     request_rec *r;
54     h2_proxy_request *req;
55     const char *real_server_uri;
56     const char *p_server_uri;
57     int standalone;
58
59     h2_proxy_stream_state_t state;
60     unsigned int suspended : 1;
61     unsigned int waiting_on_100 : 1;
62     unsigned int waiting_on_ping : 1;
63     uint32_t error_code;
64
65     apr_bucket_brigade *input;
66     apr_off_t data_sent;
67     apr_bucket_brigade *output;
68     apr_off_t data_received;
69     
70     apr_table_t *saves;
71 } h2_proxy_stream;
72
73
74 static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, 
75                            int arg, const char *msg);
76 static void ping_arrived(h2_proxy_session *session);
77 static apr_status_t check_suspended(h2_proxy_session *session);
78 static void stream_resume(h2_proxy_stream *stream);
79
80
81 static apr_status_t proxy_session_pre_close(void *theconn)
82 {
83     proxy_conn_rec *p_conn = (proxy_conn_rec *)theconn;
84     h2_proxy_session *session = p_conn->data;
85
86     if (session && session->ngh2) {
87         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
88                       "proxy_session(%s): pool cleanup, state=%d, streams=%d",
89                       session->id, session->state, 
90                       (int)h2_proxy_ihash_count(session->streams));
91         session->aborted = 1;
92         dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
93         nghttp2_session_del(session->ngh2);
94         session->ngh2 = NULL;
95         p_conn->data = NULL;
96     }
97     return APR_SUCCESS;
98 }
99
100 static int proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc,
101                               proxy_conn_rec *p_conn,
102                               conn_rec *origin, apr_bucket_brigade *bb,
103                               int flush)
104 {
105     apr_status_t status;
106     apr_off_t transferred;
107
108     if (flush) {
109         apr_bucket *e = apr_bucket_flush_create(bucket_alloc);
110         APR_BRIGADE_INSERT_TAIL(bb, e);
111     }
112     apr_brigade_length(bb, 0, &transferred);
113     if (transferred != -1)
114         p_conn->worker->s->transferred += transferred;
115     status = ap_pass_brigade(origin->output_filters, bb);
116     /* Cleanup the brigade now to avoid buckets lifetime
117      * issues in case of error returned below. */
118     apr_brigade_cleanup(bb);
119     if (status != APR_SUCCESS) {
120         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, origin, APLOGNO(03357)
121                       "pass output failed to %pI (%s)",
122                       p_conn->addr, p_conn->hostname);
123     }
124     return status;
125 }
126
127 static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
128                         size_t length, int flags, void *user_data)
129 {
130     h2_proxy_session *session = user_data;
131     apr_bucket *b;
132     apr_status_t status;
133     int flush = 1;
134
135     if (data) {
136         b = apr_bucket_transient_create((const char*)data, length, 
137                                         session->c->bucket_alloc);
138         APR_BRIGADE_INSERT_TAIL(session->output, b);
139     }
140
141     status = proxy_pass_brigade(session->c->bucket_alloc,  
142                                 session->p_conn, session->c, 
143                                 session->output, flush);
144     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, 
145                   "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d", 
146                   session->id, (int)length, flush);
147     if (status != APR_SUCCESS) {
148         return NGHTTP2_ERR_CALLBACK_FAILURE;
149     }
150     return length;
151 }
152
153 static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
154                          void *user_data) 
155 {
156     h2_proxy_session *session = user_data;
157     h2_proxy_stream *stream;
158     request_rec *r;
159     int n;
160     
161     if (APLOGcdebug(session->c)) {
162         char buffer[256];
163         
164         h2_proxy_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
165         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03341)
166                       "h2_proxy_session(%s): recv FRAME[%s]",
167                       session->id, buffer);
168     }
169
170     session->last_frame_received = apr_time_now();
171     switch (frame->hd.type) {
172         case NGHTTP2_HEADERS:
173             stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id);
174             if (!stream) {
175                 return NGHTTP2_ERR_CALLBACK_FAILURE;
176             }
177             r = stream->r;
178             if (r->status >= 100 && r->status < 200) {
179                 /* By default, we will forward all interim responses when
180                  * we are sitting on a HTTP/2 connection to the client */
181                 int forward = session->h2_front;
182                 switch(r->status) {
183                     case 100:
184                         if (stream->waiting_on_100) {
185                             stream->waiting_on_100 = 0;
186                             r->status_line = ap_get_status_line(r->status);
187                             forward = 1;
188                         } 
189                         break;
190                     case 103:
191                         /* workaround until we get this into http protocol base
192                          * parts. without this, unknown codes are converted to
193                          * 500... */
194                         r->status_line = "103 Early Hints";
195                         break;
196                     default:
197                         r->status_line = ap_get_status_line(r->status);
198                         break;
199                 }
200                 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(03487) 
201                               "h2_proxy_session(%s): got interim HEADERS, "
202                               "status=%d, will forward=%d",
203                               session->id, r->status, forward);
204                 if (forward) {
205                     ap_send_interim_response(r, 1);
206                 }
207             }
208             stream_resume(stream);
209             break;
210         case NGHTTP2_PING:
211             if (session->check_ping) {
212                 session->check_ping = 0;
213                 ping_arrived(session);
214             }
215             break;
216         case NGHTTP2_PUSH_PROMISE:
217             break;
218         case NGHTTP2_SETTINGS:
219             if (frame->settings.niv > 0) {
220                 n = nghttp2_session_get_remote_settings(ngh2, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
221                 if (n > 0) {
222                     session->remote_max_concurrent = n;
223                 }
224             }
225             break;
226         case NGHTTP2_GOAWAY:
227             /* we expect the remote server to tell us the highest stream id
228              * that it has started processing. */
229             session->last_stream_id = frame->goaway.last_stream_id;
230             dispatch_event(session, H2_PROXYS_EV_REMOTE_GOAWAY, 0, NULL);
231             break;
232         default:
233             break;
234     }
235     return 0;
236 }
237
238 static int before_frame_send(nghttp2_session *ngh2,
239                              const nghttp2_frame *frame, void *user_data)
240 {
241     h2_proxy_session *session = user_data;
242     if (APLOGcdebug(session->c)) {
243         char buffer[256];
244
245         h2_proxy_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
246         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03343)
247                       "h2_proxy_session(%s): sent FRAME[%s]",
248                       session->id, buffer);
249     }
250     return 0;
251 }
252
253 static int add_header(void *table, const char *n, const char *v)
254 {
255     apr_table_addn(table, n, v);
256     return 1;
257 }
258
259 static void process_proxy_header(h2_proxy_stream *stream, const char *n, const char *v)
260 {
261     static const struct {
262         const char *name;
263         ap_proxy_header_reverse_map_fn func;
264     } transform_hdrs[] = {
265         { "Location", ap_proxy_location_reverse_map },
266         { "Content-Location", ap_proxy_location_reverse_map },
267         { "URI", ap_proxy_location_reverse_map },
268         { "Destination", ap_proxy_location_reverse_map },
269         { "Set-Cookie", ap_proxy_cookie_reverse_map },
270         { NULL, NULL }
271     };
272     request_rec *r = stream->r;
273     proxy_dir_conf *dconf;
274     int i;
275     
276     dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
277     if (!dconf->preserve_host) {
278         for (i = 0; transform_hdrs[i].name; ++i) {
279             if (!ap_cstr_casecmp(transform_hdrs[i].name, n)) {
280                 apr_table_add(r->headers_out, n,
281                               (*transform_hdrs[i].func)(r, dconf, v));
282                 return;
283             }
284         }
285         if (!ap_cstr_casecmp("Link", n)) {
286             dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
287             apr_table_add(r->headers_out, n,
288                           h2_proxy_link_reverse_map(r, dconf, 
289                                                     stream->real_server_uri, stream->p_server_uri, v));
290             return;
291         }
292     }
293     apr_table_add(r->headers_out, n, v);
294 }
295
296 static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
297                                                    const char *n, apr_size_t nlen,
298                                                    const char *v, apr_size_t vlen)
299 {
300     if (n[0] == ':') {
301         if (!stream->data_received && !strncmp(":status", n, nlen)) {
302             char *s = apr_pstrndup(stream->r->pool, v, vlen);
303             
304             apr_table_setn(stream->r->notes, "proxy-status", s);
305             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
306                           "h2_proxy_stream(%s-%d): got status %s", 
307                           stream->session->id, stream->id, s);
308             stream->r->status = (int)apr_atoi64(s);
309             if (stream->r->status <= 0) {
310                 stream->r->status = 500;
311                 return APR_EGENERAL;
312             }
313         }
314         return APR_SUCCESS;
315     }
316     
317     if (!h2_proxy_res_ignore_header(n, nlen)) {
318         char *hname, *hvalue;
319     
320         hname = apr_pstrndup(stream->pool, n, nlen);
321         h2_proxy_util_camel_case_header(hname, nlen);
322         hvalue = apr_pstrndup(stream->pool, v, vlen);
323         
324         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
325                       "h2_proxy_stream(%s-%d): got header %s: %s", 
326                       stream->session->id, stream->id, hname, hvalue);
327         process_proxy_header(stream, hname, hvalue);
328     }
329     return APR_SUCCESS;
330 }
331
332 static int log_header(void *ctx, const char *key, const char *value)
333 {
334     h2_proxy_stream *stream = ctx;
335     ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, 
336                   "h2_proxy_stream(%s-%d), header_out %s: %s", 
337                   stream->session->id, stream->id, key, value);
338     return 1;
339 }
340
341 static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream) 
342 {
343     h2_proxy_session *session = stream->session;
344     request_rec *r = stream->r;
345     apr_pool_t *p = r->pool;
346     
347     /* Now, add in the cookies from the response to the ones already saved */
348     apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL);
349     
350     /* and now load 'em all in */
351     if (!apr_is_empty_table(stream->saves)) {
352         apr_table_unset(r->headers_out, "Set-Cookie");
353         r->headers_out = apr_table_overlay(p, r->headers_out, stream->saves);
354     }
355     
356     /* handle Via header in response */
357     if (session->conf->viaopt != via_off 
358         && session->conf->viaopt != via_block) {
359         const char *server_name = ap_get_server_name(stream->r);
360         apr_port_t port = ap_get_server_port(stream->r);
361         char portstr[32];
362         
363         /* If USE_CANONICAL_NAME_OFF was configured for the proxy virtual host,
364          * then the server name returned by ap_get_server_name() is the
365          * origin server name (which doesn't make sense with Via: headers)
366          * so we use the proxy vhost's name instead.
367          */
368         if (server_name == stream->r->hostname) {
369             server_name = stream->r->server->server_hostname;
370         }
371         if (ap_is_default_port(port, stream->r)) {
372             portstr[0] = '\0';
373         }
374         else {
375             apr_snprintf(portstr, sizeof(portstr), ":%d", port);
376         }
377
378         /* create a "Via:" response header entry and merge it */
379         apr_table_addn(r->headers_out, "Via",
380                        (session->conf->viaopt == via_full)
381                        ? apr_psprintf(p, "%d.%d %s%s (%s)",
382                                       HTTP_VERSION_MAJOR(r->proto_num),
383                                       HTTP_VERSION_MINOR(r->proto_num),
384                                       server_name, portstr,
385                                       AP_SERVER_BASEVERSION)
386                        : apr_psprintf(p, "%d.%d %s%s",
387                                       HTTP_VERSION_MAJOR(r->proto_num),
388                                       HTTP_VERSION_MINOR(r->proto_num),
389                                       server_name, portstr)
390                        );
391     }
392     
393     if (APLOGrtrace2(stream->r)) {
394         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, 
395                       "h2_proxy_stream(%s-%d), header_out after merging", 
396                       stream->session->id, stream->id);
397         apr_table_do(log_header, stream, stream->r->headers_out, NULL);
398     }
399 }
400
401 static int stream_response_data(nghttp2_session *ngh2, uint8_t flags,
402                                 int32_t stream_id, const uint8_t *data,
403                                 size_t len, void *user_data) 
404 {
405     h2_proxy_session *session = user_data;
406     h2_proxy_stream *stream;
407     apr_bucket *b;
408     apr_status_t status;
409     
410     stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
411     if (!stream) {
412         ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03358)
413                      "h2_proxy_session(%s): recv data chunk for "
414                      "unknown stream %d, ignored", 
415                      session->id, stream_id);
416         return 0;
417     }
418     
419     if (!stream->data_received) {
420         /* last chance to manipulate response headers.
421          * after this, only trailers */
422         h2_proxy_stream_end_headers_out(stream);
423     }
424     stream->data_received += len;
425     
426     b = apr_bucket_transient_create((const char*)data, len, 
427                                     stream->r->connection->bucket_alloc);
428     APR_BRIGADE_INSERT_TAIL(stream->output, b);
429     /* always flush after a DATA frame, as we have no other indication
430      * of buffer use */
431     b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
432     APR_BRIGADE_INSERT_TAIL(stream->output, b);
433     
434     status = ap_pass_brigade(stream->r->output_filters, stream->output);
435     ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03359)
436                   "h2_proxy_session(%s): stream=%d, response DATA %ld, %ld"
437                   " total", session->id, stream_id, (long)len,
438                   (long)stream->data_received);
439     if (status != APR_SUCCESS) {
440         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03344)
441                       "h2_proxy_session(%s): passing output on stream %d", 
442                       session->id, stream->id);
443         nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
444                                   stream_id, NGHTTP2_STREAM_CLOSED);
445         return NGHTTP2_ERR_STREAM_CLOSING;
446     }
447     if (stream->standalone) {
448         nghttp2_session_consume(ngh2, stream_id, len);
449         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
450                       "h2_proxy_session(%s): stream %d, win_update %d bytes",
451                       session->id, stream_id, (int)len);
452     }
453     return 0;
454 }
455
456 static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
457                            uint32_t error_code, void *user_data) 
458 {
459     h2_proxy_session *session = user_data;
460     h2_proxy_stream *stream;
461     if (!session->aborted) {
462         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03360)
463                       "h2_proxy_session(%s): stream=%d, closed, err=%d", 
464                       session->id, stream_id, error_code);
465         stream = h2_proxy_ihash_get(session->streams, stream_id);
466         if (stream) {
467             stream->error_code = error_code;
468         }
469         dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
470     }
471     return 0;
472 }
473
474 static int on_header(nghttp2_session *ngh2, const nghttp2_frame *frame,
475                      const uint8_t *namearg, size_t nlen,
476                      const uint8_t *valuearg, size_t vlen, uint8_t flags,
477                      void *user_data) 
478 {
479     h2_proxy_session *session = user_data;
480     h2_proxy_stream *stream;
481     const char *n = (const char*)namearg;
482     const char *v = (const char*)valuearg;
483     
484     (void)session;
485     if (frame->hd.type == NGHTTP2_HEADERS && nlen) {
486         stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id);
487         if (stream) {
488             if (h2_proxy_stream_add_header_out(stream, n, nlen, v, vlen)) {
489                 return NGHTTP2_ERR_CALLBACK_FAILURE;
490             }
491         }
492     }
493     else if (frame->hd.type == NGHTTP2_PUSH_PROMISE) {
494     }
495     
496     return 0;
497 }
498
499 static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id, 
500                                    uint8_t *buf, size_t length,
501                                    uint32_t *data_flags, 
502                                    nghttp2_data_source *source, void *user_data)
503 {
504     h2_proxy_stream *stream;
505     apr_status_t status = APR_SUCCESS;
506     
507     *data_flags = 0;
508     stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
509     if (!stream) {
510         ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03361)
511                      "h2_proxy_stream(%s): data_read, stream %d not found", 
512                      stream->session->id, stream_id);
513         return NGHTTP2_ERR_CALLBACK_FAILURE;
514     }
515     
516     if (stream->session->check_ping) {
517         /* suspend until we hear from the other side */
518         stream->waiting_on_ping = 1;
519         status = APR_EAGAIN;
520     }
521     else if (stream->r->expecting_100) {
522         /* suspend until the answer comes */
523         stream->waiting_on_100 = 1;
524         status = APR_EAGAIN;
525     }
526     else if (APR_BRIGADE_EMPTY(stream->input)) {
527         status = ap_get_brigade(stream->r->input_filters, stream->input,
528                                 AP_MODE_READBYTES, APR_NONBLOCK_READ,
529                                 H2MAX(APR_BUCKET_BUFF_SIZE, length));
530         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, 
531                       "h2_proxy_stream(%s-%d): request body read", 
532                       stream->session->id, stream->id);
533     }
534
535     if (status == APR_SUCCESS) {
536         ssize_t readlen = 0;
537         while (status == APR_SUCCESS 
538                && (readlen < length)
539                && !APR_BRIGADE_EMPTY(stream->input)) {
540             apr_bucket* b = APR_BRIGADE_FIRST(stream->input);
541             if (APR_BUCKET_IS_METADATA(b)) {
542                 if (APR_BUCKET_IS_EOS(b)) {
543                     *data_flags |= NGHTTP2_DATA_FLAG_EOF;
544                 }
545                 else {
546                     /* we do nothing more regarding any meta here */
547                 }
548             }
549             else {
550                 const char *bdata = NULL;
551                 apr_size_t blen = 0;
552                 status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
553                 
554                 if (status == APR_SUCCESS && blen > 0) {
555                     ssize_t copylen = H2MIN(length - readlen, blen);
556                     memcpy(buf, bdata, copylen);
557                     buf += copylen;
558                     readlen += copylen;
559                     if (copylen < blen) {
560                         /* We have data left in the bucket. Split it. */
561                         status = apr_bucket_split(b, copylen);
562                     }
563                 }
564             }
565             apr_bucket_delete(b);
566         }
567
568         stream->data_sent += readlen;
569         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03468) 
570                       "h2_proxy_stream(%d): request DATA %ld, %ld"
571                       " total, flags=%d", 
572                       stream->id, (long)readlen, (long)stream->data_sent,
573                       (int)*data_flags);
574         return readlen;
575     }
576     else if (APR_STATUS_IS_EAGAIN(status)) {
577         /* suspended stream, needs to be re-awakened */
578         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, 
579                       "h2_proxy_stream(%s-%d): suspending", 
580                       stream->session->id, stream_id);
581         stream->suspended = 1;
582         h2_proxy_iq_add(stream->session->suspended, stream->id, NULL, NULL);
583         return NGHTTP2_ERR_DEFERRED;
584     }
585     else {
586         nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, 
587                                   stream_id, NGHTTP2_STREAM_CLOSED);
588         return NGHTTP2_ERR_STREAM_CLOSING;
589     }
590 }
591
592 #ifdef H2_NG2_INVALID_HEADER_CB
593 static int on_invalid_header_cb(nghttp2_session *ngh2, 
594                                 const nghttp2_frame *frame, 
595                                 const uint8_t *name, size_t namelen, 
596                                 const uint8_t *value, size_t valuelen, 
597                                 uint8_t flags, void *user_data)
598 {
599     h2_proxy_session *session = user_data;
600     if (APLOGcdebug(session->c)) {
601         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03469)
602                       "h2_proxy_session(%s-%d): denying stream with invalid header "
603                       "'%s: %s'", session->id, (int)frame->hd.stream_id,
604                       apr_pstrndup(session->pool, (const char *)name, namelen),
605                       apr_pstrndup(session->pool, (const char *)value, valuelen));
606     }
607     return nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
608                                      frame->hd.stream_id, 
609                                      NGHTTP2_PROTOCOL_ERROR);
610 }
611 #endif
612
613 h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
614                                          proxy_server_conf *conf,
615                                          int h2_front, 
616                                          unsigned char window_bits_connection,
617                                          unsigned char window_bits_stream,
618                                          h2_proxy_request_done *done)
619 {
620     if (!p_conn->data) {
621         apr_pool_t *pool = p_conn->scpool;
622         h2_proxy_session *session;
623         nghttp2_session_callbacks *cbs;
624         nghttp2_option *option;
625
626         session = apr_pcalloc(pool, sizeof(*session));
627         apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
628         p_conn->data = session;
629         
630         session->id = apr_pstrdup(p_conn->scpool, id);
631         session->c = p_conn->connection;
632         session->p_conn = p_conn;
633         session->conf = conf;
634         session->pool = p_conn->scpool;
635         session->state = H2_PROXYS_ST_INIT;
636         session->h2_front = h2_front;
637         session->window_bits_stream = window_bits_stream;
638         session->window_bits_connection = window_bits_connection;
639         session->streams = h2_proxy_ihash_create(pool, offsetof(h2_proxy_stream, id));
640         session->suspended = h2_proxy_iq_create(pool, 5);
641         session->done = done;
642     
643         session->input = apr_brigade_create(session->pool, session->c->bucket_alloc);
644         session->output = apr_brigade_create(session->pool, session->c->bucket_alloc);
645     
646         nghttp2_session_callbacks_new(&cbs);
647         nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, on_frame_recv);
648         nghttp2_session_callbacks_set_on_data_chunk_recv_callback(cbs, stream_response_data);
649         nghttp2_session_callbacks_set_on_stream_close_callback(cbs, on_stream_close);
650         nghttp2_session_callbacks_set_on_header_callback(cbs, on_header);
651         nghttp2_session_callbacks_set_before_frame_send_callback(cbs, before_frame_send);
652         nghttp2_session_callbacks_set_send_callback(cbs, raw_send);
653 #ifdef H2_NG2_INVALID_HEADER_CB
654         nghttp2_session_callbacks_set_on_invalid_header_callback(cbs, on_invalid_header_cb);
655 #endif
656         
657         nghttp2_option_new(&option);
658         nghttp2_option_set_peer_max_concurrent_streams(option, 100);
659         nghttp2_option_set_no_auto_window_update(option, 1);
660         
661         nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
662         
663         nghttp2_option_del(option);
664         nghttp2_session_callbacks_del(cbs);
665
666         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03362)
667                       "setup session for %s", p_conn->hostname);
668     }
669     else {
670         h2_proxy_session *session = p_conn->data;
671         apr_interval_time_t age = apr_time_now() - session->last_frame_received;
672         if (age > apr_time_from_sec(1)) {
673             session->check_ping = 1;
674             nghttp2_submit_ping(session->ngh2, 0, (const uint8_t *)"nevergonnagiveyouup");
675         }
676     }
677     return p_conn->data;
678 }
679
680 static apr_status_t session_start(h2_proxy_session *session) 
681 {
682     nghttp2_settings_entry settings[2];
683     int rv, add_conn_window;
684     apr_socket_t *s;
685     
686     s = ap_get_conn_socket(session->c);
687 #if (!defined(WIN32) && !defined(NETWARE)) || defined(DOXYGEN)
688     if (s) {
689         ap_sock_disable_nagle(s);
690     }
691 #endif
692     
693     settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
694     settings[0].value = 0;
695     settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
696     settings[1].value = (1 << session->window_bits_stream) - 1;
697     
698     rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings, 
699                                  H2_ALEN(settings));
700     
701     /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */
702     add_conn_window = ((1 << session->window_bits_connection) - 1 -
703                        NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE);
704     if (!rv && add_conn_window != 0) {
705         rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window);
706     }
707     return rv? APR_EGENERAL : APR_SUCCESS;
708 }
709
710 static apr_status_t open_stream(h2_proxy_session *session, const char *url,
711                                 request_rec *r, int standalone,
712                                 h2_proxy_stream **pstream)
713 {
714     h2_proxy_stream *stream;
715     apr_uri_t puri;
716     const char *authority, *scheme, *path;
717     apr_status_t status;
718     proxy_dir_conf *dconf;
719
720     stream = apr_pcalloc(r->pool, sizeof(*stream));
721
722     stream->pool = r->pool;
723     stream->url = url;
724     stream->r = r;
725     stream->standalone = standalone;
726     stream->session = session;
727     stream->state = H2_STREAM_ST_IDLE;
728     
729     stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc);
730     stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc);
731     
732     stream->req = h2_proxy_req_create(1, stream->pool, 0);
733
734     status = apr_uri_parse(stream->pool, url, &puri);
735     if (status != APR_SUCCESS)
736         return status;
737     
738     scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
739     
740     dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
741     if (dconf->preserve_host) {
742         authority = r->hostname;
743     }
744     else {
745         authority = puri.hostname;
746         if (!ap_strchr_c(authority, ':') && puri.port
747             && apr_uri_port_of_scheme(scheme) != puri.port) {
748             /* port info missing and port is not default for scheme: append */
749             authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
750         }
751     }
752     
753     /* we need this for mapping relative uris in headers ("Link") back
754      * to local uris */
755     stream->real_server_uri = apr_psprintf(stream->pool, "%s://%s", scheme, authority); 
756     stream->p_server_uri = apr_psprintf(stream->pool, "%s://%s", puri.scheme, authority); 
757     path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);
758     h2_proxy_req_make(stream->req, stream->pool, r->method, scheme,
759                 authority, path, r->headers_in);
760
761     if (dconf->add_forwarded_headers) {
762         if (PROXYREQ_REVERSE == r->proxyreq) {
763             const char *buf;
764
765             /* Add X-Forwarded-For: so that the upstream has a chance to
766              * determine, where the original request came from.
767              */
768             apr_table_mergen(stream->req->headers, "X-Forwarded-For",
769                              r->useragent_ip);
770
771             /* Add X-Forwarded-Host: so that upstream knows what the
772              * original request hostname was.
773              */
774             if ((buf = apr_table_get(r->headers_in, "Host"))) {
775                 apr_table_mergen(stream->req->headers, "X-Forwarded-Host", buf);
776             }
777
778             /* Add X-Forwarded-Server: so that upstream knows what the
779              * name of this proxy server is (if there are more than one)
780              * XXX: This duplicates Via: - do we strictly need it?
781              */
782             apr_table_mergen(stream->req->headers, "X-Forwarded-Server",
783                              r->server->server_hostname);
784         }
785     }
786     
787     /* Tuck away all already existing cookies */
788     stream->saves = apr_table_make(r->pool, 2);
789     apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL);
790
791     *pstream = stream;
792     
793     return APR_SUCCESS;
794 }
795
796 static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *stream)
797 {
798     h2_proxy_ngheader *hd;
799     nghttp2_data_provider *pp = NULL;
800     nghttp2_data_provider provider;
801     int rv, may_have_request_body = 1;
802     apr_status_t status;
803
804     hd = h2_proxy_util_nghd_make_req(stream->pool, stream->req);
805     
806     /* If we expect a 100-continue response, we must refrain from reading
807        any input until we get it. Reading the input will possibly trigger
808        HTTP_IN filter to generate the 100-continue itself. */
809     if (stream->waiting_on_100 || stream->waiting_on_ping) {
810         /* make a small test if we get an EOF/EOS immediately */
811         status = ap_get_brigade(stream->r->input_filters, stream->input,
812                                 AP_MODE_READBYTES, APR_NONBLOCK_READ,
813                                 APR_BUCKET_BUFF_SIZE);
814         may_have_request_body = APR_STATUS_IS_EAGAIN(status)
815                                 || (status == APR_SUCCESS 
816                                     && !APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(stream->input)));
817     }
818     
819     if (may_have_request_body) {
820         provider.source.fd = 0;
821         provider.source.ptr = NULL;
822         provider.read_callback = stream_request_data;
823         pp = &provider;
824     }
825
826     rv = nghttp2_submit_request(session->ngh2, NULL, 
827                                 hd->nv, hd->nvlen, pp, stream);
828                                 
829     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03363)
830                   "h2_proxy_session(%s): submit %s%s -> %d", 
831                   session->id, stream->req->authority, stream->req->path,
832                   rv);
833     if (rv > 0) {
834         stream->id = rv;
835         stream->state = H2_STREAM_ST_OPEN;
836         h2_proxy_ihash_add(session->streams, stream);
837         dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL);
838         
839         return APR_SUCCESS;
840     }
841     return APR_EGENERAL;
842 }
843
844 static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
845 {
846     apr_status_t status = APR_SUCCESS;
847     apr_size_t readlen = 0;
848     ssize_t n;
849     
850     while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
851         apr_bucket* b = APR_BRIGADE_FIRST(bb);
852         
853         if (APR_BUCKET_IS_METADATA(b)) {
854             /* nop */
855         }
856         else {
857             const char *bdata = NULL;
858             apr_size_t blen = 0;
859             
860             status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
861             if (status == APR_SUCCESS && blen > 0) {
862                 n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen);
863                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
864                               "h2_proxy_session(%s): feeding %ld bytes -> %ld", 
865                               session->id, (long)blen, (long)n);
866                 if (n < 0) {
867                     if (nghttp2_is_fatal((int)n)) {
868                         status = APR_EGENERAL;
869                     }
870                 }
871                 else {
872                     readlen += n;
873                     if (n < blen) {
874                         apr_bucket_split(b, n);
875                     }
876                 }
877             }
878         }
879         apr_bucket_delete(b);
880     }
881     
882     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, 
883                   "h2_proxy_session(%s): fed %ld bytes of input to session", 
884                   session->id, (long)readlen);
885     if (readlen == 0 && status == APR_SUCCESS) {
886         return APR_EAGAIN;
887     }
888     return status;
889 }
890
891 static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block, 
892                                           apr_interval_time_t timeout)
893 {
894     apr_status_t status = APR_SUCCESS;
895     
896     if (APR_BRIGADE_EMPTY(session->input)) {
897         apr_socket_t *socket = NULL;
898         apr_time_t save_timeout = -1;
899         
900         if (block) {
901             socket = ap_get_conn_socket(session->c);
902             if (socket) {
903                 apr_socket_timeout_get(socket, &save_timeout);
904                 apr_socket_timeout_set(socket, timeout);
905             }
906             else {
907                 /* cannot block on timeout */
908                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, APLOGNO(03379)
909                               "h2_proxy_session(%s): unable to get conn socket", 
910                               session->id);
911                 return APR_ENOTIMPL;
912             }
913         }
914         
915         status = ap_get_brigade(session->c->input_filters, session->input, 
916                                 AP_MODE_READBYTES, 
917                                 block? APR_BLOCK_READ : APR_NONBLOCK_READ, 
918                                 64 * 1024);
919         ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c, 
920                       "h2_proxy_session(%s): read from conn", session->id);
921         if (socket && save_timeout != -1) {
922             apr_socket_timeout_set(socket, save_timeout);
923         }
924     }
925     
926     if (status == APR_SUCCESS) {
927         status = feed_brigade(session, session->input);
928     }
929     else if (APR_STATUS_IS_TIMEUP(status)) {
930         /* nop */
931     }
932     else if (!APR_STATUS_IS_EAGAIN(status)) {
933         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03380)
934                       "h2_proxy_session(%s): read error", session->id);
935         dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
936     }
937
938     return status;
939 }
940
941 apr_status_t h2_proxy_session_submit(h2_proxy_session *session, 
942                                      const char *url, request_rec *r,
943                                      int standalone)
944 {
945     h2_proxy_stream *stream;
946     apr_status_t status;
947     
948     status = open_stream(session, url, r, standalone, &stream);
949     if (status == APR_SUCCESS) {
950         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(03381)
951                       "process stream(%d): %s %s%s, original: %s", 
952                       stream->id, stream->req->method, 
953                       stream->req->authority, stream->req->path, 
954                       r->the_request);
955         status = submit_stream(session, stream);
956     }
957     return status;
958 }
959
960 static void stream_resume(h2_proxy_stream *stream)
961 {
962     h2_proxy_session *session = stream->session;
963     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
964                   "h2_proxy_stream(%s-%d): resuming", 
965                   session->id, stream->id);
966     stream->suspended = 0;
967     h2_proxy_iq_remove(session->suspended, stream->id);
968     nghttp2_session_resume_data(session->ngh2, stream->id);
969     dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
970 }
971
972 static apr_status_t check_suspended(h2_proxy_session *session)
973 {
974     h2_proxy_stream *stream;
975     int i, stream_id;
976     apr_status_t status;
977     
978     for (i = 0; i < session->suspended->nelts; ++i) {
979         stream_id = session->suspended->elts[i];
980         stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
981         if (stream) {
982             if (stream->waiting_on_100 || stream->waiting_on_ping) {
983                 status = APR_EAGAIN;
984             }
985             else {
986                 status = ap_get_brigade(stream->r->input_filters, stream->input,
987                                         AP_MODE_READBYTES, APR_NONBLOCK_READ,
988                                         APR_BUCKET_BUFF_SIZE);
989             }
990             if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->input)) {
991                 stream_resume(stream);
992                 check_suspended(session);
993                 return APR_SUCCESS;
994             }
995             else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
996                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, session->c, 
997                               APLOGNO(03382) "h2_proxy_stream(%s-%d): check input", 
998                               session->id, stream_id);
999                 stream_resume(stream);
1000                 check_suspended(session);
1001                 return APR_SUCCESS;
1002             }
1003         }
1004         else {
1005             /* gone? */
1006             h2_proxy_iq_remove(session->suspended, stream_id);
1007             check_suspended(session);
1008             return APR_SUCCESS;
1009         }
1010     }
1011     return APR_EAGAIN;
1012 }
1013
1014 static apr_status_t session_shutdown(h2_proxy_session *session, int reason, 
1015                                      const char *msg)
1016 {
1017     apr_status_t status = APR_SUCCESS;
1018     const char *err = msg;
1019     
1020     ap_assert(session);
1021     if (!err && reason) {
1022         err = nghttp2_strerror(reason);
1023     }
1024     nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0, 
1025                           reason, (uint8_t*)err, err? strlen(err):0);
1026     status = nghttp2_session_send(session->ngh2);
1027     dispatch_event(session, H2_PROXYS_EV_LOCAL_GOAWAY, reason, err);
1028     return status;
1029 }
1030
1031
1032 static const char *StateNames[] = {
1033     "INIT",      /* H2_PROXYS_ST_INIT */
1034     "DONE",      /* H2_PROXYS_ST_DONE */
1035     "IDLE",      /* H2_PROXYS_ST_IDLE */
1036     "BUSY",      /* H2_PROXYS_ST_BUSY */
1037     "WAIT",      /* H2_PROXYS_ST_WAIT */
1038     "LSHUTDOWN", /* H2_PROXYS_ST_LOCAL_SHUTDOWN */
1039     "RSHUTDOWN", /* H2_PROXYS_ST_REMOTE_SHUTDOWN */
1040 };
1041
1042 static const char *state_name(h2_proxys_state state)
1043 {
1044     if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
1045         return "unknown";
1046     }
1047     return StateNames[state];
1048 }
1049
1050 static int is_accepting_streams(h2_proxy_session *session)
1051 {
1052     switch (session->state) {
1053         case H2_PROXYS_ST_IDLE:
1054         case H2_PROXYS_ST_BUSY:
1055         case H2_PROXYS_ST_WAIT:
1056             return 1;
1057         default:
1058             return 0;
1059     }
1060 }
1061
1062 static void transit(h2_proxy_session *session, const char *action, 
1063                     h2_proxys_state nstate)
1064 {
1065     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03345)
1066                   "h2_proxy_session(%s): transit [%s] -- %s --> [%s]", session->id,
1067                   state_name(session->state), action, state_name(nstate));
1068     session->state = nstate;
1069 }
1070
1071 static void ev_init(h2_proxy_session *session, int arg, const char *msg)
1072 {
1073     switch (session->state) {
1074         case H2_PROXYS_ST_INIT:
1075             if (h2_proxy_ihash_empty(session->streams)) {
1076                 transit(session, "init", H2_PROXYS_ST_IDLE);
1077             }
1078             else {
1079                 transit(session, "init", H2_PROXYS_ST_BUSY);
1080             }
1081             break;
1082
1083         default:
1084             /* nop */
1085             break;
1086     }
1087 }
1088
1089 static void ev_local_goaway(h2_proxy_session *session, int arg, const char *msg)
1090 {
1091     switch (session->state) {
1092         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1093             /* already did that? */
1094             break;
1095         case H2_PROXYS_ST_IDLE:
1096         case H2_PROXYS_ST_REMOTE_SHUTDOWN:
1097             /* all done */
1098             transit(session, "local goaway", H2_PROXYS_ST_DONE);
1099             break;
1100         default:
1101             transit(session, "local goaway", H2_PROXYS_ST_LOCAL_SHUTDOWN);
1102             break;
1103     }
1104 }
1105
1106 static void ev_remote_goaway(h2_proxy_session *session, int arg, const char *msg)
1107 {
1108     switch (session->state) {
1109         case H2_PROXYS_ST_REMOTE_SHUTDOWN:
1110             /* already received that? */
1111             break;
1112         case H2_PROXYS_ST_IDLE:
1113         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1114             /* all done */
1115             transit(session, "remote goaway", H2_PROXYS_ST_DONE);
1116             break;
1117         default:
1118             transit(session, "remote goaway", H2_PROXYS_ST_REMOTE_SHUTDOWN);
1119             break;
1120     }
1121 }
1122
1123 static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg)
1124 {
1125     switch (session->state) {
1126         case H2_PROXYS_ST_INIT:
1127         case H2_PROXYS_ST_DONE:
1128         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1129             /* just leave */
1130             transit(session, "conn error", H2_PROXYS_ST_DONE);
1131             break;
1132         
1133         default:
1134             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, arg, session->c,
1135                           "h2_proxy_session(%s): conn error -> shutdown", session->id);
1136             session_shutdown(session, arg, msg);
1137             break;
1138     }
1139 }
1140
1141 static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg)
1142 {
1143     switch (session->state) {
1144         case H2_PROXYS_ST_DONE:
1145         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1146             /* just leave */
1147             transit(session, "proto error", H2_PROXYS_ST_DONE);
1148             break;
1149         
1150         default:
1151             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
1152                           "h2_proxy_session(%s): proto error -> shutdown", session->id);
1153             session_shutdown(session, arg, msg);
1154             break;
1155     }
1156 }
1157
1158 static void ev_conn_timeout(h2_proxy_session *session, int arg, const char *msg)
1159 {
1160     switch (session->state) {
1161         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1162             transit(session, "conn timeout", H2_PROXYS_ST_DONE);
1163             break;
1164         default:
1165             session_shutdown(session, arg, msg);
1166             transit(session, "conn timeout", H2_PROXYS_ST_DONE);
1167             break;
1168     }
1169 }
1170
1171 static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
1172 {
1173     switch (session->state) {
1174         case H2_PROXYS_ST_BUSY:
1175         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1176         case H2_PROXYS_ST_REMOTE_SHUTDOWN:
1177             /* nothing for input and output to do. If we remain
1178              * in this state, we go into a tight loop and suck up
1179              * CPU cycles. Ideally, we'd like to do a blocking read, but that
1180              * is not possible if we have scheduled tasks and wait
1181              * for them to produce something. */
1182             if (h2_proxy_ihash_empty(session->streams)) {
1183                 if (!is_accepting_streams(session)) {
1184                     /* We are no longer accepting new streams and have
1185                      * finished processing existing ones. Time to leave. */
1186                     session_shutdown(session, arg, msg);
1187                     transit(session, "no io", H2_PROXYS_ST_DONE);
1188                 }
1189                 else {
1190                     /* When we have no streams, no task events are possible,
1191                      * switch to blocking reads */
1192                     transit(session, "no io", H2_PROXYS_ST_IDLE);
1193                 }
1194             }
1195             else {
1196                 /* Unable to do blocking reads, as we wait on events from
1197                  * task processing in other threads. Do a busy wait with
1198                  * backoff timer. */
1199                 transit(session, "no io", H2_PROXYS_ST_WAIT);
1200             }
1201             break;
1202         default:
1203             /* nop */
1204             break;
1205     }
1206 }
1207
1208 static void ev_stream_submitted(h2_proxy_session *session, int stream_id, 
1209                                 const char *msg)
1210 {
1211     switch (session->state) {
1212         case H2_PROXYS_ST_IDLE:
1213         case H2_PROXYS_ST_WAIT:
1214             transit(session, "stream submitted", H2_PROXYS_ST_BUSY);
1215             break;
1216         default:
1217             /* nop */
1218             break;
1219     }
1220 }
1221
1222 static void ev_stream_done(h2_proxy_session *session, int stream_id, 
1223                            const char *msg)
1224 {
1225     h2_proxy_stream *stream;
1226     
1227     stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
1228     if (stream) {
1229         int touched = (stream->data_sent || 
1230                        stream_id <= session->last_stream_id);
1231         apr_status_t status = (stream->error_code == 0)? APR_SUCCESS : APR_EINVAL;
1232         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03364)
1233                       "h2_proxy_sesssion(%s): stream(%d) closed "
1234                       "(touched=%d, error=%d)", 
1235                       session->id, stream_id, touched, stream->error_code);
1236         
1237         if (status != APR_SUCCESS) {
1238             stream->r->status = 500;
1239         }
1240         else if (!stream->data_received) {
1241             apr_bucket *b;
1242             /* if the response had no body, this is the time to flush
1243              * an empty brigade which will also write the resonse
1244              * headers */
1245             h2_proxy_stream_end_headers_out(stream);
1246             stream->data_received = 1;
1247             b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
1248             APR_BRIGADE_INSERT_TAIL(stream->output, b);
1249             b = apr_bucket_eos_create(stream->r->connection->bucket_alloc);
1250             APR_BRIGADE_INSERT_TAIL(stream->output, b);
1251             ap_pass_brigade(stream->r->output_filters, stream->output);
1252         }
1253         
1254         stream->state = H2_STREAM_ST_CLOSED;
1255         h2_proxy_ihash_remove(session->streams, stream_id);
1256         h2_proxy_iq_remove(session->suspended, stream_id);
1257         if (session->done) {
1258             session->done(session, stream->r, status, touched);
1259         }
1260     }
1261     
1262     switch (session->state) {
1263         default:
1264             /* nop */
1265             break;
1266     }
1267 }
1268
1269 static void ev_stream_resumed(h2_proxy_session *session, int arg, const char *msg)
1270 {
1271     switch (session->state) {
1272         case H2_PROXYS_ST_WAIT:
1273             transit(session, "stream resumed", H2_PROXYS_ST_BUSY);
1274             break;
1275         default:
1276             /* nop */
1277             break;
1278     }
1279 }
1280
1281 static void ev_data_read(h2_proxy_session *session, int arg, const char *msg)
1282 {
1283     switch (session->state) {
1284         case H2_PROXYS_ST_IDLE:
1285         case H2_PROXYS_ST_WAIT:
1286             transit(session, "data read", H2_PROXYS_ST_BUSY);
1287             break;
1288         default:
1289             /* nop */
1290             break;
1291     }
1292 }
1293
1294 static void ev_ngh2_done(h2_proxy_session *session, int arg, const char *msg)
1295 {
1296     switch (session->state) {
1297         case H2_PROXYS_ST_DONE:
1298             /* nop */
1299             break;
1300         default:
1301             transit(session, "nghttp2 done", H2_PROXYS_ST_DONE);
1302             break;
1303     }
1304 }
1305
1306 static void ev_pre_close(h2_proxy_session *session, int arg, const char *msg)
1307 {
1308     switch (session->state) {
1309         case H2_PROXYS_ST_DONE:
1310         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1311             /* nop */
1312             break;
1313         default:
1314             session_shutdown(session, arg, msg);
1315             break;
1316     }
1317 }
1318
1319 static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, 
1320                            int arg, const char *msg)
1321 {
1322     switch (ev) {
1323         case H2_PROXYS_EV_INIT:
1324             ev_init(session, arg, msg);
1325             break;            
1326         case H2_PROXYS_EV_LOCAL_GOAWAY:
1327             ev_local_goaway(session, arg, msg);
1328             break;
1329         case H2_PROXYS_EV_REMOTE_GOAWAY:
1330             ev_remote_goaway(session, arg, msg);
1331             break;
1332         case H2_PROXYS_EV_CONN_ERROR:
1333             ev_conn_error(session, arg, msg);
1334             break;
1335         case H2_PROXYS_EV_PROTO_ERROR:
1336             ev_proto_error(session, arg, msg);
1337             break;
1338         case H2_PROXYS_EV_CONN_TIMEOUT:
1339             ev_conn_timeout(session, arg, msg);
1340             break;
1341         case H2_PROXYS_EV_NO_IO:
1342             ev_no_io(session, arg, msg);
1343             break;
1344         case H2_PROXYS_EV_STREAM_SUBMITTED:
1345             ev_stream_submitted(session, arg, msg);
1346             break;
1347         case H2_PROXYS_EV_STREAM_DONE:
1348             ev_stream_done(session, arg, msg);
1349             break;
1350         case H2_PROXYS_EV_STREAM_RESUMED:
1351             ev_stream_resumed(session, arg, msg);
1352             break;
1353         case H2_PROXYS_EV_DATA_READ:
1354             ev_data_read(session, arg, msg);
1355             break;
1356         case H2_PROXYS_EV_NGH2_DONE:
1357             ev_ngh2_done(session, arg, msg);
1358             break;
1359         case H2_PROXYS_EV_PRE_CLOSE:
1360             ev_pre_close(session, arg, msg);
1361             break;
1362         default:
1363             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
1364                           "h2_proxy_session(%s): unknown event %d", 
1365                           session->id, ev);
1366             break;
1367     }
1368 }
1369
1370 static int send_loop(h2_proxy_session *session)
1371 {
1372     while (nghttp2_session_want_write(session->ngh2)) {
1373         int rv = nghttp2_session_send(session->ngh2);
1374         if (rv < 0 && nghttp2_is_fatal(rv)) {
1375             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
1376                           "h2_proxy_session(%s): write, rv=%d", session->id, rv);
1377             dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL);
1378             break;
1379         }
1380         return 1;
1381     }
1382     return 0;
1383 }
1384
1385 apr_status_t h2_proxy_session_process(h2_proxy_session *session)
1386 {
1387     apr_status_t status;
1388     int have_written = 0, have_read = 0;
1389
1390     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
1391                   "h2_proxy_session(%s): process", session->id);
1392            
1393 run_loop:
1394     switch (session->state) {
1395         case H2_PROXYS_ST_INIT:
1396             status = session_start(session);
1397             if (status == APR_SUCCESS) {
1398                 dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
1399                 goto run_loop;
1400             }
1401             else {
1402                 dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
1403             }
1404             break;
1405             
1406         case H2_PROXYS_ST_BUSY:
1407         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1408         case H2_PROXYS_ST_REMOTE_SHUTDOWN:
1409             have_written = send_loop(session);
1410             
1411             if (nghttp2_session_want_read(session->ngh2)) {
1412                 status = h2_proxy_session_read(session, 0, 0);
1413                 if (status == APR_SUCCESS) {
1414                     have_read = 1;
1415                 }
1416             }
1417             
1418             if (!have_written && !have_read 
1419                 && !nghttp2_session_want_write(session->ngh2)) {
1420                 dispatch_event(session, H2_PROXYS_EV_NO_IO, 0, NULL);
1421                 goto run_loop;
1422             }
1423             break;
1424             
1425         case H2_PROXYS_ST_WAIT:
1426             if (check_suspended(session) == APR_EAGAIN) {
1427                 /* no stream has become resumed. Do a blocking read with
1428                  * ever increasing timeouts... */
1429                 if (session->wait_timeout < 25) {
1430                     session->wait_timeout = 25;
1431                 }
1432                 else {
1433                     session->wait_timeout = H2MIN(apr_time_from_msec(100), 
1434                                                   2*session->wait_timeout);
1435                 }
1436                 
1437                 status = h2_proxy_session_read(session, 1, session->wait_timeout);
1438                 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c, 
1439                               APLOGNO(03365)
1440                               "h2_proxy_session(%s): WAIT read, timeout=%fms", 
1441                               session->id, (float)session->wait_timeout/1000.0);
1442                 if (status == APR_SUCCESS) {
1443                     have_read = 1;
1444                     dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
1445                 }
1446                 else if (APR_STATUS_IS_TIMEUP(status)
1447                     || APR_STATUS_IS_EAGAIN(status)) {
1448                     /* go back to checking all inputs again */
1449                     transit(session, "wait cycle", H2_PROXYS_ST_BUSY);
1450                 }
1451             }
1452             break;
1453             
1454         case H2_PROXYS_ST_IDLE:
1455             break;
1456
1457         case H2_PROXYS_ST_DONE: /* done, session terminated */
1458             return APR_EOF;
1459             
1460         default:
1461             ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
1462                           APLOGNO(03346)"h2_proxy_session(%s): unknown state %d", 
1463                           session->id, session->state);
1464             dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL);
1465             break;
1466     }
1467
1468
1469     if (have_read || have_written) {
1470         session->wait_timeout = 0;
1471     }
1472     
1473     if (!nghttp2_session_want_read(session->ngh2)
1474         && !nghttp2_session_want_write(session->ngh2)) {
1475         dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);
1476     }
1477     
1478     return APR_SUCCESS; /* needs to be called again */
1479 }
1480
1481 typedef struct {
1482     h2_proxy_session *session;
1483     h2_proxy_request_done *done;
1484 } cleanup_iter_ctx;
1485
1486 static int cancel_iter(void *udata, void *val)
1487 {
1488     cleanup_iter_ctx *ctx = udata;
1489     h2_proxy_stream *stream = val;
1490     nghttp2_submit_rst_stream(ctx->session->ngh2, NGHTTP2_FLAG_NONE,
1491                               stream->id, 0);
1492     return 1;
1493 }
1494
1495 void h2_proxy_session_cancel_all(h2_proxy_session *session)
1496 {
1497     if (!h2_proxy_ihash_empty(session->streams)) {
1498         cleanup_iter_ctx ctx;
1499         ctx.session = session;
1500         ctx.done = session->done;
1501         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03366)
1502                       "h2_proxy_session(%s): cancel  %d streams",
1503                       session->id, (int)h2_proxy_ihash_count(session->streams));
1504         h2_proxy_ihash_iter(session->streams, cancel_iter, &ctx);
1505         session_shutdown(session, 0, NULL);
1506     }
1507 }
1508
1509 static int done_iter(void *udata, void *val)
1510 {
1511     cleanup_iter_ctx *ctx = udata;
1512     h2_proxy_stream *stream = val;
1513     int touched = (stream->data_sent || 
1514                    stream->id <= ctx->session->last_stream_id);
1515     ctx->done(ctx->session, stream->r, APR_ECONNABORTED, touched);
1516     return 1;
1517 }
1518
1519 void h2_proxy_session_cleanup(h2_proxy_session *session, 
1520                               h2_proxy_request_done *done)
1521 {
1522     if (!h2_proxy_ihash_empty(session->streams)) {
1523         cleanup_iter_ctx ctx;
1524         ctx.session = session;
1525         ctx.done = done;
1526         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03519)
1527                       "h2_proxy_session(%s): terminated, %d streams unfinished",
1528                       session->id, (int)h2_proxy_ihash_count(session->streams));
1529         h2_proxy_ihash_iter(session->streams, done_iter, &ctx);
1530         h2_proxy_ihash_clear(session->streams);
1531     }
1532 }
1533
1534 static int ping_arrived_iter(void *udata, void *val)
1535 {
1536     h2_proxy_stream *stream = val;
1537     if (stream->waiting_on_ping) {
1538         stream->waiting_on_ping = 0;
1539         stream_resume(stream);
1540     }
1541     return 1;
1542 }
1543
1544 static void ping_arrived(h2_proxy_session *session)
1545 {
1546     if (!h2_proxy_ihash_empty(session->streams)) {
1547         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03470)
1548                       "h2_proxy_session(%s): ping arrived, unblocking streams",
1549                       session->id);
1550         h2_proxy_ihash_iter(session->streams, ping_arrived_iter, &session);
1551     }
1552 }
1553
1554 typedef struct {
1555     h2_proxy_session *session;
1556     conn_rec *c;
1557     apr_off_t bytes;
1558     int updated;
1559 } win_update_ctx;
1560
1561 static int win_update_iter(void *udata, void *val)
1562 {
1563     win_update_ctx *ctx = udata;
1564     h2_proxy_stream *stream = val;
1565     
1566     if (stream->r && stream->r->connection == ctx->c) {
1567         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, ctx->session->c, 
1568                       "h2_proxy_session(%s-%d): win_update %ld bytes",
1569                       ctx->session->id, (int)stream->id, (long)ctx->bytes);
1570         nghttp2_session_consume(ctx->session->ngh2, stream->id, ctx->bytes);
1571         ctx->updated = 1;
1572         return 0;
1573     }
1574     return 1;
1575 }
1576
1577
1578 void h2_proxy_session_update_window(h2_proxy_session *session, 
1579                                     conn_rec *c, apr_off_t bytes)
1580 {
1581     if (!h2_proxy_ihash_empty(session->streams)) {
1582         win_update_ctx ctx;
1583         ctx.session = session;
1584         ctx.c = c;
1585         ctx.bytes = bytes;
1586         ctx.updated = 0;
1587         h2_proxy_ihash_iter(session->streams, win_update_iter, &ctx);
1588         
1589         if (!ctx.updated) {
1590             /* could not find the stream any more, possibly closed, update
1591              * the connection window at least */
1592             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
1593                           "h2_proxy_session(%s): win_update conn %ld bytes",
1594                           session->id, (long)bytes);
1595             nghttp2_session_consume_connection(session->ngh2, (size_t)bytes);
1596         }
1597     }
1598 }
1599