]> granicus.if.org Git - apache/blob - modules/http2/h2_filter.c
On the trunk:
[apache] / modules / http2 / h2_filter.c
1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <assert.h>
17
18 #include <apr_strings.h>
19 #include <httpd.h>
20 #include <http_core.h>
21 #include <http_protocol.h>
22 #include <http_log.h>
23 #include <http_connection.h>
24 #include <scoreboard.h>
25
26 #include "h2_private.h"
27 #include "h2.h"
28 #include "h2_config.h"
29 #include "h2_conn_io.h"
30 #include "h2_ctx.h"
31 #include "h2_mplx.h"
32 #include "h2_push.h"
33 #include "h2_task.h"
34 #include "h2_stream.h"
35 #include "h2_request.h"
36 #include "h2_headers.h"
37 #include "h2_stream.h"
38 #include "h2_session.h"
39 #include "h2_util.h"
40 #include "h2_version.h"
41
42 #include "h2_filter.h"
43
44 #define UNSET       -1
45 #define H2MIN(x,y) ((x) < (y) ? (x) : (y))
46
47 static apr_status_t recv_RAW_DATA(conn_rec *c, h2_filter_cin *cin, 
48                                   apr_bucket *b, apr_read_type_e block)
49 {
50     h2_session *session = cin->session;
51     apr_status_t status = APR_SUCCESS;
52     apr_size_t len;
53     const char *data;
54     ssize_t n;
55     
56     status = apr_bucket_read(b, &data, &len, block);
57     
58     while (status == APR_SUCCESS && len > 0) {
59         n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len);
60         
61         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
62                       H2_SSSN_MSG(session, "fed %ld bytes to nghttp2, %ld read"),
63                       (long)len, (long)n);
64         if (n < 0) {
65             if (nghttp2_is_fatal((int)n)) {
66                 h2_session_event(session, H2_SESSION_EV_PROTO_ERROR, 
67                                  (int)n, nghttp2_strerror((int)n));
68                 status = APR_EGENERAL;
69             }
70         }
71         else {
72             session->io.bytes_read += n;
73             if (len <= n) {
74                 break;
75             }
76             len -= n;
77             data += n;
78         }
79     }
80     
81     return status;
82 }
83
84 static apr_status_t recv_RAW_brigade(conn_rec *c, h2_filter_cin *cin, 
85                                      apr_bucket_brigade *bb, 
86                                      apr_read_type_e block)
87 {
88     apr_status_t status = APR_SUCCESS;
89     apr_bucket* b;
90     int consumed = 0;
91     
92     h2_util_bb_log(c, c->id, APLOG_TRACE2, "RAW_in", bb);
93     while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
94         b = APR_BRIGADE_FIRST(bb);
95
96         if (APR_BUCKET_IS_METADATA(b)) {
97             /* nop */
98         }
99         else {
100             status = recv_RAW_DATA(c, cin, b, block);
101         }
102         consumed = 1;
103         apr_bucket_delete(b);
104     }
105     
106     if (!consumed && status == APR_SUCCESS && block == APR_NONBLOCK_READ) {
107         return APR_EAGAIN;
108     }
109     return status;
110 }
111
112 h2_filter_cin *h2_filter_cin_create(h2_session *session)
113 {
114     h2_filter_cin *cin;
115     
116     cin = apr_pcalloc(session->pool, sizeof(*cin));
117     if (!cin) {
118         return NULL;
119     }
120     cin->session = session;
121     return cin;
122 }
123
124 void h2_filter_cin_timeout_set(h2_filter_cin *cin, apr_interval_time_t timeout)
125 {
126     cin->timeout = timeout;
127 }
128
129 apr_status_t h2_filter_core_input(ap_filter_t* f,
130                                   apr_bucket_brigade* brigade,
131                                   ap_input_mode_t mode,
132                                   apr_read_type_e block,
133                                   apr_off_t readbytes) 
134 {
135     h2_filter_cin *cin = f->ctx;
136     apr_status_t status = APR_SUCCESS;
137     apr_interval_time_t saved_timeout = UNSET;
138     const int trace1 = APLOGctrace1(f->c);
139     
140     if (trace1) {
141         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
142                       "h2_session(%ld): read, %s, mode=%d, readbytes=%ld", 
143                       (long)f->c->id, (block == APR_BLOCK_READ)? 
144                       "BLOCK_READ" : "NONBLOCK_READ", mode, (long)readbytes);
145     }
146     
147     if (mode == AP_MODE_INIT || mode == AP_MODE_SPECULATIVE) {
148         return ap_get_brigade(f->next, brigade, mode, block, readbytes);
149     }
150     
151     if (mode != AP_MODE_READBYTES) {
152         return (block == APR_BLOCK_READ)? APR_SUCCESS : APR_EAGAIN;
153     }
154     
155     if (!cin->bb) {
156         cin->bb = apr_brigade_create(cin->session->pool, f->c->bucket_alloc);
157     }
158
159     if (!cin->socket) {
160         cin->socket = ap_get_conn_socket(f->c);
161     }
162     
163     if (APR_BRIGADE_EMPTY(cin->bb)) {
164         /* We only do a blocking read when we have no streams to process. So,
165          * in httpd scoreboard lingo, we are in a KEEPALIVE connection state.
166          */
167         if (block == APR_BLOCK_READ) {
168             if (cin->timeout > 0) {
169                 apr_socket_timeout_get(cin->socket, &saved_timeout);
170                 apr_socket_timeout_set(cin->socket, cin->timeout);
171             }
172         }
173         status = ap_get_brigade(f->next, cin->bb, AP_MODE_READBYTES,
174                                 block, readbytes);
175         if (saved_timeout != UNSET) {
176             apr_socket_timeout_set(cin->socket, saved_timeout);
177         }
178     }
179     
180     switch (status) {
181         case APR_SUCCESS:
182             status = recv_RAW_brigade(f->c, cin, cin->bb, block);
183             break;
184         case APR_EOF:
185         case APR_EAGAIN:
186         case APR_TIMEUP:
187             if (trace1) {
188                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
189                               "h2_session(%ld): read", f->c->id);
190             }
191             break;
192         default:
193             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, f->c, APLOGNO(03046)
194                           "h2_session(%ld): error reading", f->c->id);
195             break;
196     }
197     return status;
198 }
199
200 /*******************************************************************************
201  * http2 connection status handler + stream out source
202  ******************************************************************************/
203
204 typedef struct {
205     apr_bucket_refcount refcount;
206     h2_bucket_event_cb *cb;
207     void *ctx;
208 } h2_bucket_observer;
209  
210 static apr_status_t bucket_read(apr_bucket *b, const char **str,
211                                 apr_size_t *len, apr_read_type_e block)
212 {
213     (void)b;
214     (void)block;
215     *str = NULL;
216     *len = 0;
217     return APR_SUCCESS;
218 }
219
220 static void bucket_destroy(void *data)
221 {
222     h2_bucket_observer *h = data;
223     if (apr_bucket_shared_destroy(h)) {
224         if (h->cb) {
225             h->cb(h->ctx, H2_BUCKET_EV_BEFORE_DESTROY, NULL);
226         }
227         apr_bucket_free(h);
228     }
229 }
230
231 apr_bucket * h2_bucket_observer_make(apr_bucket *b, h2_bucket_event_cb *cb,
232                                  void *ctx)
233 {
234     h2_bucket_observer *br;
235
236     br = apr_bucket_alloc(sizeof(*br), b->list);
237     br->cb = cb;
238     br->ctx = ctx;
239
240     b = apr_bucket_shared_make(b, br, 0, 0);
241     b->type = &h2_bucket_type_observer;
242     return b;
243
244
245 apr_bucket * h2_bucket_observer_create(apr_bucket_alloc_t *list, 
246                                        h2_bucket_event_cb *cb, void *ctx)
247 {
248     apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
249
250     APR_BUCKET_INIT(b);
251     b->free = apr_bucket_free;
252     b->list = list;
253     b = h2_bucket_observer_make(b, cb, ctx);
254     return b;
255 }
256                                        
257 apr_status_t h2_bucket_observer_fire(apr_bucket *b, h2_bucket_event event)
258 {
259     if (H2_BUCKET_IS_OBSERVER(b)) {
260         h2_bucket_observer *l = (h2_bucket_observer *)b->data; 
261         return l->cb(l->ctx, event, b);
262     }
263     return APR_EINVAL;
264 }
265
266 const apr_bucket_type_t h2_bucket_type_observer = {
267     "H2OBS", 5, APR_BUCKET_METADATA,
268     bucket_destroy,
269     bucket_read,
270     apr_bucket_setaside_noop,
271     apr_bucket_split_notimpl,
272     apr_bucket_shared_copy
273 };
274
275 apr_bucket *h2_bucket_observer_beam(struct h2_bucket_beam *beam,
276                                     apr_bucket_brigade *dest,
277                                     const apr_bucket *src)
278 {
279     if (H2_BUCKET_IS_OBSERVER(src)) {
280         h2_bucket_observer *l = (h2_bucket_observer *)src->data; 
281         apr_bucket *b = h2_bucket_observer_create(dest->bucket_alloc, 
282                                                   l->cb, l->ctx);
283         APR_BRIGADE_INSERT_TAIL(dest, b);
284         l->cb = NULL;
285         l->ctx = NULL;
286         h2_bucket_observer_fire(b, H2_BUCKET_EV_BEFORE_MASTER_SEND);
287         return b;
288     }
289     return NULL;
290 }
291
292 static apr_status_t bbout(apr_bucket_brigade *bb, const char *fmt, ...)
293 {
294     va_list args;
295     apr_status_t rv;
296
297     va_start(args, fmt);
298     rv = apr_brigade_vprintf(bb, NULL, NULL, fmt, args);
299     va_end(args);
300
301     return rv;
302 }
303
304 static void add_settings(apr_bucket_brigade *bb, h2_session *s, int last) 
305 {
306     h2_mplx *m = s->mplx;
307     
308     bbout(bb, "  \"settings\": {\n");
309     bbout(bb, "    \"SETTINGS_MAX_CONCURRENT_STREAMS\": %d,\n", m->max_streams); 
310     bbout(bb, "    \"SETTINGS_MAX_FRAME_SIZE\": %d,\n", 16*1024); 
311     bbout(bb, "    \"SETTINGS_INITIAL_WINDOW_SIZE\": %d,\n",
312           h2_config_geti(s->config, H2_CONF_WIN_SIZE));
313     bbout(bb, "    \"SETTINGS_ENABLE_PUSH\": %d\n", h2_session_push_enabled(s)); 
314     bbout(bb, "  }%s\n", last? "" : ",");
315 }
316
317 static void add_peer_settings(apr_bucket_brigade *bb, h2_session *s, int last) 
318 {
319     bbout(bb, "  \"peerSettings\": {\n");
320     bbout(bb, "    \"SETTINGS_MAX_CONCURRENT_STREAMS\": %d,\n", 
321         nghttp2_session_get_remote_settings(s->ngh2, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS)); 
322     bbout(bb, "    \"SETTINGS_MAX_FRAME_SIZE\": %d,\n", 
323         nghttp2_session_get_remote_settings(s->ngh2, NGHTTP2_SETTINGS_MAX_FRAME_SIZE)); 
324     bbout(bb, "    \"SETTINGS_INITIAL_WINDOW_SIZE\": %d,\n", 
325         nghttp2_session_get_remote_settings(s->ngh2, NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE)); 
326     bbout(bb, "    \"SETTINGS_ENABLE_PUSH\": %d,\n", 
327         nghttp2_session_get_remote_settings(s->ngh2, NGHTTP2_SETTINGS_ENABLE_PUSH)); 
328     bbout(bb, "    \"SETTINGS_HEADER_TABLE_SIZE\": %d,\n", 
329         nghttp2_session_get_remote_settings(s->ngh2, NGHTTP2_SETTINGS_HEADER_TABLE_SIZE)); 
330     bbout(bb, "    \"SETTINGS_MAX_HEADER_LIST_SIZE\": %d\n", 
331         nghttp2_session_get_remote_settings(s->ngh2, NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE)); 
332     bbout(bb, "  }%s\n", last? "" : ",");
333 }
334
335 typedef struct {
336     apr_bucket_brigade *bb;
337     h2_session *s;
338     int idx;
339 } stream_ctx_t;
340
341 static int add_stream(h2_stream *stream, void *ctx)
342 {
343     stream_ctx_t *x = ctx;
344     int32_t flowIn, flowOut;
345     
346     flowIn = nghttp2_session_get_stream_effective_local_window_size(x->s->ngh2, stream->id); 
347     flowOut = nghttp2_session_get_stream_remote_window_size(x->s->ngh2, stream->id);
348     bbout(x->bb, "%s\n    \"%d\": {\n", (x->idx? "," : ""), stream->id);
349     bbout(x->bb, "    \"state\": \"%s\",\n", h2_stream_state_str(stream));
350     bbout(x->bb, "    \"created\": %f,\n", ((double)stream->created)/APR_USEC_PER_SEC);
351     bbout(x->bb, "    \"flowIn\": %d,\n", flowIn);
352     bbout(x->bb, "    \"flowOut\": %d,\n", flowOut);
353     bbout(x->bb, "    \"dataIn\": %"APR_UINT64_T_FMT",\n", stream->in_data_octets);  
354     bbout(x->bb, "    \"dataOut\": %"APR_UINT64_T_FMT"\n", stream->out_data_octets);  
355     bbout(x->bb, "    }");
356     
357     ++x->idx;
358     return 1;
359
360
361 static void add_streams(apr_bucket_brigade *bb, h2_session *s, int last) 
362 {
363     stream_ctx_t x;
364     
365     x.bb = bb;
366     x.s = s;
367     x.idx = 0;
368     bbout(bb, "  \"streams\": {");
369     h2_mplx_stream_do(s->mplx, add_stream, &x);
370     bbout(bb, "\n  }%s\n", last? "" : ",");
371 }
372
373 static void add_push(apr_bucket_brigade *bb, h2_session *s, 
374                      h2_stream *stream, int last) 
375 {
376     h2_push_diary *diary;
377     apr_status_t status;
378     
379     bbout(bb, "    \"push\": {\n");
380     diary = s->push_diary;
381     if (diary) {
382         const char *data;
383         const char *base64_digest;
384         apr_size_t len;
385         
386         status = h2_push_diary_digest_get(diary, bb->p, 256, 
387                                           stream->request->authority, 
388                                           &data, &len);
389         if (status == APR_SUCCESS) {
390             base64_digest = h2_util_base64url_encode(data, len, bb->p);
391             bbout(bb, "      \"cacheDigest\": \"%s\",\n", base64_digest);
392         }
393     }
394     bbout(bb, "      \"promises\": %d,\n", s->pushes_promised);
395     bbout(bb, "      \"submits\": %d,\n", s->pushes_submitted);
396     bbout(bb, "      \"resets\": %d\n", s->pushes_reset);
397     bbout(bb, "    }%s\n", last? "" : ",");
398 }
399
400 static void add_in(apr_bucket_brigade *bb, h2_session *s, int last) 
401 {
402     bbout(bb, "    \"in\": {\n");
403     bbout(bb, "      \"requests\": %d,\n", s->remote.emitted_count);
404     bbout(bb, "      \"resets\": %d, \n", s->streams_reset);
405     bbout(bb, "      \"frames\": %ld,\n", (long)s->frames_received);
406     bbout(bb, "      \"octets\": %"APR_UINT64_T_FMT"\n", s->io.bytes_read);
407     bbout(bb, "    }%s\n", last? "" : ",");
408 }
409
410 static void add_out(apr_bucket_brigade *bb, h2_session *s, int last) 
411 {
412     bbout(bb, "    \"out\": {\n");
413     bbout(bb, "      \"responses\": %d,\n", s->responses_submitted);
414     bbout(bb, "      \"frames\": %ld,\n", (long)s->frames_sent);
415     bbout(bb, "      \"octets\": %"APR_UINT64_T_FMT"\n", s->io.bytes_written);
416     bbout(bb, "    }%s\n", last? "" : ",");
417 }
418
419 static void add_stats(apr_bucket_brigade *bb, h2_session *s, 
420                      h2_stream *stream, int last) 
421 {
422     bbout(bb, "  \"stats\": {\n");
423     add_in(bb, s, 0);
424     add_out(bb, s, 0);
425     add_push(bb, s, stream, 1);
426     bbout(bb, "  }%s\n", last? "" : ",");
427 }
428
429 static apr_status_t h2_status_insert(h2_task *task, apr_bucket *b)
430 {
431     h2_mplx *m = task->mplx;
432     h2_stream *stream = h2_mplx_stream_get(m, task->stream_id);
433     h2_session *s;
434     conn_rec *c;
435     
436     apr_bucket_brigade *bb;
437     apr_bucket *e;
438     int32_t connFlowIn, connFlowOut;
439     
440     if (!stream) {
441         /* stream already done */
442         return APR_SUCCESS;
443     }
444     s = stream->session;
445     c = s->c;
446     
447     bb = apr_brigade_create(stream->pool, c->bucket_alloc);
448     
449     connFlowIn = nghttp2_session_get_effective_local_window_size(s->ngh2); 
450     connFlowOut = nghttp2_session_get_remote_window_size(s->ngh2);
451      
452     bbout(bb, "{\n");
453     bbout(bb, "  \"version\": \"draft-01\",\n");
454     add_settings(bb, s, 0);
455     add_peer_settings(bb, s, 0);
456     bbout(bb, "  \"connFlowIn\": %d,\n", connFlowIn);
457     bbout(bb, "  \"connFlowOut\": %d,\n", connFlowOut);
458     bbout(bb, "  \"sentGoAway\": %d,\n", s->local.shutdown);
459
460     add_streams(bb, s, 0);
461     
462     add_stats(bb, s, stream, 1);
463     bbout(bb, "}\n");
464     
465     while ((e = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) {
466         APR_BUCKET_REMOVE(e);
467         APR_BUCKET_INSERT_AFTER(b, e);
468         b = e;
469     }
470     apr_brigade_destroy(bb);
471     
472     return APR_SUCCESS;
473 }
474
475 static apr_status_t status_event(void *ctx, h2_bucket_event event, 
476                                  apr_bucket *b)
477 {
478     h2_task *task = ctx;
479     
480     ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, task->c->master, 
481                   "status_event(%s): %d", task->id, event);
482     switch (event) {
483         case H2_BUCKET_EV_BEFORE_MASTER_SEND:
484             h2_status_insert(task, b);
485             break;
486         default:
487             break;
488     }
489     return APR_SUCCESS;
490 }
491
492 int h2_filter_h2_status_handler(request_rec *r)
493 {
494     h2_ctx *ctx = h2_ctx_rget(r);
495     conn_rec *c = r->connection;
496     h2_task *task;
497     apr_bucket_brigade *bb;
498     apr_bucket *b;
499     apr_status_t status;
500     
501     if (strcmp(r->handler, "http2-status")) {
502         return DECLINED;
503     }
504     if (r->method_number != M_GET && r->method_number != M_POST) {
505         return DECLINED;
506     }
507
508     task = ctx? h2_ctx_get_task(ctx) : NULL;
509     if (task) {
510
511         if ((status = ap_discard_request_body(r)) != OK) {
512             return status;
513         }
514         
515         /* We need to handle the actual output on the main thread, as
516          * we need to access h2_session information. */
517         r->status = 200;
518         r->clength = -1;
519         r->chunked = 1;
520         apr_table_unset(r->headers_out, "Content-Length");
521         ap_set_content_type(r, "application/json");
522         apr_table_setn(r->notes, H2_FILTER_DEBUG_NOTE, "on");
523
524         bb = apr_brigade_create(r->pool, c->bucket_alloc);
525         b = h2_bucket_observer_create(c->bucket_alloc, status_event, task);
526         APR_BRIGADE_INSERT_TAIL(bb, b);
527         b = apr_bucket_eos_create(c->bucket_alloc);
528         APR_BRIGADE_INSERT_TAIL(bb, b);
529
530         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
531                       "status_handler(%s): checking for incoming trailers", 
532                       task->id);
533         if (r->trailers_in && !apr_is_empty_table(r->trailers_in)) {
534             ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
535                           "status_handler(%s): seeing incoming trailers", 
536                           task->id);
537             apr_table_setn(r->trailers_out, "h2-trailers-in", 
538                            apr_itoa(r->pool, 1));
539         }
540         
541         status = ap_pass_brigade(r->output_filters, bb);
542         if (status == APR_SUCCESS
543             || r->status != HTTP_OK
544             || c->aborted) {
545             return OK;
546         }
547         else {
548             /* no way to know what type of error occurred */
549             ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
550                           "status_handler(%s): ap_pass_brigade failed", 
551                           task->id);
552             return AP_FILTER_ERROR;
553         }
554     }
555     return DECLINED;
556 }
557