]> granicus.if.org Git - apache/blobdiff - modules/http2/h2_filter.c
correct copyright/license headers
[apache] / modules / http2 / h2_filter.c
index ce94b52ed6e0ca229e0d932ca62b8ab863937793..23cfe68832b03706bca56ce32b1e6d2079d56e3b 100644 (file)
@@ -1,3 +1,19 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,6 +34,7 @@
 #include <apr_strings.h>
 #include <httpd.h>
 #include <http_core.h>
+#include <http_protocol.h>
 #include <http_log.h>
 #include <http_connection.h>
 #include <scoreboard.h>
@@ -32,7 +49,7 @@
 #include "h2_task.h"
 #include "h2_stream.h"
 #include "h2_request.h"
-#include "h2_response.h"
+#include "h2_headers.h"
 #include "h2_stream.h"
 #include "h2_session.h"
 #include "h2_util.h"
 #define UNSET       -1
 #define H2MIN(x,y) ((x) < (y) ? (x) : (y))
 
-static apr_status_t consume_brigade(h2_filter_cin *cin, 
-                                    apr_bucket_brigade *bb, 
-                                    apr_read_type_e block)
+static apr_status_t recv_RAW_DATA(conn_rec *c, h2_filter_cin *cin, 
+                                  apr_bucket *b, apr_read_type_e block)
 {
+    h2_session *session = cin->session;
     apr_status_t status = APR_SUCCESS;
-    apr_size_t readlen = 0;
+    apr_size_t len;
+    const char *data;
+    ssize_t n;
     
-    while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
+    status = apr_bucket_read(b, &data, &len, block);
+    
+    while (status == APR_SUCCESS && len > 0) {
+        n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len);
         
-        apr_bucket* bucket = APR_BRIGADE_FIRST(bb);
-        if (APR_BUCKET_IS_METADATA(bucket)) {
-            /* we do nothing regarding any meta here */
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                      H2_SSSN_MSG(session, "fed %ld bytes to nghttp2, %ld read"),
+                      (long)len, (long)n);
+        if (n < 0) {
+            if (nghttp2_is_fatal((int)n)) {
+                h2_session_event(session, H2_SESSION_EV_PROTO_ERROR, 
+                                 (int)n, nghttp2_strerror((int)n));
+                status = APR_EGENERAL;
+            }
         }
         else {
-            const char *bucket_data = NULL;
-            apr_size_t bucket_length = 0;
-            status = apr_bucket_read(bucket, &bucket_data,
-                                     &bucket_length, block);
-            
-            if (status == APR_SUCCESS && bucket_length > 0) {
-                apr_size_t consumed = 0;
-
-                status = cin->cb(cin->cb_ctx, bucket_data, bucket_length, &consumed);
-                if (status == APR_SUCCESS && bucket_length > consumed) {
-                    /* We have data left in the bucket. Split it. */
-                    status = apr_bucket_split(bucket, consumed);
-                }
-                readlen += consumed;
-                cin->start_read = apr_time_now();
+            session->io.bytes_read += n;
+            if (len <= n) {
+                break;
             }
+            len -= n;
+            data += n;
+        }
+    }
+    
+    return status;
+}
+
+static apr_status_t recv_RAW_brigade(conn_rec *c, h2_filter_cin *cin, 
+                                     apr_bucket_brigade *bb, 
+                                     apr_read_type_e block)
+{
+    apr_status_t status = APR_SUCCESS;
+    apr_bucket* b;
+    int consumed = 0;
+    
+    h2_util_bb_log(c, c->id, APLOG_TRACE2, "RAW_in", bb);
+    while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
+        b = APR_BRIGADE_FIRST(bb);
+
+        if (APR_BUCKET_IS_METADATA(b)) {
+            /* nop */
         }
-        apr_bucket_delete(bucket);
+        else {
+            status = recv_RAW_DATA(c, cin, b, block);
+        }
+        consumed = 1;
+        apr_bucket_delete(b);
     }
     
-    if (readlen == 0 && status == APR_SUCCESS && block == APR_NONBLOCK_READ) {
+    if (!consumed && status == APR_SUCCESS && block == APR_NONBLOCK_READ) {
         return APR_EAGAIN;
     }
     return status;
 }
 
-h2_filter_cin *h2_filter_cin_create(apr_pool_t *p, h2_filter_cin_cb *cb, void *ctx)
+h2_filter_cin *h2_filter_cin_create(h2_session *session)
 {
     h2_filter_cin *cin;
     
-    cin = apr_pcalloc(p, sizeof(*cin));
-    cin->pool      = p;
-    cin->cb        = cb;
-    cin->cb_ctx    = ctx;
-    cin->start_read = UNSET;
+    cin = apr_pcalloc(session->pool, sizeof(*cin));
+    if (!cin) {
+        return NULL;
+    }
+    cin->session = session;
     return cin;
 }
 
@@ -109,11 +151,14 @@ apr_status_t h2_filter_core_input(ap_filter_t* f,
     h2_filter_cin *cin = f->ctx;
     apr_status_t status = APR_SUCCESS;
     apr_interval_time_t saved_timeout = UNSET;
+    const int trace1 = APLOGctrace1(f->c);
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
-                  "core_input(%ld): read, %s, mode=%d, readbytes=%ld", 
-                  (long)f->c->id, (block == APR_BLOCK_READ)? "BLOCK_READ" : "NONBLOCK_READ", 
-                  mode, (long)readbytes);
+    if (trace1) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+                      "h2_session(%ld): read, %s, mode=%d, readbytes=%ld", 
+                      (long)f->c->id, (block == APR_BLOCK_READ)? 
+                      "BLOCK_READ" : "NONBLOCK_READ", mode, (long)readbytes);
+    }
     
     if (mode == AP_MODE_INIT || mode == AP_MODE_SPECULATIVE) {
         return ap_get_brigade(f->next, brigade, mode, block, readbytes);
@@ -124,20 +169,16 @@ apr_status_t h2_filter_core_input(ap_filter_t* f,
     }
     
     if (!cin->bb) {
-        cin->bb = apr_brigade_create(cin->pool, f->c->bucket_alloc);
+        cin->bb = apr_brigade_create(cin->session->pool, f->c->bucket_alloc);
     }
 
     if (!cin->socket) {
         cin->socket = ap_get_conn_socket(f->c);
     }
     
-    cin->start_read = apr_time_now();
     if (APR_BRIGADE_EMPTY(cin->bb)) {
         /* We only do a blocking read when we have no streams to process. So,
          * in httpd scoreboard lingo, we are in a KEEPALIVE connection state.
-         * When reading non-blocking, we do have streams to process and update
-         * child with NULL request. That way, any current request information
-         * in the scoreboard is preserved.
          */
         if (block == APR_BLOCK_READ) {
             if (cin->timeout > 0) {
@@ -154,17 +195,19 @@ apr_status_t h2_filter_core_input(ap_filter_t* f,
     
     switch (status) {
         case APR_SUCCESS:
-            status = consume_brigade(cin, cin->bb, block);
+            status = recv_RAW_brigade(f->c, cin, cin->bb, block);
             break;
         case APR_EOF:
         case APR_EAGAIN:
         case APR_TIMEUP:
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
-                          "core_input(%ld): read", (long)f->c->id);
+            if (trace1) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
+                              "h2_session(%ld): read", f->c->id);
+            }
             break;
         default:
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, f->c, APLOGNO(03046)
-                          "h2_conn_io: error reading");
+                          "h2_session(%ld): error reading", f->c->id);
             break;
     }
     return status;
@@ -174,30 +217,92 @@ apr_status_t h2_filter_core_input(ap_filter_t* f,
  * http2 connection status handler + stream out source
  ******************************************************************************/
 
-static const char *H2_SOS_H2_STATUS = "http2-status";
+typedef struct {
+    apr_bucket_refcount refcount;
+    h2_bucket_event_cb *cb;
+    void *ctx;
+} h2_bucket_observer;
+static apr_status_t bucket_read(apr_bucket *b, const char **str,
+                                apr_size_t *len, apr_read_type_e block)
+{
+    (void)b;
+    (void)block;
+    *str = NULL;
+    *len = 0;
+    return APR_SUCCESS;
+}
 
-int h2_filter_h2_status_handler(request_rec *r)
+static void bucket_destroy(void *data)
 {
-    h2_ctx *ctx = h2_ctx_rget(r);
-    h2_task *task;
-    
-    if (strcmp(r->handler, "http2-status")) {
-        return DECLINED;
+    h2_bucket_observer *h = data;
+    if (apr_bucket_shared_destroy(h)) {
+        if (h->cb) {
+            h->cb(h->ctx, H2_BUCKET_EV_BEFORE_DESTROY, NULL);
+        }
+        apr_bucket_free(h);
     }
-    if (r->method_number != M_GET) {
-        return DECLINED;
+}
+
+apr_bucket * h2_bucket_observer_make(apr_bucket *b, h2_bucket_event_cb *cb,
+                                 void *ctx)
+{
+    h2_bucket_observer *br;
+
+    br = apr_bucket_alloc(sizeof(*br), b->list);
+    br->cb = cb;
+    br->ctx = ctx;
+
+    b = apr_bucket_shared_make(b, br, 0, 0);
+    b->type = &h2_bucket_type_observer;
+    return b;
+} 
+
+apr_bucket * h2_bucket_observer_create(apr_bucket_alloc_t *list, 
+                                       h2_bucket_event_cb *cb, void *ctx)
+{
+    apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
+
+    APR_BUCKET_INIT(b);
+    b->free = apr_bucket_free;
+    b->list = list;
+    b = h2_bucket_observer_make(b, cb, ctx);
+    return b;
+}
+                                       
+apr_status_t h2_bucket_observer_fire(apr_bucket *b, h2_bucket_event event)
+{
+    if (H2_BUCKET_IS_OBSERVER(b)) {
+        h2_bucket_observer *l = (h2_bucket_observer *)b->data; 
+        return l->cb(l->ctx, event, b);
     }
+    return APR_EINVAL;
+}
 
-    task = ctx? h2_ctx_get_task(ctx) : NULL;
-    if (task) {
-        /* We need to handle the actual output on the main thread, as
-         * we need to access h2_session information. */
-        apr_table_setn(r->notes, H2_RESP_SOS_NOTE, H2_SOS_H2_STATUS);
-        apr_table_setn(r->headers_out, "Content-Type", "application/json");
-        r->status = 200;
-        return DONE;
+const apr_bucket_type_t h2_bucket_type_observer = {
+    "H2OBS", 5, APR_BUCKET_METADATA,
+    bucket_destroy,
+    bucket_read,
+    apr_bucket_setaside_noop,
+    apr_bucket_split_notimpl,
+    apr_bucket_shared_copy
+};
+
+apr_bucket *h2_bucket_observer_beam(struct h2_bucket_beam *beam,
+                                    apr_bucket_brigade *dest,
+                                    const apr_bucket *src)
+{
+    if (H2_BUCKET_IS_OBSERVER(src)) {
+        h2_bucket_observer *l = (h2_bucket_observer *)src->data; 
+        apr_bucket *b = h2_bucket_observer_create(dest->bucket_alloc, 
+                                                  l->cb, l->ctx);
+        APR_BRIGADE_INSERT_TAIL(dest, b);
+        l->cb = NULL;
+        l->ctx = NULL;
+        h2_bucket_observer_fire(b, H2_BUCKET_EV_BEFORE_MASTER_SEND);
+        return b;
     }
-    return DECLINED;
+    return NULL;
 }
 
 static apr_status_t bbout(apr_bucket_brigade *bb, const char *fmt, ...)
@@ -337,31 +442,28 @@ static void add_stats(apr_bucket_brigade *bb, h2_session *s,
     bbout(bb, "  }%s\n", last? "" : ",");
 }
 
-static apr_status_t h2_status_stream_filter(h2_stream *stream)
+static apr_status_t h2_status_insert(h2_task *task, apr_bucket *b)
 {
-    h2_session *s = stream->session;
-    conn_rec *c = s->c;
+    h2_mplx *m = task->mplx;
+    h2_stream *stream = h2_mplx_stream_get(m, task->stream_id);
+    h2_session *s;
+    conn_rec *c;
+    
     apr_bucket_brigade *bb;
+    apr_bucket *e;
     int32_t connFlowIn, connFlowOut;
     
-    if (!stream->response) {
-        return APR_EINVAL;
-    }
-    
-    if (!stream->buffer) {
-        stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
+    if (!stream) {
+        /* stream already done */
+        return APR_SUCCESS;
     }
-    bb = stream->buffer;
+    s = stream->session;
+    c = s->c;
     
-    apr_table_unset(stream->response->headers, "Content-Length");
-    stream->response->content_length = -1;
+    bb = apr_brigade_create(stream->pool, c->bucket_alloc);
     
     connFlowIn = nghttp2_session_get_effective_local_window_size(s->ngh2); 
     connFlowOut = nghttp2_session_get_remote_window_size(s->ngh2);
-    apr_table_setn(stream->response->headers, "conn-flow-in", 
-                   apr_itoa(stream->pool, connFlowIn));
-    apr_table_setn(stream->response->headers, "conn-flow-out", 
-                   apr_itoa(stream->pool, connFlowOut));
      
     bbout(bb, "{\n");
     bbout(bb, "  \"version\": \"draft-01\",\n");
@@ -376,15 +478,96 @@ static apr_status_t h2_status_stream_filter(h2_stream *stream)
     add_stats(bb, s, stream, 1);
     bbout(bb, "}\n");
     
+    while ((e = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) {
+        APR_BUCKET_REMOVE(e);
+        APR_BUCKET_INSERT_AFTER(b, e);
+        b = e;
+    }
+    apr_brigade_destroy(bb);
+    
     return APR_SUCCESS;
 }
 
-apr_status_t h2_stream_filter(h2_stream *stream)
+static apr_status_t status_event(void *ctx, h2_bucket_event event, 
+                                 apr_bucket *b)
 {
-    const char *fname = stream->response? stream->response->sos_filter : NULL; 
-    if (fname && !strcmp(H2_SOS_H2_STATUS, fname)) {
-        return h2_status_stream_filter(stream);
+    h2_task *task = ctx;
+    
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, task->c->master, 
+                  "status_event(%s): %d", task->id, event);
+    switch (event) {
+        case H2_BUCKET_EV_BEFORE_MASTER_SEND:
+            h2_status_insert(task, b);
+            break;
+        default:
+            break;
     }
     return APR_SUCCESS;
 }
 
+int h2_filter_h2_status_handler(request_rec *r)
+{
+    h2_ctx *ctx = h2_ctx_rget(r);
+    conn_rec *c = r->connection;
+    h2_task *task;
+    apr_bucket_brigade *bb;
+    apr_bucket *b;
+    apr_status_t status;
+    
+    if (strcmp(r->handler, "http2-status")) {
+        return DECLINED;
+    }
+    if (r->method_number != M_GET && r->method_number != M_POST) {
+        return DECLINED;
+    }
+
+    task = ctx? h2_ctx_get_task(ctx) : NULL;
+    if (task) {
+
+        if ((status = ap_discard_request_body(r)) != OK) {
+            return status;
+        }
+        
+        /* We need to handle the actual output on the main thread, as
+         * we need to access h2_session information. */
+        r->status = 200;
+        r->clength = -1;
+        r->chunked = 1;
+        apr_table_unset(r->headers_out, "Content-Length");
+        ap_set_content_type(r, "application/json");
+        apr_table_setn(r->notes, H2_FILTER_DEBUG_NOTE, "on");
+
+        bb = apr_brigade_create(r->pool, c->bucket_alloc);
+        b = h2_bucket_observer_create(c->bucket_alloc, status_event, task);
+        APR_BRIGADE_INSERT_TAIL(bb, b);
+        b = apr_bucket_eos_create(c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(bb, b);
+
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+                      "status_handler(%s): checking for incoming trailers", 
+                      task->id);
+        if (r->trailers_in && !apr_is_empty_table(r->trailers_in)) {
+            ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+                          "status_handler(%s): seeing incoming trailers", 
+                          task->id);
+            apr_table_setn(r->trailers_out, "h2-trailers-in", 
+                           apr_itoa(r->pool, 1));
+        }
+        
+        status = ap_pass_brigade(r->output_filters, bb);
+        if (status == APR_SUCCESS
+            || r->status != HTTP_OK
+            || c->aborted) {
+            return OK;
+        }
+        else {
+            /* no way to know what type of error occurred */
+            ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+                          "status_handler(%s): ap_pass_brigade failed", 
+                          task->id);
+            return AP_FILTER_ERROR;
+        }
+    }
+    return DECLINED;
+}
+