]> granicus.if.org Git - apache/blob
1732102
[apache] /
1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stddef.h>
17 #include <apr_strings.h>
18 #include <nghttp2/nghttp2.h>
19
20 #include <mpm_common.h>
21 #include <httpd.h>
22 #include <mod_proxy.h>
23
24 #include "mod_http2.h"
25 #include "h2.h"
26 #include "h2_util.h"
27 #include "h2_proxy_session.h"
28
29 APLOG_USE_MODULE(proxy_http2);
30
31 typedef struct h2_proxy_stream {
32     int id;
33     apr_pool_t *pool;
34     h2_proxy_session *session;
35
36     const char *url;
37     request_rec *r;
38     h2_request *req;
39
40     h2_stream_state_t state;
41     unsigned int suspended : 1;
42     unsigned int data_received : 1;
43
44     apr_bucket_brigade *input;
45     apr_bucket_brigade *output;
46     
47     apr_table_t *saves;
48 } h2_proxy_stream;
49
50
51 static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, 
52                            int arg, const char *msg);
53
54
55 static apr_status_t proxy_session_pre_close(void *theconn)
56 {
57     proxy_conn_rec *p_conn = (proxy_conn_rec *)theconn;
58     h2_proxy_session *session = p_conn->data;
59
60     if (session && session->ngh2) {
61         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
62                       "proxy_session(%s): pool cleanup, state=%d, streams=%d",
63                       session->id, session->state, 
64                       (int)h2_ihash_count(session->streams));
65         session->aborted = 1;
66         dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
67         nghttp2_session_del(session->ngh2);
68         session->ngh2 = NULL;
69         p_conn->data = NULL;
70     }
71     return APR_SUCCESS;
72 }
73
74 static int proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc,
75                               proxy_conn_rec *p_conn,
76                               conn_rec *origin, apr_bucket_brigade *bb,
77                               int flush)
78 {
79     apr_status_t status;
80     apr_off_t transferred;
81
82     if (flush) {
83         apr_bucket *e = apr_bucket_flush_create(bucket_alloc);
84         APR_BRIGADE_INSERT_TAIL(bb, e);
85     }
86     apr_brigade_length(bb, 0, &transferred);
87     if (transferred != -1)
88         p_conn->worker->s->transferred += transferred;
89     status = ap_pass_brigade(origin->output_filters, bb);
90     /* Cleanup the brigade now to avoid buckets lifetime
91      * issues in case of error returned below. */
92     apr_brigade_cleanup(bb);
93     if (status != APR_SUCCESS) {
94         ap_log_cerror(APLOG_MARK, APLOG_ERR, status, origin, APLOGNO(03357)
95                       "pass output failed to %pI (%s)",
96                       p_conn->addr, p_conn->hostname);
97     }
98     return status;
99 }
100
101 static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
102                         size_t length, int flags, void *user_data)
103 {
104     h2_proxy_session *session = user_data;
105     apr_bucket *b;
106     apr_status_t status;
107     int flush = 1;
108
109     if (data) {
110         b = apr_bucket_transient_create((const char*)data, length, 
111                                         session->c->bucket_alloc);
112         APR_BRIGADE_INSERT_TAIL(session->output, b);
113     }
114
115     status = proxy_pass_brigade(session->c->bucket_alloc,  
116                                 session->p_conn, session->c, 
117                                 session->output, flush);
118     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, 
119                   "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d", 
120                   session->id, (int)length, flush);
121     if (status != APR_SUCCESS) {
122         return NGHTTP2_ERR_CALLBACK_FAILURE;
123     }
124     return length;
125 }
126
127 static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
128                          void *user_data) 
129 {
130     h2_proxy_session *session = user_data;
131     int n;
132     
133     if (APLOGcdebug(session->c)) {
134         char buffer[256];
135         
136         h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
137         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03341)
138                       "h2_proxy_session(%s): recv FRAME[%s]",
139                       session->id, buffer);
140     }
141
142     switch (frame->hd.type) {
143         case NGHTTP2_HEADERS:
144             break;
145         case NGHTTP2_PUSH_PROMISE:
146             break;
147         case NGHTTP2_SETTINGS:
148             if (frame->settings.niv > 0) {
149                 n = nghttp2_session_get_remote_settings(ngh2, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
150                 if (n > 0) {
151                     session->remote_max_concurrent = n;
152                 }
153             }
154             break;
155         case NGHTTP2_GOAWAY:
156             /* we expect the remote server to tell us the highest stream id
157              * that it has started processing. */
158             session->last_stream_id = frame->goaway.last_stream_id;
159             dispatch_event(session, H2_PROXYS_EV_REMOTE_GOAWAY, 0, NULL);
160             if (APLOGcinfo(session->c)) {
161                 char buffer[256];
162                 
163                 h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
164                 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03342)
165                               "h2_proxy_session(%s): recv FRAME[%s]",
166                               session->id, buffer);
167             }
168             break;
169         default:
170             break;
171     }
172     return 0;
173 }
174
175 static int before_frame_send(nghttp2_session *ngh2,
176                              const nghttp2_frame *frame, void *user_data)
177 {
178     h2_proxy_session *session = user_data;
179     if (APLOGcdebug(session->c)) {
180         char buffer[256];
181
182         h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
183         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03343)
184                       "h2_proxy_session(%s): sent FRAME[%s]",
185                       session->id, buffer);
186     }
187     return 0;
188 }
189
190 static int add_header(void *table, const char *n, const char *v)
191 {
192     apr_table_addn(table, n, v);
193     return 1;
194 }
195
196 static void process_proxy_header(request_rec *r, const char *n, const char *v)
197 {
198     static const struct {
199         const char *name;
200         ap_proxy_header_reverse_map_fn func;
201     } transform_hdrs[] = {
202         { "Location", ap_proxy_location_reverse_map },
203         { "Content-Location", ap_proxy_location_reverse_map },
204         { "URI", ap_proxy_location_reverse_map },
205         { "Destination", ap_proxy_location_reverse_map },
206         { "Set-Cookie", ap_proxy_cookie_reverse_map },
207         { NULL, NULL }
208     };
209     proxy_dir_conf *dconf;
210     int i;
211     
212     for (i = 0; transform_hdrs[i].name; ++i) {
213         if (!ap_casecmpstr(transform_hdrs[i].name, n)) {
214             dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
215             apr_table_add(r->headers_out, n,
216                           (*transform_hdrs[i].func)(r, dconf, v));
217             return;
218        }
219     }
220     apr_table_add(r->headers_out, n, v);
221 }
222
223 static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
224                                                    const char *n, apr_size_t nlen,
225                                                    const char *v, apr_size_t vlen)
226 {
227     if (n[0] == ':') {
228         if (!stream->data_received && !strncmp(":status", n, nlen)) {
229             char *s = apr_pstrndup(stream->r->pool, v, vlen);
230             
231             apr_table_setn(stream->r->notes, "proxy-status", s);
232             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
233                           "h2_proxy_stream(%s-%d): got status %s", 
234                           stream->session->id, stream->id, s);
235             stream->r->status = (int)apr_atoi64(s);
236             if (stream->r->status <= 0) {
237                 stream->r->status = 500;
238                 return APR_EGENERAL;
239             }
240         }
241         return APR_SUCCESS;
242     }
243     
244     if (!h2_proxy_res_ignore_header(n, nlen)) {
245         char *hname, *hvalue;
246     
247         hname = apr_pstrndup(stream->pool, n, nlen);
248         h2_util_camel_case_header(hname, nlen);
249         hvalue = apr_pstrndup(stream->pool, v, vlen);
250         
251         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
252                       "h2_proxy_stream(%s-%d): got header %s: %s", 
253                       stream->session->id, stream->id, hname, hvalue);
254         process_proxy_header(stream->r, hname, hvalue);
255     }
256     return APR_SUCCESS;
257 }
258
259 static int log_header(void *ctx, const char *key, const char *value)
260 {
261     h2_proxy_stream *stream = ctx;
262     ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, 
263                   "h2_proxy_stream(%s-%d), header_out %s: %s", 
264                   stream->session->id, stream->id, key, value);
265     return 1;
266 }
267
268 static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream) 
269 {
270     h2_proxy_session *session = stream->session;
271     request_rec *r = stream->r;
272     apr_pool_t *p = r->pool;
273     
274     /* Now, add in the cookies from the response to the ones already saved */
275     apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL);
276     
277     /* and now load 'em all in */
278     if (!apr_is_empty_table(stream->saves)) {
279         apr_table_unset(r->headers_out, "Set-Cookie");
280         r->headers_out = apr_table_overlay(p, r->headers_out, stream->saves);
281     }
282     
283     /* handle Via header in response */
284     if (session->conf->viaopt != via_off 
285         && session->conf->viaopt != via_block) {
286         const char *server_name = ap_get_server_name(stream->r);
287         apr_port_t port = ap_get_server_port(stream->r);
288         char portstr[32];
289         
290         /* If USE_CANONICAL_NAME_OFF was configured for the proxy virtual host,
291          * then the server name returned by ap_get_server_name() is the
292          * origin server name (which does make too much sense with Via: headers)
293          * so we use the proxy vhost's name instead.
294          */
295         if (server_name == stream->r->hostname) {
296             server_name = stream->r->server->server_hostname;
297         }
298         if (ap_is_default_port(port, stream->r)) {
299             portstr[0] = '\0';
300         }
301         else {
302             apr_snprintf(portstr, sizeof(portstr), ":%d", port);
303         }
304
305         /* create a "Via:" response header entry and merge it */
306         apr_table_addn(r->headers_out, "Via",
307                        (session->conf->viaopt == via_full)
308                        ? apr_psprintf(p, "%d.%d %s%s (%s)",
309                                       HTTP_VERSION_MAJOR(r->proto_num),
310                                       HTTP_VERSION_MINOR(r->proto_num),
311                                       server_name, portstr,
312                                       AP_SERVER_BASEVERSION)
313                        : apr_psprintf(p, "%d.%d %s%s",
314                                       HTTP_VERSION_MAJOR(r->proto_num),
315                                       HTTP_VERSION_MINOR(r->proto_num),
316                                       server_name, portstr)
317                        );
318     }
319     
320     if (APLOGrtrace2(stream->r)) {
321         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, 
322                       "h2_proxy_stream(%s-%d), header_out after merging", 
323                       stream->session->id, stream->id);
324         apr_table_do(log_header, stream, stream->r->headers_out, NULL);
325     }
326 }
327
328 static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
329                               int32_t stream_id, const uint8_t *data,
330                               size_t len, void *user_data) 
331 {
332     h2_proxy_session *session = user_data;
333     h2_proxy_stream *stream;
334     apr_bucket *b;
335     apr_status_t status;
336     
337     stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
338     if (!stream) {
339         ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03358)
340                      "h2_proxy_session(%s): recv data chunk for "
341                      "unknown stream %d, ignored", 
342                      session->id, stream_id);
343         return 0;
344     }
345     
346     if (!stream->data_received) {
347         /* last chance to manipulate response headers.
348          * after this, only trailers */
349         h2_proxy_stream_end_headers_out(stream);
350         stream->data_received = 1;
351     }
352     
353     b = apr_bucket_transient_create((const char*)data, len, 
354                                     stream->r->connection->bucket_alloc);
355     APR_BRIGADE_INSERT_TAIL(stream->output, b);
356     /* always flush after a DATA frame, as we have no other indication
357      * of buffer use */
358     b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
359     APR_BRIGADE_INSERT_TAIL(stream->output, b);
360     
361     ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, APLOGNO(03359)
362                   "h2_proxy_session(%s): pass response data for "
363                   "stream %d, %d bytes", session->id, stream_id, (int)len);
364     status = ap_pass_brigade(stream->r->output_filters, stream->output);
365     if (status != APR_SUCCESS) {
366         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03344)
367                       "h2_proxy_session(%s): passing output on stream %d", 
368                       session->id, stream->id);
369         nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
370                                   stream_id, NGHTTP2_STREAM_CLOSED);
371         return NGHTTP2_ERR_STREAM_CLOSING;
372     }
373     return 0;
374 }
375
376 static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
377                            uint32_t error_code, void *user_data) 
378 {
379     h2_proxy_session *session = user_data;
380     if (!session->aborted) {
381         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03360)
382                       "h2_proxy_session(%s): stream=%d, closed, err=%d", 
383                       session->id, stream_id, error_code);
384         dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
385     }
386     return 0;
387 }
388
389 static int on_header(nghttp2_session *ngh2, const nghttp2_frame *frame,
390                      const uint8_t *namearg, size_t nlen,
391                      const uint8_t *valuearg, size_t vlen, uint8_t flags,
392                      void *user_data) 
393 {
394     h2_proxy_session *session = user_data;
395     h2_proxy_stream *stream;
396     const char *n = (const char*)namearg;
397     const char *v = (const char*)valuearg;
398     
399     (void)session;
400     if (frame->hd.type == NGHTTP2_HEADERS && nlen) {
401         stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id);
402         if (stream) {
403             if (h2_proxy_stream_add_header_out(stream, n, nlen, v, vlen)) {
404                 return NGHTTP2_ERR_CALLBACK_FAILURE;
405             }
406         }
407     }
408     else if (frame->hd.type == NGHTTP2_PUSH_PROMISE) {
409     }
410     
411     return 0;
412 }
413
414 static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id, 
415                                 uint8_t *buf, size_t length,
416                                 uint32_t *data_flags, 
417                                 nghttp2_data_source *source, void *user_data)
418 {
419     h2_proxy_stream *stream;
420     apr_status_t status = APR_SUCCESS;
421     
422     *data_flags = 0;
423     stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
424     if (!stream) {
425         ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03361)
426                      "h2_proxy_stream(%s): data_read, stream %d not found", 
427                      stream->session->id, stream_id);
428         return NGHTTP2_ERR_CALLBACK_FAILURE;
429     }
430     
431     if (APR_BRIGADE_EMPTY(stream->input)) {
432         status = ap_get_brigade(stream->r->input_filters, stream->input,
433                                 AP_MODE_READBYTES, APR_NONBLOCK_READ,
434                                 H2MAX(APR_BUCKET_BUFF_SIZE, length));
435         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, 
436                       "h2_proxy_stream(%s-%d): request body read", 
437                       stream->session->id, stream->id);
438     }
439
440     if (status == APR_SUCCESS) {
441         ssize_t readlen = 0;
442         while (status == APR_SUCCESS 
443                && (readlen < length)
444                && !APR_BRIGADE_EMPTY(stream->input)) {
445             apr_bucket* b = APR_BRIGADE_FIRST(stream->input);
446             if (APR_BUCKET_IS_METADATA(b)) {
447                 if (APR_BUCKET_IS_EOS(b)) {
448                     *data_flags |= NGHTTP2_DATA_FLAG_EOF;
449                 }
450                 else {
451                     /* we do nothing more regarding any meta here */
452                 }
453             }
454             else {
455                 const char *bdata = NULL;
456                 apr_size_t blen = 0;
457                 status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
458                 
459                 if (status == APR_SUCCESS && blen > 0) {
460                     ssize_t copylen = H2MIN(length - readlen, blen);
461                     memcpy(buf, bdata, copylen);
462                     buf += copylen;
463                     readlen += copylen;
464                     if (copylen < blen) {
465                         /* We have data left in the bucket. Split it. */
466                         status = apr_bucket_split(b, copylen);
467                     }
468                 }
469             }
470             apr_bucket_delete(b);
471         }
472
473         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, 
474                       "h2_proxy_stream(%d): request body read %ld bytes, flags=%d", 
475                       stream->id, (long)readlen, (int)*data_flags);
476         return readlen;
477     }
478     else if (APR_STATUS_IS_EAGAIN(status)) {
479         /* suspended stream, needs to be re-awakened */
480         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, 
481                       "h2_proxy_stream(%s-%d): suspending", 
482                       stream->session->id, stream_id);
483         stream->suspended = 1;
484         h2_iq_add(stream->session->suspended, stream->id, NULL, NULL);
485         return NGHTTP2_ERR_DEFERRED;
486     }
487     else {
488         nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, 
489                                   stream_id, NGHTTP2_STREAM_CLOSED);
490         return NGHTTP2_ERR_STREAM_CLOSING;
491     }
492 }
493
494 h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
495                                          proxy_server_conf *conf,
496                                          unsigned char window_bits_connection,
497                                          unsigned char window_bits_stream,
498                                          h2_proxy_request_done *done)
499 {
500     if (!p_conn->data) {
501         apr_pool_t *pool = p_conn->scpool;
502         h2_proxy_session *session;
503         nghttp2_session_callbacks *cbs;
504         nghttp2_option *option;
505
506         session = apr_pcalloc(pool, sizeof(*session));
507         apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
508         p_conn->data = session;
509         
510         session->id = apr_pstrdup(p_conn->scpool, id);
511         session->c = p_conn->connection;
512         session->p_conn = p_conn;
513         session->conf = conf;
514         session->pool = p_conn->scpool;
515         session->state = H2_PROXYS_ST_INIT;
516         session->window_bits_stream = window_bits_stream;
517         session->window_bits_connection = window_bits_connection;
518         session->streams = h2_ihash_create(pool, offsetof(h2_proxy_stream, id));
519         session->suspended = h2_iq_create(pool, 5);
520         session->done = done;
521     
522         session->input = apr_brigade_create(session->pool, session->c->bucket_alloc);
523         session->output = apr_brigade_create(session->pool, session->c->bucket_alloc);
524     
525         nghttp2_session_callbacks_new(&cbs);
526         nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, on_frame_recv);
527         nghttp2_session_callbacks_set_on_data_chunk_recv_callback(cbs, on_data_chunk_recv);
528         nghttp2_session_callbacks_set_on_stream_close_callback(cbs, on_stream_close);
529         nghttp2_session_callbacks_set_on_header_callback(cbs, on_header);
530         nghttp2_session_callbacks_set_before_frame_send_callback(cbs, before_frame_send);
531         nghttp2_session_callbacks_set_send_callback(cbs, raw_send);
532         
533         nghttp2_option_new(&option);
534         nghttp2_option_set_peer_max_concurrent_streams(option, 100);
535         nghttp2_option_set_no_auto_window_update(option, 1);
536         
537         nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
538         
539         nghttp2_option_del(option);
540         nghttp2_session_callbacks_del(cbs);
541
542         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03362)
543                       "setup session for %s", p_conn->hostname);
544     }
545     return p_conn->data;
546 }
547
548 static apr_status_t session_start(h2_proxy_session *session) 
549 {
550     nghttp2_settings_entry settings[2];
551     int rv, add_conn_window;
552     apr_socket_t *s;
553     
554     s = ap_get_conn_socket(session->c);
555 #if (!defined(WIN32) && !defined(NETWARE)) || defined(DOXYGEN)
556     if (s) {
557         ap_sock_disable_nagle(s);
558     }
559 #endif
560     
561     settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
562     settings[0].value = 0;
563     settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
564     settings[1].value = (1 << session->window_bits_stream) - 1;
565     
566     rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings, 
567                                  H2_ALEN(settings));
568     
569     /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */
570     add_conn_window = ((1 << session->window_bits_connection) - 1 -
571                        NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE);
572     if (!rv && add_conn_window != 0) {
573         rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window);
574     }
575     return rv? APR_EGENERAL : APR_SUCCESS;
576 }
577
578 static apr_status_t open_stream(h2_proxy_session *session, const char *url,
579                                 request_rec *r, h2_proxy_stream **pstream)
580 {
581     h2_proxy_stream *stream;
582     apr_uri_t puri;
583     const char *authority, *scheme, *path;
584     apr_status_t status;
585
586     stream = apr_pcalloc(r->pool, sizeof(*stream));
587
588     stream->pool = r->pool;
589     stream->url = url;
590     stream->r = r;
591     stream->session = session;
592     stream->state = H2_STREAM_ST_IDLE;
593     
594     stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc);
595     stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc);
596     
597     stream->req = h2_req_create(1, stream->pool, 0);
598
599     status = apr_uri_parse(stream->pool, url, &puri);
600     if (status != APR_SUCCESS)
601         return status;
602
603     scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
604     authority = puri.hostname;
605     if (!ap_strchr_c(authority, ':') && puri.port
606         && apr_uri_port_of_scheme(scheme) != puri.port) {
607         /* port info missing and port is not default for scheme: append */
608         authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
609     }
610     path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);
611     h2_req_make(stream->req, stream->pool, r->method, scheme,
612                 authority, path, r->headers_in);
613
614     /* Tuck away all already existing cookies */
615     stream->saves = apr_table_make(r->pool, 2);
616     apr_table_do(add_header, stream->saves, r->headers_out,"Set-Cookie", NULL);
617
618     *pstream = stream;
619     
620     return APR_SUCCESS;
621 }
622
623 static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *stream)
624 {
625     h2_ngheader *hd;
626     nghttp2_data_provider *pp = NULL;
627     nghttp2_data_provider provider;
628     int rv;
629     apr_status_t status;
630
631     hd = h2_util_ngheader_make_req(stream->pool, stream->req);
632     
633     status = ap_get_brigade(stream->r->input_filters, stream->input,
634                             AP_MODE_READBYTES, APR_NONBLOCK_READ,
635                             APR_BUCKET_BUFF_SIZE);
636     if ((status == APR_SUCCESS && !APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(stream->input)))
637         || APR_STATUS_IS_EAGAIN(status)) {
638         /* there might be data coming */
639         provider.source.fd = 0;
640         provider.source.ptr = NULL;
641         provider.read_callback = stream_data_read;
642         pp = &provider;
643     }
644
645     rv = nghttp2_submit_request(session->ngh2, NULL, 
646                                 hd->nv, hd->nvlen, pp, stream);
647                                 
648     if (APLOGcdebug(session->c)) {
649         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03363)
650                       "h2_proxy_session(%s): submit %s%s -> %d", 
651                       session->id, stream->req->authority, stream->req->path,
652                       rv);
653     }
654     
655     if (rv > 0) {
656         stream->id = rv;
657         stream->state = H2_STREAM_ST_OPEN;
658         h2_ihash_add(session->streams, stream);
659         dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL);
660         
661         return APR_SUCCESS;
662     }
663     return APR_EGENERAL;
664 }
665
666 static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
667 {
668     apr_status_t status = APR_SUCCESS;
669     apr_size_t readlen = 0;
670     ssize_t n;
671     
672     while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
673         apr_bucket* b = APR_BRIGADE_FIRST(bb);
674         
675         if (APR_BUCKET_IS_METADATA(b)) {
676             /* nop */
677         }
678         else {
679             const char *bdata = NULL;
680             apr_size_t blen = 0;
681             
682             status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
683             if (status == APR_SUCCESS && blen > 0) {
684                 n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen);
685                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
686                               "h2_proxy_session(%s): feeding %ld bytes -> %ld", 
687                               session->id, (long)blen, (long)n);
688                 if (n < 0) {
689                     if (nghttp2_is_fatal((int)n)) {
690                         status = APR_EGENERAL;
691                     }
692                 }
693                 else {
694                     readlen += n;
695                     if (n < blen) {
696                         apr_bucket_split(b, n);
697                     }
698                 }
699             }
700         }
701         apr_bucket_delete(b);
702     }
703     
704     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, 
705                   "h2_proxy_session(%s): fed %ld bytes of input to session", 
706                   session->id, (long)readlen);
707     if (readlen == 0 && status == APR_SUCCESS) {
708         return APR_EAGAIN;
709     }
710     return status;
711 }
712
713 static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block, 
714                                           apr_interval_time_t timeout)
715 {
716     apr_status_t status = APR_SUCCESS;
717     
718     if (APR_BRIGADE_EMPTY(session->input)) {
719         apr_socket_t *socket = NULL;
720         apr_time_t save_timeout = -1;
721         
722         if (block) {
723             socket = ap_get_conn_socket(session->c);
724             if (socket) {
725                 apr_socket_timeout_get(socket, &save_timeout);
726                 apr_socket_timeout_set(socket, timeout);
727             }
728             else {
729                 /* cannot block on timeout */
730                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, APLOGNO(03379)
731                               "h2_proxy_session(%s): unable to get conn socket", 
732                               session->id);
733                 return APR_ENOTIMPL;
734             }
735         }
736         
737         status = ap_get_brigade(session->c->input_filters, session->input, 
738                                 AP_MODE_READBYTES, 
739                                 block? APR_BLOCK_READ : APR_NONBLOCK_READ, 
740                                 64 * 1024);
741         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
742                       "h2_proxy_session(%s): read from conn", session->id);
743         if (socket && save_timeout != -1) {
744             apr_socket_timeout_set(socket, save_timeout);
745         }
746     }
747     
748     if (status == APR_SUCCESS) {
749         status = feed_brigade(session, session->input);
750     }
751     else if (APR_STATUS_IS_TIMEUP(status)) {
752         /* nop */
753     }
754     else if (!APR_STATUS_IS_EAGAIN(status)) {
755         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03380)
756                       "h2_proxy_session(%s): read error", session->id);
757         dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
758     }
759
760     return status;
761 }
762
763 apr_status_t h2_proxy_session_submit(h2_proxy_session *session, 
764                                      const char *url, request_rec *r)
765 {
766     h2_proxy_stream *stream;
767     apr_status_t status;
768     
769     status = open_stream(session, url, r, &stream);
770     if (status == APR_SUCCESS) {
771         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(03381)
772                       "process stream(%d): %s %s%s, original: %s", 
773                       stream->id, stream->req->method, 
774                       stream->req->authority, stream->req->path, 
775                       r->the_request);
776         status = submit_stream(session, stream);
777     }
778     return status;
779 }
780
781 static apr_status_t check_suspended(h2_proxy_session *session)
782 {
783     h2_proxy_stream *stream;
784     int i, stream_id;
785     apr_status_t status;
786     
787     for (i = 0; i < session->suspended->nelts; ++i) {
788         stream_id = session->suspended->elts[i];
789         stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
790         if (stream) {
791             status = ap_get_brigade(stream->r->input_filters, stream->input,
792                                     AP_MODE_READBYTES, APR_NONBLOCK_READ,
793                                     APR_BUCKET_BUFF_SIZE);
794             if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->input)) {
795                 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
796                               "h2_proxy_stream(%s-%d): resuming", 
797                               session->id, stream_id);
798                 stream->suspended = 0;
799                 h2_iq_remove(session->suspended, stream_id);
800                 nghttp2_session_resume_data(session->ngh2, stream_id);
801                 dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
802                 check_suspended(session);
803                 return APR_SUCCESS;
804             }
805             else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
806                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, session->c, 
807                               APLOGNO(03382) "h2_proxy_stream(%s-%d): check input", 
808                               session->id, stream_id);
809                 h2_iq_remove(session->suspended, stream_id);
810                 dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
811                 check_suspended(session);
812                 return APR_SUCCESS;
813             }
814         }
815         else {
816             /* gone? */
817             h2_iq_remove(session->suspended, stream_id);
818             check_suspended(session);
819             return APR_SUCCESS;
820         }
821     }
822     return APR_EAGAIN;
823 }
824
825 static apr_status_t session_shutdown(h2_proxy_session *session, int reason, 
826                                      const char *msg)
827 {
828     apr_status_t status = APR_SUCCESS;
829     const char *err = msg;
830     
831     AP_DEBUG_ASSERT(session);
832     if (!err && reason) {
833         err = nghttp2_strerror(reason);
834     }
835     nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0, 
836                           reason, (uint8_t*)err, err? strlen(err):0);
837     status = nghttp2_session_send(session->ngh2);
838     dispatch_event(session, H2_PROXYS_EV_LOCAL_GOAWAY, reason, err);
839     return status;
840 }
841
842
843 static const char *StateNames[] = {
844     "INIT",      /* H2_PROXYS_ST_INIT */
845     "DONE",      /* H2_PROXYS_ST_DONE */
846     "IDLE",      /* H2_PROXYS_ST_IDLE */
847     "BUSY",      /* H2_PROXYS_ST_BUSY */
848     "WAIT",      /* H2_PROXYS_ST_WAIT */
849     "LSHUTDOWN", /* H2_PROXYS_ST_LOCAL_SHUTDOWN */
850     "RSHUTDOWN", /* H2_PROXYS_ST_REMOTE_SHUTDOWN */
851 };
852
853 static const char *state_name(h2_proxys_state state)
854 {
855     if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
856         return "unknown";
857     }
858     return StateNames[state];
859 }
860
861 static int is_accepting_streams(h2_proxy_session *session)
862 {
863     switch (session->state) {
864         case H2_PROXYS_ST_IDLE:
865         case H2_PROXYS_ST_BUSY:
866         case H2_PROXYS_ST_WAIT:
867             return 1;
868         default:
869             return 0;
870     }
871 }
872
873 static void transit(h2_proxy_session *session, const char *action, 
874                     h2_proxys_state nstate)
875 {
876     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03345)
877                   "h2_proxy_session(%s): transit [%s] -- %s --> [%s]", session->id,
878                   state_name(session->state), action, state_name(nstate));
879     session->state = nstate;
880 }
881
882 static void ev_init(h2_proxy_session *session, int arg, const char *msg)
883 {
884     switch (session->state) {
885         case H2_PROXYS_ST_INIT:
886             if (h2_ihash_empty(session->streams)) {
887                 transit(session, "init", H2_PROXYS_ST_IDLE);
888             }
889             else {
890                 transit(session, "init", H2_PROXYS_ST_BUSY);
891             }
892             break;
893
894         default:
895             /* nop */
896             break;
897     }
898 }
899
900 static void ev_local_goaway(h2_proxy_session *session, int arg, const char *msg)
901 {
902     switch (session->state) {
903         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
904             /* already did that? */
905             break;
906         case H2_PROXYS_ST_IDLE:
907         case H2_PROXYS_ST_REMOTE_SHUTDOWN:
908             /* all done */
909             transit(session, "local goaway", H2_PROXYS_ST_DONE);
910             break;
911         default:
912             transit(session, "local goaway", H2_PROXYS_ST_LOCAL_SHUTDOWN);
913             break;
914     }
915 }
916
917 static void ev_remote_goaway(h2_proxy_session *session, int arg, const char *msg)
918 {
919     switch (session->state) {
920         case H2_PROXYS_ST_REMOTE_SHUTDOWN:
921             /* already received that? */
922             break;
923         case H2_PROXYS_ST_IDLE:
924         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
925             /* all done */
926             transit(session, "remote goaway", H2_PROXYS_ST_DONE);
927             break;
928         default:
929             transit(session, "remote goaway", H2_PROXYS_ST_REMOTE_SHUTDOWN);
930             break;
931     }
932 }
933
934 static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg)
935 {
936     switch (session->state) {
937         case H2_PROXYS_ST_INIT:
938         case H2_PROXYS_ST_DONE:
939         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
940             /* just leave */
941             transit(session, "conn error", H2_PROXYS_ST_DONE);
942             break;
943         
944         default:
945             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, arg, session->c,
946                           "h2_proxy_session(%s): conn error -> shutdown", session->id);
947             session_shutdown(session, arg, msg);
948             break;
949     }
950 }
951
952 static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg)
953 {
954     switch (session->state) {
955         case H2_PROXYS_ST_DONE:
956         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
957             /* just leave */
958             transit(session, "proto error", H2_PROXYS_ST_DONE);
959             break;
960         
961         default:
962             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
963                           "h2_proxy_session(%s): proto error -> shutdown", session->id);
964             session_shutdown(session, arg, msg);
965             break;
966     }
967 }
968
969 static void ev_conn_timeout(h2_proxy_session *session, int arg, const char *msg)
970 {
971     switch (session->state) {
972         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
973             transit(session, "conn timeout", H2_PROXYS_ST_DONE);
974             break;
975         default:
976             session_shutdown(session, arg, msg);
977             transit(session, "conn timeout", H2_PROXYS_ST_DONE);
978             break;
979     }
980 }
981
982 static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
983 {
984     switch (session->state) {
985         case H2_PROXYS_ST_BUSY:
986         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
987         case H2_PROXYS_ST_REMOTE_SHUTDOWN:
988             /* nothing for input and output to do. If we remain
989              * in this state, we go into a tight loop and suck up
990              * CPU cycles. Ideally, we'd like to do a blocking read, but that
991              * is not possible if we have scheduled tasks and wait
992              * for them to produce something. */
993             if (h2_ihash_empty(session->streams)) {
994                 if (!is_accepting_streams(session)) {
995                     /* We are no longer accepting new streams and have
996                      * finished processing existing ones. Time to leave. */
997                     session_shutdown(session, arg, msg);
998                     transit(session, "no io", H2_PROXYS_ST_DONE);
999                 }
1000                 else {
1001                     /* When we have no streams, no task events are possible,
1002                      * switch to blocking reads */
1003                     transit(session, "no io", H2_PROXYS_ST_IDLE);
1004                 }
1005             }
1006             else {
1007                 /* Unable to do blocking reads, as we wait on events from
1008                  * task processing in other threads. Do a busy wait with
1009                  * backoff timer. */
1010                 transit(session, "no io", H2_PROXYS_ST_WAIT);
1011             }
1012             break;
1013         default:
1014             /* nop */
1015             break;
1016     }
1017 }
1018
1019 static void ev_stream_submitted(h2_proxy_session *session, int stream_id, 
1020                                 const char *msg)
1021 {
1022     switch (session->state) {
1023         case H2_PROXYS_ST_IDLE:
1024         case H2_PROXYS_ST_WAIT:
1025             transit(session, "stream submitted", H2_PROXYS_ST_BUSY);
1026             break;
1027         default:
1028             /* nop */
1029             break;
1030     }
1031 }
1032
1033 static void ev_stream_done(h2_proxy_session *session, int stream_id, 
1034                            const char *msg)
1035 {
1036     h2_proxy_stream *stream;
1037     
1038     stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
1039     if (stream) {
1040         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03364)
1041                       "h2_proxy_sesssion(%s): stream(%d) closed", 
1042                       session->id, stream_id);
1043         
1044         if (!stream->data_received) {
1045             apr_bucket *b;
1046             /* if the response had no body, this is the time to flush
1047              * an empty brigade which will also "write" the resonse
1048              * headers */
1049             h2_proxy_stream_end_headers_out(stream);
1050             stream->data_received = 1;
1051             b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
1052             APR_BRIGADE_INSERT_TAIL(stream->output, b);
1053             b = apr_bucket_eos_create(stream->r->connection->bucket_alloc);
1054             APR_BRIGADE_INSERT_TAIL(stream->output, b);
1055             ap_pass_brigade(stream->r->output_filters, stream->output);
1056         }
1057         
1058         stream->state = H2_STREAM_ST_CLOSED;
1059         h2_ihash_remove(session->streams, stream_id);
1060         h2_iq_remove(session->suspended, stream_id);
1061         if (session->done) {
1062             session->done(session, stream->r, 1, 1);
1063         }
1064     }
1065     
1066     switch (session->state) {
1067         default:
1068             /* nop */
1069             break;
1070     }
1071 }
1072
1073 static void ev_stream_resumed(h2_proxy_session *session, int arg, const char *msg)
1074 {
1075     switch (session->state) {
1076         case H2_PROXYS_ST_WAIT:
1077             transit(session, "stream resumed", H2_PROXYS_ST_BUSY);
1078             break;
1079         default:
1080             /* nop */
1081             break;
1082     }
1083 }
1084
1085 static void ev_data_read(h2_proxy_session *session, int arg, const char *msg)
1086 {
1087     switch (session->state) {
1088         case H2_PROXYS_ST_IDLE:
1089         case H2_PROXYS_ST_WAIT:
1090             transit(session, "data read", H2_PROXYS_ST_BUSY);
1091             break;
1092         default:
1093             /* nop */
1094             break;
1095     }
1096 }
1097
1098 static void ev_ngh2_done(h2_proxy_session *session, int arg, const char *msg)
1099 {
1100     switch (session->state) {
1101         case H2_PROXYS_ST_DONE:
1102             /* nop */
1103             break;
1104         default:
1105             transit(session, "nghttp2 done", H2_PROXYS_ST_DONE);
1106             break;
1107     }
1108 }
1109
1110 static void ev_pre_close(h2_proxy_session *session, int arg, const char *msg)
1111 {
1112     switch (session->state) {
1113         case H2_PROXYS_ST_DONE:
1114         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1115             /* nop */
1116             break;
1117         default:
1118             session_shutdown(session, arg, msg);
1119             break;
1120     }
1121 }
1122
1123 static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, 
1124                            int arg, const char *msg)
1125 {
1126     switch (ev) {
1127         case H2_PROXYS_EV_INIT:
1128             ev_init(session, arg, msg);
1129             break;            
1130         case H2_PROXYS_EV_LOCAL_GOAWAY:
1131             ev_local_goaway(session, arg, msg);
1132             break;
1133         case H2_PROXYS_EV_REMOTE_GOAWAY:
1134             ev_remote_goaway(session, arg, msg);
1135             break;
1136         case H2_PROXYS_EV_CONN_ERROR:
1137             ev_conn_error(session, arg, msg);
1138             break;
1139         case H2_PROXYS_EV_PROTO_ERROR:
1140             ev_proto_error(session, arg, msg);
1141             break;
1142         case H2_PROXYS_EV_CONN_TIMEOUT:
1143             ev_conn_timeout(session, arg, msg);
1144             break;
1145         case H2_PROXYS_EV_NO_IO:
1146             ev_no_io(session, arg, msg);
1147             break;
1148         case H2_PROXYS_EV_STREAM_SUBMITTED:
1149             ev_stream_submitted(session, arg, msg);
1150             break;
1151         case H2_PROXYS_EV_STREAM_DONE:
1152             ev_stream_done(session, arg, msg);
1153             break;
1154         case H2_PROXYS_EV_STREAM_RESUMED:
1155             ev_stream_resumed(session, arg, msg);
1156             break;
1157         case H2_PROXYS_EV_DATA_READ:
1158             ev_data_read(session, arg, msg);
1159             break;
1160         case H2_PROXYS_EV_NGH2_DONE:
1161             ev_ngh2_done(session, arg, msg);
1162             break;
1163         case H2_PROXYS_EV_PRE_CLOSE:
1164             ev_pre_close(session, arg, msg);
1165             break;
1166         default:
1167             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
1168                           "h2_proxy_session(%s): unknown event %d", 
1169                           session->id, ev);
1170             break;
1171     }
1172 }
1173
1174 apr_status_t h2_proxy_session_process(h2_proxy_session *session)
1175 {
1176     apr_status_t status;
1177     int have_written = 0, have_read = 0;
1178
1179     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
1180                   "h2_proxy_session(%s): process", session->id);
1181            
1182 run_loop:
1183     switch (session->state) {
1184         case H2_PROXYS_ST_INIT:
1185             status = session_start(session);
1186             if (status == APR_SUCCESS) {
1187                 dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
1188                 goto run_loop;
1189             }
1190             else {
1191                 dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
1192             }
1193             break;
1194             
1195         case H2_PROXYS_ST_BUSY:
1196         case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1197         case H2_PROXYS_ST_REMOTE_SHUTDOWN:
1198             while (nghttp2_session_want_write(session->ngh2)) {
1199                 int rv = nghttp2_session_send(session->ngh2);
1200                 if (rv < 0 && nghttp2_is_fatal(rv)) {
1201                     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
1202                                   "h2_proxy_session(%s): write, rv=%d", session->id, rv);
1203                     dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL);
1204                     break;
1205                 }
1206                 have_written = 1;
1207             }
1208             
1209             if (nghttp2_session_want_read(session->ngh2)) {
1210                 status = h2_proxy_session_read(session, 0, 0);
1211                 if (status == APR_SUCCESS) {
1212                     have_read = 1;
1213                 }
1214             }
1215             
1216             if (!have_written && !have_read 
1217                 && !nghttp2_session_want_write(session->ngh2)) {
1218                 dispatch_event(session, H2_PROXYS_EV_NO_IO, 0, NULL);
1219                 goto run_loop;
1220             }
1221             break;
1222             
1223         case H2_PROXYS_ST_WAIT:
1224             if (check_suspended(session) == APR_EAGAIN) {
1225                 /* no stream has become resumed. Do a blocking read with
1226                  * ever increasing timeouts... */
1227                 if (session->wait_timeout < 25) {
1228                     session->wait_timeout = 25;
1229                 }
1230                 else {
1231                     session->wait_timeout = H2MIN(apr_time_from_msec(100), 
1232                                                   2*session->wait_timeout);
1233                 }
1234                 
1235                 status = h2_proxy_session_read(session, 1, session->wait_timeout);
1236                 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
1237                               APLOGNO(03365)
1238                               "h2_proxy_session(%s): WAIT read, timeout=%fms", 
1239                               session->id, (float)session->wait_timeout/1000.0);
1240                 if (status == APR_SUCCESS) {
1241                     have_read = 1;
1242                     dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
1243                 }
1244                 else if (APR_STATUS_IS_TIMEUP(status)
1245                     || APR_STATUS_IS_EAGAIN(status)) {
1246                     /* go back to checking all inputs again */
1247                     transit(session, "wait cycle", H2_PROXYS_ST_BUSY);
1248                 }
1249             }
1250             break;
1251             
1252         case H2_PROXYS_ST_IDLE:
1253             break;
1254
1255         case H2_PROXYS_ST_DONE: /* done, session terminated */
1256             return APR_EOF;
1257             
1258         default:
1259             ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
1260                           APLOGNO(03346)"h2_proxy_session(%s): unknown state %d", 
1261                           session->id, session->state);
1262             dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL);
1263             break;
1264     }
1265
1266
1267     if (have_read || have_written) {
1268         session->wait_timeout = 0;
1269     }
1270     
1271     if (!nghttp2_session_want_read(session->ngh2)
1272         && !nghttp2_session_want_write(session->ngh2)) {
1273         dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);
1274     }
1275     
1276     return APR_SUCCESS; /* needs to be called again */
1277 }
1278
1279 typedef struct {
1280     h2_proxy_session *session;
1281     h2_proxy_request_done *done;
1282 } cleanup_iter_ctx;
1283
1284 static int done_iter(void *udata, void *val)
1285 {
1286     cleanup_iter_ctx *ctx = udata;
1287     h2_proxy_stream *stream = val;
1288     int touched = (!ctx->session->last_stream_id || 
1289                    stream->id <= ctx->session->last_stream_id);
1290     ctx->done(ctx->session, stream->r, 0, touched);
1291     return 1;
1292 }
1293
1294 void h2_proxy_session_cleanup(h2_proxy_session *session, 
1295                               h2_proxy_request_done *done)
1296 {
1297     if (session->streams && !h2_ihash_empty(session->streams)) {
1298         cleanup_iter_ctx ctx;
1299         ctx.session = session;
1300         ctx.done = done;
1301         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03366)
1302                       "h2_proxy_session(%s): terminated, %d streams unfinished",
1303                       session->id, (int)h2_ihash_count(session->streams));
1304         h2_ihash_iter(session->streams, done_iter, &ctx);
1305         h2_ihash_clear(session->streams);
1306     }
1307 }
1308
1309 typedef struct {
1310     h2_proxy_session *session;
1311     conn_rec *c;
1312     apr_off_t bytes;
1313     int updated;
1314 } win_update_ctx;
1315
1316 static int win_update_iter(void *udata, void *val)
1317 {
1318     win_update_ctx *ctx = udata;
1319     h2_proxy_stream *stream = val;
1320     
1321     if (stream->r && stream->r->connection == ctx->c) {
1322         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, ctx->session->c, 
1323                       "h2_proxy_session(%s-%d): win_update %ld bytes",
1324                       ctx->session->id, (int)stream->id, (long)ctx->bytes);
1325         nghttp2_session_consume(ctx->session->ngh2, stream->id, ctx->bytes);
1326         ctx->updated = 1;
1327         return 0;
1328     }
1329     return 1;
1330 }
1331
1332
1333 void h2_proxy_session_update_window(h2_proxy_session *session, 
1334                                     conn_rec *c, apr_off_t bytes)
1335 {
1336     if (session->streams && !h2_ihash_empty(session->streams)) {
1337         win_update_ctx ctx;
1338         ctx.session = session;
1339         ctx.c = c;
1340         ctx.bytes = bytes;
1341         ctx.updated = 0;
1342         h2_ihash_iter(session->streams, win_update_iter, &ctx);
1343         
1344         if (!ctx.updated) {
1345             /* could not find the stream any more, possibly closed, update
1346              * the connection window at least */
1347             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
1348                           "h2_proxy_session(%s): win_update conn %ld bytes",
1349                           session->id, (long)bytes);
1350             nghttp2_session_consume_connection(session->ngh2, (size_t)bytes);
1351         }
1352     }
1353 }
1354