+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
*
* Licensed under the Apache License, Version 2.0 (the "License");
#include <apr_strings.h>
#include <httpd.h>
#include <http_core.h>
+#include <http_protocol.h>
#include <http_log.h>
#include <http_connection.h>
#include <scoreboard.h>
#include "h2_task.h"
#include "h2_stream.h"
#include "h2_request.h"
-#include "h2_response.h"
+#include "h2_headers.h"
#include "h2_stream.h"
#include "h2_session.h"
#include "h2_util.h"
#define UNSET -1
#define H2MIN(x,y) ((x) < (y) ? (x) : (y))
-static apr_status_t consume_brigade(h2_filter_cin *cin,
- apr_bucket_brigade *bb,
- apr_read_type_e block)
+static apr_status_t recv_RAW_DATA(conn_rec *c, h2_filter_cin *cin,
+ apr_bucket *b, apr_read_type_e block)
{
+ h2_session *session = cin->session;
apr_status_t status = APR_SUCCESS;
- apr_size_t readlen = 0;
+ apr_size_t len;
+ const char *data;
+ ssize_t n;
- while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
+ status = apr_bucket_read(b, &data, &len, block);
+
+ while (status == APR_SUCCESS && len > 0) {
+ n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len);
- apr_bucket* bucket = APR_BRIGADE_FIRST(bb);
- if (APR_BUCKET_IS_METADATA(bucket)) {
- /* we do nothing regarding any meta here */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ H2_SSSN_MSG(session, "fed %ld bytes to nghttp2, %ld read"),
+ (long)len, (long)n);
+ if (n < 0) {
+ if (nghttp2_is_fatal((int)n)) {
+ h2_session_event(session, H2_SESSION_EV_PROTO_ERROR,
+ (int)n, nghttp2_strerror((int)n));
+ status = APR_EGENERAL;
+ }
}
else {
- const char *bucket_data = NULL;
- apr_size_t bucket_length = 0;
- status = apr_bucket_read(bucket, &bucket_data,
- &bucket_length, block);
-
- if (status == APR_SUCCESS && bucket_length > 0) {
- apr_size_t consumed = 0;
-
- status = cin->cb(cin->cb_ctx, bucket_data, bucket_length, &consumed);
- if (status == APR_SUCCESS && bucket_length > consumed) {
- /* We have data left in the bucket. Split it. */
- status = apr_bucket_split(bucket, consumed);
- }
- readlen += consumed;
- cin->start_read = apr_time_now();
+ session->io.bytes_read += n;
+ if (len <= n) {
+ break;
}
+ len -= n;
+ data += n;
+ }
+ }
+
+ return status;
+}
+
+static apr_status_t recv_RAW_brigade(conn_rec *c, h2_filter_cin *cin,
+ apr_bucket_brigade *bb,
+ apr_read_type_e block)
+{
+ apr_status_t status = APR_SUCCESS;
+ apr_bucket* b;
+ int consumed = 0;
+
+ h2_util_bb_log(c, c->id, APLOG_TRACE2, "RAW_in", bb);
+ while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
+ b = APR_BRIGADE_FIRST(bb);
+
+ if (APR_BUCKET_IS_METADATA(b)) {
+ /* nop */
}
- apr_bucket_delete(bucket);
+ else {
+ status = recv_RAW_DATA(c, cin, b, block);
+ }
+ consumed = 1;
+ apr_bucket_delete(b);
}
- if (readlen == 0 && status == APR_SUCCESS && block == APR_NONBLOCK_READ) {
+ if (!consumed && status == APR_SUCCESS && block == APR_NONBLOCK_READ) {
return APR_EAGAIN;
}
return status;
}
-h2_filter_cin *h2_filter_cin_create(apr_pool_t *p, h2_filter_cin_cb *cb, void *ctx)
+h2_filter_cin *h2_filter_cin_create(h2_session *session)
{
h2_filter_cin *cin;
- cin = apr_pcalloc(p, sizeof(*cin));
- cin->pool = p;
- cin->cb = cb;
- cin->cb_ctx = ctx;
- cin->start_read = UNSET;
+ cin = apr_pcalloc(session->pool, sizeof(*cin));
+ if (!cin) {
+ return NULL;
+ }
+ cin->session = session;
return cin;
}
h2_filter_cin *cin = f->ctx;
apr_status_t status = APR_SUCCESS;
apr_interval_time_t saved_timeout = UNSET;
+ const int trace1 = APLOGctrace1(f->c);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "core_input(%ld): read, %s, mode=%d, readbytes=%ld",
- (long)f->c->id, (block == APR_BLOCK_READ)? "BLOCK_READ" : "NONBLOCK_READ",
- mode, (long)readbytes);
+ if (trace1) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+ "h2_session(%ld): read, %s, mode=%d, readbytes=%ld",
+ (long)f->c->id, (block == APR_BLOCK_READ)?
+ "BLOCK_READ" : "NONBLOCK_READ", mode, (long)readbytes);
+ }
if (mode == AP_MODE_INIT || mode == AP_MODE_SPECULATIVE) {
return ap_get_brigade(f->next, brigade, mode, block, readbytes);
}
if (!cin->bb) {
- cin->bb = apr_brigade_create(cin->pool, f->c->bucket_alloc);
+ cin->bb = apr_brigade_create(cin->session->pool, f->c->bucket_alloc);
}
if (!cin->socket) {
cin->socket = ap_get_conn_socket(f->c);
}
- cin->start_read = apr_time_now();
if (APR_BRIGADE_EMPTY(cin->bb)) {
/* We only do a blocking read when we have no streams to process. So,
* in httpd scoreboard lingo, we are in a KEEPALIVE connection state.
- * When reading non-blocking, we do have streams to process and update
- * child with NULL request. That way, any current request information
- * in the scoreboard is preserved.
*/
if (block == APR_BLOCK_READ) {
if (cin->timeout > 0) {
switch (status) {
case APR_SUCCESS:
- status = consume_brigade(cin, cin->bb, block);
+ status = recv_RAW_brigade(f->c, cin, cin->bb, block);
break;
case APR_EOF:
case APR_EAGAIN:
case APR_TIMEUP:
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "core_input(%ld): read", (long)f->c->id);
+ if (trace1) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
+ "h2_session(%ld): read", f->c->id);
+ }
break;
default:
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, f->c, APLOGNO(03046)
- "h2_conn_io: error reading");
+ "h2_session(%ld): error reading", f->c->id);
break;
}
return status;
* http2 connection status handler + stream out source
******************************************************************************/
-static const char *H2_SOS_H2_STATUS = "http2-status";
+typedef struct {
+ apr_bucket_refcount refcount;
+ h2_bucket_event_cb *cb;
+ void *ctx;
+} h2_bucket_observer;
+
+static apr_status_t bucket_read(apr_bucket *b, const char **str,
+ apr_size_t *len, apr_read_type_e block)
+{
+ (void)b;
+ (void)block;
+ *str = NULL;
+ *len = 0;
+ return APR_SUCCESS;
+}
-int h2_filter_h2_status_handler(request_rec *r)
+static void bucket_destroy(void *data)
{
- h2_ctx *ctx = h2_ctx_rget(r);
- h2_task *task;
-
- if (strcmp(r->handler, "http2-status")) {
- return DECLINED;
+ h2_bucket_observer *h = data;
+ if (apr_bucket_shared_destroy(h)) {
+ if (h->cb) {
+ h->cb(h->ctx, H2_BUCKET_EV_BEFORE_DESTROY, NULL);
+ }
+ apr_bucket_free(h);
}
- if (r->method_number != M_GET) {
- return DECLINED;
+}
+
+apr_bucket * h2_bucket_observer_make(apr_bucket *b, h2_bucket_event_cb *cb,
+ void *ctx)
+{
+ h2_bucket_observer *br;
+
+ br = apr_bucket_alloc(sizeof(*br), b->list);
+ br->cb = cb;
+ br->ctx = ctx;
+
+ b = apr_bucket_shared_make(b, br, 0, 0);
+ b->type = &h2_bucket_type_observer;
+ return b;
+}
+
+apr_bucket * h2_bucket_observer_create(apr_bucket_alloc_t *list,
+ h2_bucket_event_cb *cb, void *ctx)
+{
+ apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
+
+ APR_BUCKET_INIT(b);
+ b->free = apr_bucket_free;
+ b->list = list;
+ b = h2_bucket_observer_make(b, cb, ctx);
+ return b;
+}
+
+apr_status_t h2_bucket_observer_fire(apr_bucket *b, h2_bucket_event event)
+{
+ if (H2_BUCKET_IS_OBSERVER(b)) {
+ h2_bucket_observer *l = (h2_bucket_observer *)b->data;
+ return l->cb(l->ctx, event, b);
}
+ return APR_EINVAL;
+}
- task = ctx? h2_ctx_get_task(ctx) : NULL;
- if (task) {
- /* We need to handle the actual output on the main thread, as
- * we need to access h2_session information. */
- apr_table_setn(r->notes, H2_RESP_SOS_NOTE, H2_SOS_H2_STATUS);
- apr_table_setn(r->headers_out, "Content-Type", "application/json");
- r->status = 200;
- return DONE;
+const apr_bucket_type_t h2_bucket_type_observer = {
+ "H2OBS", 5, APR_BUCKET_METADATA,
+ bucket_destroy,
+ bucket_read,
+ apr_bucket_setaside_noop,
+ apr_bucket_split_notimpl,
+ apr_bucket_shared_copy
+};
+
+apr_bucket *h2_bucket_observer_beam(struct h2_bucket_beam *beam,
+ apr_bucket_brigade *dest,
+ const apr_bucket *src)
+{
+ if (H2_BUCKET_IS_OBSERVER(src)) {
+ h2_bucket_observer *l = (h2_bucket_observer *)src->data;
+ apr_bucket *b = h2_bucket_observer_create(dest->bucket_alloc,
+ l->cb, l->ctx);
+ APR_BRIGADE_INSERT_TAIL(dest, b);
+ l->cb = NULL;
+ l->ctx = NULL;
+ h2_bucket_observer_fire(b, H2_BUCKET_EV_BEFORE_MASTER_SEND);
+ return b;
}
- return DECLINED;
+ return NULL;
}
static apr_status_t bbout(apr_bucket_brigade *bb, const char *fmt, ...)
bbout(bb, " }%s\n", last? "" : ",");
}
-static apr_status_t h2_status_stream_filter(h2_stream *stream)
+static apr_status_t h2_status_insert(h2_task *task, apr_bucket *b)
{
- h2_session *s = stream->session;
- conn_rec *c = s->c;
+ h2_mplx *m = task->mplx;
+ h2_stream *stream = h2_mplx_stream_get(m, task->stream_id);
+ h2_session *s;
+ conn_rec *c;
+
apr_bucket_brigade *bb;
+ apr_bucket *e;
int32_t connFlowIn, connFlowOut;
- if (!stream->response) {
- return APR_EINVAL;
- }
-
- if (!stream->buffer) {
- stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
+ if (!stream) {
+ /* stream already done */
+ return APR_SUCCESS;
}
- bb = stream->buffer;
+ s = stream->session;
+ c = s->c;
- apr_table_unset(stream->response->headers, "Content-Length");
- stream->response->content_length = -1;
+ bb = apr_brigade_create(stream->pool, c->bucket_alloc);
connFlowIn = nghttp2_session_get_effective_local_window_size(s->ngh2);
connFlowOut = nghttp2_session_get_remote_window_size(s->ngh2);
- apr_table_setn(stream->response->headers, "conn-flow-in",
- apr_itoa(stream->pool, connFlowIn));
- apr_table_setn(stream->response->headers, "conn-flow-out",
- apr_itoa(stream->pool, connFlowOut));
bbout(bb, "{\n");
bbout(bb, " \"version\": \"draft-01\",\n");
add_stats(bb, s, stream, 1);
bbout(bb, "}\n");
+ while ((e = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) {
+ APR_BUCKET_REMOVE(e);
+ APR_BUCKET_INSERT_AFTER(b, e);
+ b = e;
+ }
+ apr_brigade_destroy(bb);
+
return APR_SUCCESS;
}
-apr_status_t h2_stream_filter(h2_stream *stream)
+static apr_status_t status_event(void *ctx, h2_bucket_event event,
+ apr_bucket *b)
{
- const char *fname = stream->response? stream->response->sos_filter : NULL;
- if (fname && !strcmp(H2_SOS_H2_STATUS, fname)) {
- return h2_status_stream_filter(stream);
+ h2_task *task = ctx;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, task->c->master,
+ "status_event(%s): %d", task->id, event);
+ switch (event) {
+ case H2_BUCKET_EV_BEFORE_MASTER_SEND:
+ h2_status_insert(task, b);
+ break;
+ default:
+ break;
}
return APR_SUCCESS;
}
+int h2_filter_h2_status_handler(request_rec *r)
+{
+ h2_ctx *ctx = h2_ctx_rget(r);
+ conn_rec *c = r->connection;
+ h2_task *task;
+ apr_bucket_brigade *bb;
+ apr_bucket *b;
+ apr_status_t status;
+
+ if (strcmp(r->handler, "http2-status")) {
+ return DECLINED;
+ }
+ if (r->method_number != M_GET && r->method_number != M_POST) {
+ return DECLINED;
+ }
+
+ task = ctx? h2_ctx_get_task(ctx) : NULL;
+ if (task) {
+
+ if ((status = ap_discard_request_body(r)) != OK) {
+ return status;
+ }
+
+ /* We need to handle the actual output on the main thread, as
+ * we need to access h2_session information. */
+ r->status = 200;
+ r->clength = -1;
+ r->chunked = 1;
+ apr_table_unset(r->headers_out, "Content-Length");
+ ap_set_content_type(r, "application/json");
+ apr_table_setn(r->notes, H2_FILTER_DEBUG_NOTE, "on");
+
+ bb = apr_brigade_create(r->pool, c->bucket_alloc);
+ b = h2_bucket_observer_create(c->bucket_alloc, status_event, task);
+ APR_BRIGADE_INSERT_TAIL(bb, b);
+ b = apr_bucket_eos_create(c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(bb, b);
+
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+ "status_handler(%s): checking for incoming trailers",
+ task->id);
+ if (r->trailers_in && !apr_is_empty_table(r->trailers_in)) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+ "status_handler(%s): seeing incoming trailers",
+ task->id);
+ apr_table_setn(r->trailers_out, "h2-trailers-in",
+ apr_itoa(r->pool, 1));
+ }
+
+ status = ap_pass_brigade(r->output_filters, bb);
+ if (status == APR_SUCCESS
+ || r->status != HTTP_OK
+ || c->aborted) {
+ return OK;
+ }
+ else {
+ /* no way to know what type of error occurred */
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+ "status_handler(%s): ap_pass_brigade failed",
+ task->id);
+ return AP_FILTER_ERROR;
+ }
+ }
+ return DECLINED;
+}
+