#include <http_core.h>
#include <http_config.h>
#include <http_log.h>
+#include <scoreboard.h>
#include "h2_private.h"
#include "h2_bucket_eoc.h"
#include "h2_bucket_eos.h"
#include "h2_config.h"
+#include "h2_ctx.h"
+#include "h2_filter.h"
#include "h2_h2.h"
#include "h2_mplx.h"
#include "h2_push.h"
#include "h2_version.h"
#include "h2_workers.h"
+
static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen);
static int h2_session_status_from_apr_status(apr_status_t rv)
return NGHTTP2_ERR_PROTO;
}
+static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
+{
+ h2_session *session = (h2_session*)ctx;
+ nghttp2_session_consume(session->ngh2, stream_id, bytes_read);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_session(%ld-%d): consumed %ld bytes",
+ session->id, stream_id, (long)bytes_read);
+}
+
+static apr_status_t h2_session_receive(void *ctx,
+ const char *data, apr_size_t len,
+ apr_size_t *readlen);
+
+static int is_accepting_streams(h2_session *session);
+static void dispatch_event(h2_session *session, h2_session_event_t ev,
+ int err, const char *msg);
+
h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
{
h2_stream * stream;
apr_pool_t *stream_pool;
- if (session->aborted) {
- return NULL;
- }
if (session->spare) {
stream_pool = session->spare;
h2_stream_set_add(session->streams, stream);
if (H2_STREAM_CLIENT_INITIATED(stream_id)
&& stream_id > session->max_stream_received) {
+ ++session->requests_received;
session->max_stream_received = stream->id;
}
return stream;
}
-static apr_status_t h2_session_flush(h2_session *session)
-{
- session->flush = 0;
- return h2_conn_io_flush(&session->io);
-}
+#ifdef H2_NG2_STREAM_API
/**
* Determine the importance of streams when scheduling tasks.
return spri_cmp(sid1, s1, sid2, s2, session);
}
-static apr_status_t stream_end_headers(h2_session *session,
- h2_stream *stream, int eos)
+#else /* ifdef H2_NG2_STREAM_API */
+
+/* In absence of nghttp2_stream API, which gives information about
+ * priorities since nghttp2 1.3.x, we just sort the streams by
+ * their identifier, aka. order of arrival.
+ */
+static int stream_pri_cmp(int sid1, int sid2, void *ctx)
+{
+ (void)ctx;
+ return sid1 - sid2;
+}
+
+#endif /* (ifdef else) H2_NG2_STREAM_API */
+
+static apr_status_t stream_schedule(h2_session *session,
+ h2_stream *stream, int eos)
{
(void)session;
- return h2_stream_schedule(stream, eos, stream_pri_cmp, session);
+ return h2_stream_schedule(stream, eos, h2_session_push_enabled(session),
+ stream_pri_cmp, session);
}
/*
h2_session *session = (h2_session *)userp;
(void)ngh2;
- if (session->aborted) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
- }
- if (APLOGctrace2(session->c)) {
+ if (APLOGcdebug(session->c)) {
char buffer[256];
frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_session: callback on_invalid_frame_recv error=%d %s",
- error, buffer);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_session(%ld): recv unknown FRAME[%s], frames=%ld/%ld (r/s)",
+ session->id, buffer, (long)session->frames_received,
+ (long)session->frames_sent);
}
return 0;
}
int32_t stream_id,
const uint8_t *data, size_t len, void *userp)
{
- int rv;
h2_session *session = (h2_session *)userp;
+ apr_status_t status = APR_SUCCESS;
h2_stream * stream;
- apr_status_t status;
+ int rv;
(void)flags;
- if (session->aborted) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
+ if (!is_accepting_streams(session)) {
+ /* ignore */
+ return 0;
}
+
stream = h2_session_get_stream(session, stream_id);
if (!stream) {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
- APLOGNO(02919)
- "h2_session: stream(%ld-%d): on_data_chunk for unknown stream",
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_stream(%ld-%d): on_data_chunk for unknown stream",
session->id, (int)stream_id);
rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
NGHTTP2_INTERNAL_ERROR);
status = h2_stream_write_data(stream, (const char *)data, len);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
- "h2_stream(%ld-%d): written DATA, length %d",
- session->id, stream_id, (int)len);
+ "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
+ session->id, stream_id, (long)len);
if (status != APR_SUCCESS) {
+ update_window(session, stream_id, len);
rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR));
if (nghttp2_is_fatal(rv)) {
return 0;
}
-static int before_frame_send_cb(nghttp2_session *ngh2,
- const nghttp2_frame *frame,
- void *userp)
-{
- h2_session *session = (h2_session *)userp;
- (void)ngh2;
-
- if (session->aborted) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
- }
- /* Set the need to flush output when we have added one of the
- * following frame types */
- switch (frame->hd.type) {
- case NGHTTP2_RST_STREAM:
- case NGHTTP2_PUSH_PROMISE:
- case NGHTTP2_GOAWAY:
- session->flush = 1;
- break;
- default:
- break;
-
- }
- if (APLOGctrace2(session->c)) {
- char buffer[256];
- frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_session(%ld): before_frame_send %s",
- session->id, buffer);
- }
- return 0;
-}
-
-static int on_frame_send_cb(nghttp2_session *ngh2,
- const nghttp2_frame *frame,
- void *userp)
-{
- h2_session *session = (h2_session *)userp;
- (void)ngh2;
- if (APLOGctrace2(session->c)) {
- char buffer[256];
- frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_session(%ld): on_frame_send %s",
- session->id, buffer);
- }
- return 0;
-}
-
-static int on_frame_not_send_cb(nghttp2_session *ngh2,
- const nghttp2_frame *frame,
- int lib_error_code, void *userp)
-{
- h2_session *session = (h2_session *)userp;
- (void)ngh2;
-
- if (APLOGctrace2(session->c)) {
- char buffer[256];
-
- frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_session: callback on_frame_not_send error=%d %s",
- lib_error_code, buffer);
- }
- return 0;
-}
-
static apr_status_t stream_release(h2_session *session,
h2_stream *stream,
uint32_t error_code)
{
if (!error_code) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_stream(%ld-%d): handled, closing",
session->id, (int)stream->id);
if (stream->id > session->max_stream_handled) {
h2_stream *stream;
(void)ngh2;
- if (session->aborted) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
- }
stream = h2_session_get_stream(session, stream_id);
if (stream) {
stream_release(session, stream, error_code);
}
-
- if (error_code) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_stream(%ld-%d): closed, error=%d",
- session->id, (int)stream_id, error_code);
- }
-
return 0;
}
static int on_begin_headers_cb(nghttp2_session *ngh2,
const nghttp2_frame *frame, void *userp)
{
+ h2_session *session = (h2_session *)userp;
h2_stream *s;
- /* This starts a new stream. */
+ /* We may see HEADERs at the start of a stream or after all DATA
+ * streams to carry trailers. */
(void)ngh2;
- s = h2_session_open_stream((h2_session *)userp, frame->hd.stream_id);
+ s = h2_session_get_stream(session, frame->hd.stream_id);
+ if (s) {
+ /* nop */
+ }
+ else {
+ s = h2_session_open_stream((h2_session *)userp, frame->hd.stream_id);
+ }
return s? 0 : NGHTTP2_ERR_CALLBACK_FAILURE;
}
(void)ngh2;
(void)flags;
- if (session->aborted) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
+ if (!is_accepting_streams(session)) {
+ /* just ignore */
+ return 0;
}
stream = h2_session_get_stream(session, frame->hd.stream_id);
apr_status_t status = APR_SUCCESS;
h2_stream *stream;
- if (session->aborted) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
+ if (APLOGcdebug(session->c)) {
+ char buffer[256];
+
+ frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_session(%ld): recv FRAME[%s], frames=%ld/%ld (r/s)",
+ session->id, buffer, (long)session->frames_received,
+ (long)session->frames_sent);
}
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_session(%ld): on_frame_rcv #%ld, type=%d", session->id,
- (long)session->frames_received, frame->hd.type);
++session->frames_received;
switch (frame->hd.type) {
case NGHTTP2_HEADERS:
+ /* This can be HEADERS for a new stream, defining the request,
+ * or HEADER may come after DATA at the end of a stream as in
+ * trailers */
stream = h2_session_get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
- status = stream_end_headers(session, stream, eos);
+
+ if (h2_stream_is_scheduled(stream)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_stream(%ld-%d): TRAILER, eos=%d",
+ session->id, frame->hd.stream_id, eos);
+ if (eos) {
+ status = h2_stream_close_input(stream);
+ }
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_stream(%ld-%d): HEADER, eos=%d",
+ session->id, frame->hd.stream_id, eos);
+ status = stream_schedule(session, stream, eos);
+ }
}
else {
status = APR_EINVAL;
stream = h2_session_get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_stream(%ld-%d): DATA, len=%ld, eos=%d",
+ session->id, frame->hd.stream_id,
+ (long)frame->hd.length, eos);
if (eos) {
status = h2_stream_close_input(stream);
}
session->id, (int)frame->hd.stream_id,
frame->window_update.window_size_increment);
break;
+ case NGHTTP2_RST_STREAM:
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_session(%ld-%d): RST_STREAM by client, errror=%d",
+ session->id, (int)frame->hd.stream_id,
+ (int)frame->rst_stream.error_code);
+ stream = h2_session_get_stream(session, frame->hd.stream_id);
+ if (stream && stream->initiated_on) {
+ ++session->pushes_reset;
+ }
+ else {
+ ++session->streams_reset;
+ }
+ break;
+ case NGHTTP2_GOAWAY:
+ dispatch_event(session, H2_SESSION_EV_REMOTE_GOAWAY, 0, NULL);
+ break;
default:
if (APLOGctrace2(session->c)) {
char buffer[256];
(void)ngh2;
(void)source;
- if (session->aborted) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
- }
-
if (frame->data.padlen > H2_MAX_PADLEN) {
return NGHTTP2_ERR_PROTO;
}
return h2_session_status_from_apr_status(status);
}
+static int on_frame_send_cb(nghttp2_session *ngh2,
+ const nghttp2_frame *frame,
+ void *user_data)
+{
+ h2_session *session = user_data;
+ if (APLOGcdebug(session->c)) {
+ char buffer[256];
+
+ frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_session(%ld): sent FRAME[%s], frames=%ld/%ld (r/s)",
+ session->id, buffer, (long)session->frames_received,
+ (long)session->frames_sent);
+ }
+ ++session->frames_sent;
+ return 0;
+}
+
#define NGH2_SET_CALLBACK(callbacks, name, fn)\
nghttp2_session_callbacks_set_##name##_callback(callbacks, fn)
NGH2_SET_CALLBACK(*pcb, on_frame_recv, on_frame_recv_cb);
NGH2_SET_CALLBACK(*pcb, on_invalid_frame_recv, on_invalid_frame_recv_cb);
NGH2_SET_CALLBACK(*pcb, on_data_chunk_recv, on_data_chunk_recv_cb);
- NGH2_SET_CALLBACK(*pcb, before_frame_send, before_frame_send_cb);
- NGH2_SET_CALLBACK(*pcb, on_frame_send, on_frame_send_cb);
- NGH2_SET_CALLBACK(*pcb, on_frame_not_send, on_frame_not_send_cb);
NGH2_SET_CALLBACK(*pcb, on_stream_close, on_stream_close_cb);
NGH2_SET_CALLBACK(*pcb, on_begin_headers, on_begin_headers_cb);
NGH2_SET_CALLBACK(*pcb, on_header, on_header_cb);
NGH2_SET_CALLBACK(*pcb, send_data, on_send_data_cb);
+ NGH2_SET_CALLBACK(*pcb, on_frame_send, on_frame_send_cb);
+
+ return APR_SUCCESS;
+}
+
+static void h2_session_cleanup(h2_session *session)
+{
+ AP_DEBUG_ASSERT(session);
+ /* This is an early cleanup of the session that may
+ * discard what is no longer necessary for *new* streams
+ * and general HTTP/2 processing.
+ * At this point, all frames are in transit or somehwere in
+ * our buffers or passed down output filters.
+ * h2 streams might still being written out.
+ */
+ if (session->c) {
+ h2_ctx_clear(session->c);
+ }
+ if (session->ngh2) {
+ nghttp2_session_del(session->ngh2);
+ session->ngh2 = NULL;
+ }
+ if (session->spare) {
+ apr_pool_destroy(session->spare);
+ session->spare = NULL;
+ }
+}
+
+static void h2_session_destroy(h2_session *session)
+{
+ AP_DEBUG_ASSERT(session);
+ h2_session_cleanup(session);
+
+ if (APLOGctrace1(session->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_session(%ld): destroy, %d streams open",
+ session->id, (int)h2_stream_set_size(session->streams));
+ }
+ if (session->mplx) {
+ h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
+ h2_mplx_release_and_join(session->mplx, session->iowait);
+ session->mplx = NULL;
+ }
+ if (session->streams) {
+ h2_stream_set_destroy(session->streams);
+ session->streams = NULL;
+ }
+ if (session->pool) {
+ apr_pool_destroy(session->pool);
+ }
+}
+
+static apr_status_t h2_session_shutdown(h2_session *session, int reason, const char *msg)
+{
+ apr_status_t status = APR_SUCCESS;
+ const char *err = msg;
+
+ AP_DEBUG_ASSERT(session);
+ if (!err && reason) {
+ err = nghttp2_strerror(reason);
+ }
+ nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE,
+ h2_mplx_get_max_stream_started(session->mplx),
+ reason, (uint8_t*)err, err? strlen(err):0);
+ status = nghttp2_session_send(session->ngh2);
+ h2_conn_io_flush(&session->io);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "session(%ld): sent GOAWAY, err=%d, msg=%s",
+ session->id, reason, err? err : "");
+ dispatch_event(session, H2_SESSION_EV_LOCAL_GOAWAY, reason, err);
+ return status;
+}
+
+static apr_status_t session_pool_cleanup(void *data)
+{
+ h2_session *session = data;
+ /* On a controlled connection shutdown, this gets never
+ * called as we deregister and destroy our pool manually.
+ * However when we have an async mpm, and handed it our idle
+ * connection, it will just cleanup once the connection is closed
+ * from the other side (and sometimes even from out side) and
+ * here we arrive then.
+ */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "session(%ld): pool_cleanup", session->id);
+ if (session->state != H2_SESSION_ST_DONE
+ && session->state != H2_SESSION_ST_LOCAL_SHUTDOWN) {
+ /* Not good. The connection is being torn down and we have
+ * not sent a goaway. This is considered a protocol error and
+ * the client has to assume that any streams "in flight" may have
+ * been processed and are not safe to retry.
+ * As clients with idle connection may only learn about a closed
+ * connection when sending the next request, this has the effect
+ * that at least this one request will fail.
+ */
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c,
+ "session(%ld): connection disappeared without proper "
+ "goodbye, clients will be confused, should not happen",
+ session->id);
+ }
+ /* keep us from destroying the pool, since that is already ongoing. */
+ session->pool = NULL;
+ h2_session_destroy(session);
return APR_SUCCESS;
}
+static void *session_malloc(size_t size, void *ctx)
+{
+ h2_session *session = ctx;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, session->c,
+ "h2_session(%ld): malloc(%ld)",
+ session->id, (long)size);
+ return malloc(size);
+}
+
+static void session_free(void *p, void *ctx)
+{
+ h2_session *session = ctx;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, session->c,
+ "h2_session(%ld): free()",
+ session->id);
+ free(p);
+}
+
+static void *session_calloc(size_t n, size_t size, void *ctx)
+{
+ h2_session *session = ctx;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, session->c,
+ "h2_session(%ld): calloc(%ld, %ld)",
+ session->id, (long)n, (long)size);
+ return calloc(n, size);
+}
+
+static void *session_realloc(void *p, size_t size, void *ctx)
+{
+ h2_session *session = ctx;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, session->c,
+ "h2_session(%ld): realloc(%ld)",
+ session->id, (long)size);
+ return realloc(p, size);
+}
+
static h2_session *h2_session_create_int(conn_rec *c,
request_rec *r,
- h2_config *config,
+ h2_ctx *ctx,
h2_workers *workers)
{
nghttp2_session_callbacks *callbacks = NULL;
nghttp2_option *options = NULL;
+ uint32_t n;
apr_pool_t *pool = NULL;
- apr_status_t status = apr_pool_create(&pool, r? r->pool : c->pool);
+ apr_status_t status = apr_pool_create(&pool, c->pool);
h2_session *session;
if (status != APR_SUCCESS) {
return NULL;
session = apr_pcalloc(pool, sizeof(h2_session));
if (session) {
int rv;
+ nghttp2_mem *mem;
+
session->id = c->id;
session->c = c;
session->r = r;
+ session->s = h2_ctx_server_get(ctx);
+ session->config = h2_config_sget(session->s);
+
+ session->state = H2_SESSION_ST_INIT;
- session->max_stream_count = h2_config_geti(config, H2_CONF_MAX_STREAMS);
- session->max_stream_mem = h2_config_geti(config, H2_CONF_STREAM_MAX_MEM);
-
session->pool = pool;
+ apr_pool_pre_cleanup_register(pool, session, session_pool_cleanup);
+
+ session->max_stream_count = h2_config_geti(session->config, H2_CONF_MAX_STREAMS);
+ session->max_stream_mem = h2_config_geti(session->config, H2_CONF_STREAM_MAX_MEM);
+ session->timeout_secs = h2_config_geti(session->config, H2_CONF_TIMEOUT_SECS);
+ if (session->timeout_secs <= 0) {
+ session->timeout_secs = apr_time_sec(session->s->timeout);
+ }
+ session->keepalive_secs = h2_config_geti(session->config, H2_CONF_KEEPALIVE_SECS);
+ if (session->keepalive_secs <= 0) {
+ session->keepalive_secs = apr_time_sec(session->s->keep_alive_timeout);
+ }
status = apr_thread_cond_create(&session->iowait, session->pool);
if (status != APR_SUCCESS) {
session->streams = h2_stream_set_create(session->pool, session->max_stream_count);
session->workers = workers;
- session->mplx = h2_mplx_create(c, session->pool, workers);
+ session->mplx = h2_mplx_create(c, session->pool, session->config, workers);
- h2_conn_io_init(&session->io, c);
+ h2_mplx_set_consumed_cb(session->mplx, update_window, session);
+
+ /* Install the connection input filter that feeds the session */
+ session->cin = h2_filter_cin_create(session->pool, h2_session_receive, session);
+ ap_add_input_filter("H2_IN", session->cin, r, c);
+
+ h2_conn_io_init(&session->io, c, session->config, session->pool);
session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc);
status = init_callbacks(c, &callbacks);
h2_session_destroy(session);
return NULL;
}
-
nghttp2_option_set_peer_max_concurrent_streams(options,
(uint32_t)session->max_stream_count);
-
/* We need to handle window updates ourself, otherwise we
* get flooded by nghttp2. */
nghttp2_option_set_no_auto_window_update(options, 1);
- rv = nghttp2_session_server_new2(&session->ngh2, callbacks,
- session, options);
+ if (APLOGctrace6(c)) {
+ mem = apr_pcalloc(session->pool, sizeof(nghttp2_mem));
+ mem->mem_user_data = session;
+ mem->malloc = session_malloc;
+ mem->free = session_free;
+ mem->calloc = session_calloc;
+ mem->realloc = session_realloc;
+
+ rv = nghttp2_session_server_new3(&session->ngh2, callbacks,
+ session, options, mem);
+ }
+ else {
+ rv = nghttp2_session_server_new2(&session->ngh2, callbacks,
+ session, options);
+ }
nghttp2_session_callbacks_del(callbacks);
nghttp2_option_del(options);
h2_session_destroy(session);
return NULL;
}
+
+ n = h2_config_geti(session->config, H2_CONF_PUSH_DIARY_SIZE);
+ session->push_diary = h2_push_diary_create(session->pool, n);
+ if (APLOGcdebug(c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c,
+ "session(%ld) created, timeout=%d, keepalive_timeout=%d, "
+ "max_streams=%d, stream_mem=%d, push_diary(type=%d,N=%d)",
+ session->id, session->timeout_secs, session->keepalive_secs,
+ (int)session->max_stream_count, (int)session->max_stream_mem,
+ session->push_diary->dtype,
+ (int)session->push_diary->N);
+ }
}
return session;
}
-h2_session *h2_session_create(conn_rec *c, h2_config *config,
- h2_workers *workers)
-{
- return h2_session_create_int(c, NULL, config, workers);
-}
-
-h2_session *h2_session_rcreate(request_rec *r, h2_config *config,
- h2_workers *workers)
+h2_session *h2_session_create(conn_rec *c, h2_ctx *ctx, h2_workers *workers)
{
- return h2_session_create_int(r->connection, r, config, workers);
+ return h2_session_create_int(c, NULL, ctx, workers);
}
-void h2_session_destroy(h2_session *session)
+h2_session *h2_session_rcreate(request_rec *r, h2_ctx *ctx, h2_workers *workers)
{
- AP_DEBUG_ASSERT(session);
- if (session->mplx) {
- h2_mplx_release_and_join(session->mplx, session->iowait);
- session->mplx = NULL;
- }
- if (session->streams) {
- if (!h2_stream_set_is_empty(session->streams)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_session(%ld): destroy, %d streams open",
- session->id, (int)h2_stream_set_size(session->streams));
- }
- h2_stream_set_destroy(session->streams);
- session->streams = NULL;
- }
- if (session->ngh2) {
- nghttp2_session_del(session->ngh2);
- session->ngh2 = NULL;
- }
- if (session->spare) {
- apr_pool_destroy(session->spare);
- session->spare = NULL;
- }
- if (session->pool) {
- apr_pool_destroy(session->pool);
- }
+ return h2_session_create_int(r->connection, r, ctx, workers);
}
-void h2_session_cleanup(h2_session *session)
+void h2_session_eoc_callback(h2_session *session)
{
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"session(%ld): cleanup and destroy", session->id);
+ apr_pool_cleanup_kill(session->pool, session, session_pool_cleanup);
h2_session_destroy(session);
}
-static apr_status_t h2_session_abort_int(h2_session *session, int reason)
-{
- AP_DEBUG_ASSERT(session);
- if (!session->aborted) {
- session->aborted = 1;
-
- if (session->ngh2) {
- if (NGHTTP2_ERR_EOF == reason) {
- /* This is our way of indication that the connection is
- * gone. No use to send any GOAWAY frames. */
- nghttp2_session_terminate_session(session->ngh2, reason);
- }
- else if (!reason) {
- nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE,
- session->max_stream_received,
- reason, NULL, 0);
- nghttp2_session_send(session->ngh2);
- }
- else {
- const char *err = nghttp2_strerror(reason);
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "session(%ld): aborting session, reason=%d %s",
- session->id, reason, err);
-
- /* The connection might still be there and we shut down
- * with GOAWAY and reason information. */
- nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE,
- session->max_stream_received,
- reason, (const uint8_t *)err,
- strlen(err));
- nghttp2_session_send(session->ngh2);
- }
- h2_session_flush(session);
- }
- h2_mplx_abort(session->mplx);
- }
- return APR_SUCCESS;
-}
-
-apr_status_t h2_session_abort(h2_session *session, apr_status_t reason, int rv)
+static apr_status_t h2_session_start(h2_session *session, int *rv)
{
- AP_DEBUG_ASSERT(session);
- if (rv == 0) {
- rv = NGHTTP2_ERR_PROTO;
- switch (reason) {
- case APR_ENOMEM:
- rv = NGHTTP2_ERR_NOMEM;
- break;
- case APR_SUCCESS: /* all fine, just... */
- case APR_EOF: /* client closed its end... */
- case APR_TIMEUP: /* got bored waiting... */
- rv = 0; /* ...gracefully shut down */
- break;
- case APR_EBADF: /* connection unusable, terminate silently */
- default:
- if (APR_STATUS_IS_ECONNABORTED(reason)
- || APR_STATUS_IS_ECONNRESET(reason)
- || APR_STATUS_IS_EBADF(reason)) {
- rv = NGHTTP2_ERR_EOF;
- }
- break;
- }
- }
- return h2_session_abort_int(session, rv);
-}
-
-apr_status_t h2_session_start(h2_session *session, int *rv)
-{
- apr_status_t status = APR_SUCCESS;
- h2_config *config;
- nghttp2_settings_entry settings[3];
-
+ apr_status_t status = APR_SUCCESS;
+ nghttp2_settings_entry settings[3];
+ size_t slen;
+ int win_size;
+
AP_DEBUG_ASSERT(session);
/* Start the conversation by submitting our SETTINGS frame */
*rv = 0;
- config = h2_config_get(session->c);
if (session->r) {
const char *s, *cs;
apr_size_t dlen;
h2_stream * stream;
- /* better for vhost matching */
- config = h2_config_rget(session->r);
-
/* 'h2c' mode: we should have a 'HTTP2-Settings' header with
* base64 encoded client settings. */
s = apr_table_get(session->r->headers_in, "HTTP2-Settings");
if (status != APR_SUCCESS) {
return status;
}
- status = stream_end_headers(session, stream, 1);
+ status = stream_schedule(session, stream, 1);
if (status != APR_SUCCESS) {
return status;
}
}
- settings[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
- settings[0].value = (uint32_t)session->max_stream_count;
- settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
- settings[1].value = h2_config_geti(config, H2_CONF_WIN_SIZE);
- settings[2].settings_id = NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE;
- settings[2].value = 64*1024;
+ slen = 0;
+ settings[slen].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
+ settings[slen].value = (uint32_t)session->max_stream_count;
+ ++slen;
+ win_size = h2_config_geti(session->config, H2_CONF_WIN_SIZE);
+ if (win_size != H2_INITIAL_WINDOW_SIZE) {
+ settings[slen].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
+ settings[slen].value = win_size;
+ ++slen;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
+ "h2_session(%ld): start, INITIAL_WINDOW_SIZE=%ld, "
+ "MAX_CONCURRENT_STREAMS=%d",
+ session->id, (long)win_size, (int)session->max_stream_count);
*rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE,
- settings,
- sizeof(settings)/sizeof(settings[0]));
+ settings, slen);
if (*rv != 0) {
status = APR_EGENERAL;
ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
APLOGNO(02935) "nghttp2_submit_settings: %s",
nghttp2_strerror(*rv));
}
-
+ else {
+ /* use maximum possible value for connection window size. We are only
+ * interested in per stream flow control. which have the initial window
+ * size configured above.
+ * Therefore, for our use, the connection window can only get in the
+ * way. Example: if we allow 100 streams with a 32KB window each, we
+ * buffer up to 3.2 MB of data. Unless we do separate connection window
+ * interim updates, any smaller connection window will lead to blocking
+ * in DATA flow.
+ */
+ *rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE,
+ 0, NGHTTP2_MAX_WINDOW_SIZE - win_size);
+ if (*rv != 0) {
+ status = APR_EGENERAL;
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
+ APLOGNO(02970) "nghttp2_submit_window_update: %s",
+ nghttp2_strerror(*rv));
+ }
+ }
return status;
}
int resume_count;
} resume_ctx;
-static int resume_on_data(void *ctx, h2_stream *stream) {
+static int resume_on_data(void *ctx, h2_stream *stream)
+{
resume_ctx *rctx = (resume_ctx*)ctx;
h2_session *session = rctx->session;
AP_DEBUG_ASSERT(session);
ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
APLOG_ERR : APLOG_DEBUG, 0, session->c,
APLOGNO(02936)
- "h2_stream(%ld-%d): resuming stream %s",
- session->id, stream->id, nghttp2_strerror(rv));
+ "h2_stream(%ld-%d): resuming %s",
+ session->id, stream->id, rv? nghttp2_strerror(rv) : "");
}
}
return 1;
}
-static int h2_session_resume_streams_with_data(h2_session *session) {
+static int h2_session_resume_streams_with_data(h2_session *session)
+{
AP_DEBUG_ASSERT(session);
if (!h2_stream_set_is_empty(session->streams)
- && session->mplx && !session->aborted) {
+ && session->mplx && !session->mplx->aborted) {
resume_ctx ctx;
ctx.session = session;
return 0;
}
-static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
-{
- h2_session *session = (h2_session*)ctx;
- nghttp2_session_consume(session->ngh2, stream_id, bytes_read);
-}
-
h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
{
if (!session->last_stream || stream_id != session->last_stream->id) {
return session->last_stream;
}
-/* h2_io_on_read_cb implementation that offers the data read
- * directly to the session for consumption.
- */
-static apr_status_t session_receive(const char *data, apr_size_t len,
- apr_size_t *readlen, int *done,
- void *puser)
-{
- h2_session *session = (h2_session *)puser;
- AP_DEBUG_ASSERT(session);
- if (len > 0) {
- ssize_t n = nghttp2_session_mem_recv(session->ngh2,
- (const uint8_t *)data, len);
- if (n < 0) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EGENERAL,
- session->c,
- "h2_session: nghttp2_session_mem_recv error %d",
- (int)n);
- if (nghttp2_is_fatal((int)n)) {
- *done = 1;
- h2_session_abort_int(session, (int)n);
- return APR_EGENERAL;
- }
- }
- else {
- *readlen = n;
- }
- }
- return APR_SUCCESS;
-}
-
-apr_status_t h2_session_close(h2_session *session)
-{
- AP_DEBUG_ASSERT(session);
- if (!session->aborted) {
- h2_session_abort_int(session, 0);
- }
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0,session->c,
- "h2_session: closing, writing eoc");
- h2_conn_io_writeb(&session->io,
- h2_bucket_eoc_create(session->c->bucket_alloc,
- session));
- return h2_session_flush(session);
-}
-
static ssize_t stream_data_cb(nghttp2_session *ng2s,
int32_t stream_id,
uint8_t *buf,
* to find out how much of the requested length we can send without
* blocking.
* Indicate EOS when we encounter it or DEFERRED if the stream
- * should be suspended.
- * TODO: for handling of TRAILERS, the EOF indication needs
- * to be aware of that.
+ * should be suspended. Beware of trailers.
*/
(void)ng2s;
nread = 0;
h2_stream_set_suspended(stream, 1);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_stream(%ld-%d): suspending stream",
+ "h2_stream(%ld-%d): suspending",
session->id, (int)stream_id);
return NGHTTP2_ERR_DEFERRED;
}
if (eos) {
+ apr_table_t *trailers = h2_stream_get_trailers(stream);
+ if (trailers && !apr_is_empty_table(trailers)) {
+ h2_ngheader *nh;
+ int rv;
+
+ nh = h2_util_ngheader_make(stream->pool, trailers);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_stream(%ld-%d): submit %d trailers",
+ session->id, (int)stream_id,(int) nh->nvlen);
+ rv = nghttp2_submit_trailer(ng2s, stream->id, nh->nv, nh->nvlen);
+ if (rv < 0) {
+ nread = rv;
+ }
+ *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
+ }
+
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
}
static apr_status_t submit_response(h2_session *session, h2_stream *stream)
{
apr_status_t status = APR_SUCCESS;
+ h2_response *response = h2_stream_get_response(stream);
int rv = 0;
AP_DEBUG_ASSERT(session);
AP_DEBUG_ASSERT(stream);
- AP_DEBUG_ASSERT(stream->response || stream->rst_error);
+ AP_DEBUG_ASSERT(response || stream->rst_error);
if (stream->submitted) {
rv = NGHTTP2_PROTOCOL_ERROR;
}
- else if (stream->response && stream->response->header) {
+ else if (response && response->headers) {
nghttp2_data_provider provider;
- h2_response *response = stream->response;
h2_ngheader *ngh;
+ const h2_priority *prio;
memset(&provider, 0, sizeof(provider));
provider.source.fd = stream->id;
"h2_stream(%ld-%d): submit response %d",
session->id, stream->id, response->http_status);
- ngh = h2_util_ngheader_make_res(stream->pool, response->http_status,
- response->header);
- rv = nghttp2_submit_response(session->ngh2, response->stream_id,
- ngh->nv, ngh->nvlen, &provider);
-
- /* If the submit worked,
- * and this stream is not a pushed one itself,
+ /* If this stream is not a pushed one itself,
* and HTTP/2 server push is enabled here,
* and the response is in the range 200-299 *),
* and the remote side has pushing enabled,
* -> find and perform any pushes on this stream
+ * *before* we submit the stream response itself.
+ * This helps clients avoid opening new streams on Link
+ * headers that get pushed right afterwards.
*
* *) the response code is relevant, as we do not want to
* make pushes on 401 or 403 codes, neiterh on 301/302
* as the client, having this resource in its cache, might
* also have the pushed ones as well.
*/
- if (!rv
- && !stream->initiated_on
- && h2_config_geti(h2_config_get(session->c), H2_CONF_PUSH)
+ if (!stream->initiated_on
&& H2_HTTP_2XX(response->http_status)
&& h2_session_push_enabled(session)) {
h2_stream_submit_pushes(stream);
}
+
+ prio = h2_stream_get_priority(stream);
+ if (prio) {
+ h2_session_set_prio(session, stream, prio);
+ /* no showstopper if that fails for some reason */
+ }
+
+ ngh = h2_util_ngheader_make_res(stream->pool, response->http_status,
+ response->headers);
+ rv = nghttp2_submit_response(session->ngh2, response->stream_id,
+ ngh->nv, ngh->nvlen, &provider);
}
else {
int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
}
stream->submitted = 1;
+ if (stream->initiated_on) {
+ ++session->pushes_submitted;
+ }
+ else {
+ ++session->responses_submitted;
+ }
if (nghttp2_is_fatal(rv)) {
status = APR_EGENERAL;
- h2_session_abort_int(session, rv);
+ dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
APLOGNO(02940) "submit_response: %s",
nghttp2_strerror(rv));
int nid;
ngh = h2_util_ngheader_make_req(is->pool, push->req);
- nid = nghttp2_submit_push_promise(session->ngh2, 0, push->initiating_id,
+ nid = nghttp2_submit_push_promise(session->ngh2, 0, is->id,
ngh->nv, ngh->nvlen, NULL);
-
if (nid <= 0) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_stream(%ld-%d): submitting push promise fail: %s",
- session->id, push->initiating_id,
- nghttp2_strerror(nid));
+ session->id, is->id, nghttp2_strerror(nid));
return NULL;
}
+ ++session->pushes_promised;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_stream(%ld-%d): promised new stream %d for %s %s",
- session->id, push->initiating_id, nid,
- push->req->method, push->req->path);
+ "h2_stream(%ld-%d): SERVER_PUSH %d for %s %s on %d",
+ session->id, is->id, nid,
+ push->req->method, push->req->path, is->id);
stream = h2_session_open_stream(session, nid);
if (stream) {
h2_stream_set_h2_request(stream, is->id, push->req);
- status = stream_end_headers(session, stream, 1);
+ status = stream_schedule(session, stream, 1);
if (status != APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): scheduling push stream",
session->id, stream->id);
h2_stream_cleanup(stream);
stream = NULL;
}
+ ++session->unsent_promises;
}
else {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_stream(%ld-%d): failed to create stream obj %d",
- session->id, push->initiating_id, nid);
+ session->id, is->id, nid);
}
if (!stream) {
return stream;
}
+static int valid_weight(float f)
+{
+ int w = (int)f;
+ return (w < NGHTTP2_MIN_WEIGHT? NGHTTP2_MIN_WEIGHT :
+ (w > NGHTTP2_MAX_WEIGHT)? NGHTTP2_MAX_WEIGHT : w);
+}
+
+apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
+ const h2_priority *prio)
+{
+ apr_status_t status = APR_SUCCESS;
+#ifdef H2_NG2_CHANGE_PRIO
+ nghttp2_stream *s_grandpa, *s_parent, *s;
+
+ s = nghttp2_session_find_stream(session->ngh2, stream->id);
+ if (!s) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_stream(%ld-%d): lookup of nghttp2_stream failed",
+ session->id, stream->id);
+ return APR_EINVAL;
+ }
+
+ s_parent = nghttp2_stream_get_parent(s);
+ if (s_parent) {
+ nghttp2_priority_spec ps;
+ int id_parent, id_grandpa, w_parent, w, rv = 0;
+ char *ptype = "AFTER";
+ h2_dependency dep = prio->dependency;
+
+ id_parent = nghttp2_stream_get_stream_id(s_parent);
+ s_grandpa = nghttp2_stream_get_parent(s_parent);
+ if (s_grandpa) {
+ id_grandpa = nghttp2_stream_get_stream_id(s_grandpa);
+ }
+ else {
+ /* parent of parent does not exist,
+ * only possible if parent == root */
+ dep = H2_DEPENDANT_AFTER;
+ }
+
+ switch (dep) {
+ case H2_DEPENDANT_INTERLEAVED:
+ /* PUSHed stream is to be interleaved with initiating stream.
+ * It is made a sibling of the initiating stream and gets a
+ * proportional weight [1, MAX_WEIGHT] of the initiaing
+ * stream weight.
+ */
+ ptype = "INTERLEAVED";
+ w_parent = nghttp2_stream_get_weight(s_parent);
+ w = valid_weight(w_parent * ((float)prio->weight / NGHTTP2_MAX_WEIGHT));
+ nghttp2_priority_spec_init(&ps, id_grandpa, w, 0);
+ break;
+
+ case H2_DEPENDANT_BEFORE:
+ /* PUSHed stream os to be sent BEFORE the initiating stream.
+ * It gets the same weight as the initiating stream, replaces
+ * that stream in the dependency tree and has the initiating
+ * stream as child.
+ */
+ ptype = "BEFORE";
+ w = w_parent = nghttp2_stream_get_weight(s_parent);
+ nghttp2_priority_spec_init(&ps, stream->id, w_parent, 0);
+ id_grandpa = nghttp2_stream_get_stream_id(s_grandpa);
+ rv = nghttp2_session_change_stream_priority(session->ngh2, id_parent, &ps);
+ if (rv < 0) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_stream(%ld-%d): PUSH BEFORE, weight=%d, "
+ "depends=%d, returned=%d",
+ session->id, id_parent, ps.weight, ps.stream_id, rv);
+ return APR_EGENERAL;
+ }
+ nghttp2_priority_spec_init(&ps, id_grandpa, w, 0);
+ break;
+
+ case H2_DEPENDANT_AFTER:
+ /* The PUSHed stream is to be sent after the initiating stream.
+ * Give if the specified weight and let it depend on the intiating
+ * stream.
+ */
+ /* fall through, it's the default */
+ default:
+ nghttp2_priority_spec_init(&ps, id_parent, valid_weight(prio->weight), 0);
+ break;
+ }
+
+
+ rv = nghttp2_session_change_stream_priority(session->ngh2, stream->id, &ps);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_stream(%ld-%d): PUSH %s, weight=%d, "
+ "depends=%d, returned=%d",
+ session->id, stream->id, ptype,
+ ps.weight, ps.stream_id, rv);
+ status = (rv < 0)? APR_EGENERAL : APR_SUCCESS;
+ }
+#else
+ (void)session;
+ (void)stream;
+ (void)prio;
+ (void)valid_weight;
+#endif
+ return status;
+}
+
apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream)
{
apr_pool_t *pool = h2_stream_detach_pool(stream);
- h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
- if (session->last_stream == stream) {
- session->last_stream = NULL;
+ /* this may be called while the session has already freed
+ * some internal structures. */
+ if (session->mplx) {
+ h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
+ if (session->last_stream == stream) {
+ session->last_stream = NULL;
+ }
+ }
+
+ if (session->streams) {
+ h2_stream_set_remove(session->streams, stream->id);
}
- h2_stream_set_remove(session->streams, stream->id);
h2_stream_destroy(stream);
if (pool) {
}
default:
return apr_snprintf(buffer, maxlen,
- "FRAME[type=%d, length=%d, flags=%d, stream=%d]",
+ "type=%d[length=%d, flags=%d, stream=%d]",
frame->hd.type, (int)frame->hd.length,
frame->hd.flags, frame->hd.stream_id);
}
int h2_session_push_enabled(h2_session *session)
{
- return nghttp2_session_get_remote_settings(session->ngh2,
- NGHTTP2_SETTINGS_ENABLE_PUSH);
+ /* iff we can and they can */
+ return (h2_config_geti(session->config, H2_CONF_PUSH)
+ && nghttp2_session_get_remote_settings(session->ngh2,
+ NGHTTP2_SETTINGS_ENABLE_PUSH));
}
+static apr_status_t h2_session_send(h2_session *session)
+{
+ int rv = nghttp2_session_send(session->ngh2);
+ if (rv != 0) {
+ if (nghttp2_is_fatal(rv)) {
+ dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
+ return APR_EGENERAL;
+ }
+ }
+
+ session->unsent_promises = 0;
+ session->unsent_submits = 0;
+
+ return APR_SUCCESS;
+}
-apr_status_t h2_session_process(h2_session *session)
+static apr_status_t h2_session_receive(void *ctx, const char *data,
+ apr_size_t len, apr_size_t *readlen)
{
- apr_status_t status = APR_SUCCESS;
- apr_interval_time_t wait_micros = 0;
- static const int MAX_WAIT_MICROS = 200 * 1000;
- int got_streams = 0;
-
- while (!session->aborted && (nghttp2_session_want_read(session->ngh2)
- || nghttp2_session_want_write(session->ngh2))) {
- int have_written = 0;
- int have_read = 0;
-
- /* Send data as long as we have it and window sizes allow. We are
- * a server after all.
- */
- if (nghttp2_session_want_write(session->ngh2)) {
- int rv;
-
- rv = nghttp2_session_send(session->ngh2);
- if (rv != 0) {
- ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_session: send: %s", nghttp2_strerror(rv));
- if (nghttp2_is_fatal(rv)) {
- h2_session_abort(session, status, rv);
- goto end_process;
- }
- }
- else {
- have_written = 1;
- wait_micros = 0;
+ h2_session *session = ctx;
+ ssize_t n;
+
+ if (len > 0) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_session(%ld): feeding %ld bytes to nghttp2",
+ session->id, (long)len);
+ n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len);
+ if (n < 0) {
+ if (nghttp2_is_fatal((int)n)) {
+ dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, (int)n, nghttp2_strerror(n));
+ return APR_EGENERAL;
}
}
-
- if (have_written) {
- h2_session_flush(session);
- }
-
- if (wait_micros > 0) {
- ap_log_cerror( APLOG_MARK, APLOG_TRACE3, 0, session->c,
- "h2_session: wait for data, %ld micros", (long)(wait_micros));
- status = h2_mplx_out_trywait(session->mplx, wait_micros, session->iowait);
-
- if (status == APR_TIMEUP) {
- if (wait_micros < MAX_WAIT_MICROS) {
- wait_micros *= 2;
- }
- }
+ else {
+ *readlen = n;
+ session->io.bytes_read += n;
}
-
- if (nghttp2_session_want_read(session->ngh2))
- {
- /* When we
- * - and have no streams at all
- * - or have streams, but none is suspended or needs submit and
- * have nothing written on the last try
- *
- * or, the other way around
- * - have only streams where data can be sent, but could
- * not send anything
- *
- * then we are waiting on frames from the client (for
- * example WINDOW_UPDATE or HEADER) and without new frames
- * from the client, we cannot make any progress,
- *
- * and *then* we can safely do a blocking read.
- */
- int may_block = (session->frames_received <= 1);
- if (!may_block) {
- if (got_streams) {
- may_block = (!have_written
- && !h2_stream_set_has_unsubmitted(session->streams)
- && !h2_stream_set_has_suspended(session->streams));
- }
- else {
- may_block = 1;
- }
- }
-
- if (may_block) {
- h2_session_flush(session);
- if (session->c->cs) {
- session->c->cs->state = (got_streams? CONN_STATE_HANDLER
- : CONN_STATE_WRITE_COMPLETION);
- }
- status = h2_conn_io_read(&session->io, APR_BLOCK_READ,
- session_receive, session);
- }
- else {
- if (session->c->cs) {
- session->c->cs->state = CONN_STATE_HANDLER;
- }
- status = h2_conn_io_read(&session->io, APR_NONBLOCK_READ,
- session_receive, session);
- }
+ }
+ return APR_SUCCESS;
+}
- switch (status) {
- case APR_SUCCESS: /* successful read, reset our idle timers */
- have_read = 1;
- wait_micros = 0;
- break;
- case APR_EAGAIN: /* non-blocking read, nothing there */
- break;
- default:
+static apr_status_t h2_session_read(h2_session *session, int block, int loops)
+{
+ apr_status_t status, rstatus = APR_EAGAIN;
+ conn_rec *c = session->c;
+ int i;
+
+ for (i = 0; i < loops; ++i) {
+ /* H2_IN filter handles all incoming data against the session.
+ * We just pull at the filter chain to make it happen */
+ status = ap_get_brigade(c->input_filters,
+ session->bbtmp, AP_MODE_READBYTES,
+ block? APR_BLOCK_READ : APR_NONBLOCK_READ,
+ APR_BUCKET_BUFF_SIZE);
+ /* get rid of any possible data we do not expect to get */
+ apr_brigade_cleanup(session->bbtmp);
+
+ switch (status) {
+ case APR_SUCCESS:
+ /* successful read, reset our idle timers */
+ rstatus = APR_SUCCESS;
+ if (block) {
+ /* successfull blocked read, try unblocked to
+ * get more. */
+ block = 0;
+ }
+ break;
+ case APR_EAGAIN:
+ return rstatus;
+ case APR_TIMEUP:
+ return status;
+ default:
+ if (!i) {
+ /* first attempt failed */
if (APR_STATUS_IS_ETIMEDOUT(status)
|| APR_STATUS_IS_ECONNABORTED(status)
|| APR_STATUS_IS_ECONNRESET(status)
|| APR_STATUS_IS_EOF(status)
|| APR_STATUS_IS_EBADF(status)) {
/* common status for a client that has left */
- ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, session->c,
- "h2_session(%ld): terminating",
- session->id);
- /* Stolen from mod_reqtimeout to speed up lingering when
- * a read timeout happened.
- */
- apr_table_setn(session->c->notes, "short-lingering-close", "1");
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
+ "h2_session(%ld): input gone", session->id);
}
else {
/* uncommon status, log on INFO so that we see this */
- ap_log_cerror( APLOG_MARK, APLOG_INFO, status, session->c,
+ ap_log_cerror( APLOG_MARK, APLOG_INFO, status, c,
APLOGNO(02950)
"h2_session(%ld): error reading, terminating",
session->id);
}
- h2_session_abort(session, status, 0);
- goto end_process;
+ return status;
+ }
+ /* subsequent failure after success(es), return initial
+ * status. */
+ return rstatus;
+ }
+ if (!is_accepting_streams(session)) {
+ break;
+ }
+ }
+ return rstatus;
+}
+
+static apr_status_t h2_session_submit(h2_session *session)
+{
+ apr_status_t status = APR_EAGAIN;
+ h2_stream *stream;
+
+ if (h2_stream_set_has_unsubmitted(session->streams)) {
+ /* If we have responses ready, submit them now. */
+ while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) {
+ status = submit_response(session, stream);
+ ++session->unsent_submits;
+
+ /* Unsent push promises are written immediately, as nghttp2
+ * 1.5.0 realizes internal stream data structures only on
+ * send and we might need them for other submits.
+ * Also, to conserve memory, we send at least every 10 submits
+ * so that nghttp2 does not buffer all outbound items too
+ * long.
+ */
+ if (status == APR_SUCCESS
+ && (session->unsent_promises || session->unsent_submits > 10)) {
+ status = h2_session_send(session);
+ if (status != APR_SUCCESS) {
+ break;
+ }
}
}
+ }
+ return status;
+}
+
+static const char *StateNames[] = {
+ "INIT", /* H2_SESSION_ST_INIT */
+ "DONE", /* H2_SESSION_ST_DONE */
+ "IDLE", /* H2_SESSION_ST_IDLE */
+ "BUSY", /* H2_SESSION_ST_BUSY */
+ "WAIT", /* H2_SESSION_ST_WAIT */
+ "LSHUTDOWN", /* H2_SESSION_ST_LOCAL_SHUTDOWN */
+ "RSHUTDOWN", /* H2_SESSION_ST_REMOTE_SHUTDOWN */
+};
+
+static const char *state_name(h2_session_state state)
+{
+ if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
+ return "unknown";
+ }
+ return StateNames[state];
+}
+
+static int is_accepting_streams(h2_session *session)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_IDLE:
+ case H2_SESSION_ST_BUSY:
+ case H2_SESSION_ST_WAIT:
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+static void transit(h2_session *session, const char *action, h2_session_state nstate)
+{
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id,
+ state_name(session->state), action, state_name(nstate));
+ session->state = nstate;
+}
+
+static void h2_session_ev_init(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_INIT:
+ transit(session, "init", H2_SESSION_ST_BUSY);
+ break;
+
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ /* already did that? */
+ break;
+ case H2_SESSION_ST_IDLE:
+ case H2_SESSION_ST_REMOTE_SHUTDOWN:
+ /* all done */
+ transit(session, "local goaway", H2_SESSION_ST_DONE);
+ break;
+ default:
+ transit(session, "local goaway", H2_SESSION_ST_LOCAL_SHUTDOWN);
+ break;
+ }
+}
+
+static void h2_session_ev_remote_goaway(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_REMOTE_SHUTDOWN:
+ /* already received that? */
+ break;
+ case H2_SESSION_ST_IDLE:
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ /* all done */
+ transit(session, "remote goaway", H2_SESSION_ST_DONE);
+ break;
+ default:
+ transit(session, "remote goaway", H2_SESSION_ST_REMOTE_SHUTDOWN);
+ break;
+ }
+}
+
+static void h2_session_ev_conn_error(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_INIT:
+ case H2_SESSION_ST_DONE:
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ /* just leave */
+ transit(session, "conn error", H2_SESSION_ST_DONE);
+ break;
- got_streams = !h2_stream_set_is_empty(session->streams);
- if (got_streams) {
- h2_stream *stream;
-
- if (session->reprioritize) {
- h2_mplx_reprioritize(session->mplx, stream_pri_cmp, session);
- session->reprioritize = 0;
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_session(%ld): conn error -> shutdown", session->id);
+ h2_session_shutdown(session, arg, msg);
+ break;
+ }
+}
+
+static void h2_session_ev_proto_error(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_DONE:
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ /* just leave */
+ transit(session, "proto error", H2_SESSION_ST_DONE);
+ break;
+
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_session(%ld): proto error -> shutdown", session->id);
+ h2_session_shutdown(session, arg, msg);
+ break;
+ }
+}
+
+static void h2_session_ev_conn_timeout(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ transit(session, "conn timeout", H2_SESSION_ST_DONE);
+ break;
+ default:
+ h2_session_shutdown(session, arg, msg);
+ transit(session, "conn timeout", H2_SESSION_ST_DONE);
+ break;
+ }
+}
+
+static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_BUSY:
+ /* nothing for input and output to do. If we remain
+ * in this state, we go into a tight loop and suck up
+ * CPU cycles. Ideally, we'd like to do a blocking read, but that
+ * is not possible if we have scheduled tasks and wait
+ * for them to produce something. */
+ if (h2_stream_set_is_empty(session->streams)) {
+ /* When we have no streams, no task event are possible,
+ * switch to blocking reads */
+ transit(session, "no io", H2_SESSION_ST_IDLE);
}
-
- if (!have_read && !have_written) {
- /* Nothing read or written. That means no data yet ready to
- * be send out. Slowly back off...
- */
- if (wait_micros == 0) {
- wait_micros = 10;
- }
+ else if (!h2_stream_set_has_unsubmitted(session->streams)
+ && !h2_stream_set_has_suspended(session->streams)) {
+ /* none of our streams is waiting for a response or
+ * new output data from task processing,
+ * switch to blocking reads. */
+ transit(session, "no io", H2_SESSION_ST_IDLE);
}
-
- if (h2_stream_set_has_open_input(session->streams)) {
- /* Check that any pending window updates are sent. */
- status = h2_mplx_in_update_windows(session->mplx, update_window, session);
- if (APR_STATUS_IS_EAGAIN(status)) {
- status = APR_SUCCESS;
- }
+ else {
+ /* Unable to do blocking reads, as we wait on events from
+ * task processing in other threads. Do a busy wait with
+ * backoff timer. */
+ transit(session, "no io", H2_SESSION_ST_WAIT);
}
-
- h2_session_resume_streams_with_data(session);
-
- if (h2_stream_set_has_unsubmitted(session->streams)) {
- /* If we have responses ready, submit them now. */
- while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) {
- status = submit_response(session, stream);
+ break;
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void h2_session_ev_wait_timeout(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_WAIT:
+ transit(session, "wait timeout", H2_SESSION_ST_BUSY);
+ break;
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void h2_session_ev_stream_ready(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_WAIT:
+ transit(session, "stream ready", H2_SESSION_ST_BUSY);
+ break;
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void h2_session_ev_data_read(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_IDLE:
+ transit(session, "data read", H2_SESSION_ST_BUSY);
+ break;
+ /* fall through */
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void h2_session_ev_ngh2_done(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_DONE:
+ /* nop */
+ break;
+ default:
+ transit(session, "nghttp2 done", H2_SESSION_ST_DONE);
+ break;
+ }
+}
+
+static void dispatch_event(h2_session *session, h2_session_event_t ev,
+ int arg, const char *msg)
+{
+ switch (ev) {
+ case H2_SESSION_EV_INIT:
+ h2_session_ev_init(session, arg, msg);
+ break;
+ case H2_SESSION_EV_LOCAL_GOAWAY:
+ h2_session_ev_local_goaway(session, arg, msg);
+ break;
+ case H2_SESSION_EV_REMOTE_GOAWAY:
+ h2_session_ev_remote_goaway(session, arg, msg);
+ break;
+ case H2_SESSION_EV_CONN_ERROR:
+ h2_session_ev_conn_error(session, arg, msg);
+ break;
+ case H2_SESSION_EV_PROTO_ERROR:
+ h2_session_ev_proto_error(session, arg, msg);
+ break;
+ case H2_SESSION_EV_CONN_TIMEOUT:
+ h2_session_ev_conn_timeout(session, arg, msg);
+ break;
+ case H2_SESSION_EV_NO_IO:
+ h2_session_ev_no_io(session, arg, msg);
+ break;
+ case H2_SESSION_EV_WAIT_TIMEOUT:
+ h2_session_ev_wait_timeout(session, arg, msg);
+ break;
+ case H2_SESSION_EV_STREAM_READY:
+ h2_session_ev_stream_ready(session, arg, msg);
+ break;
+ case H2_SESSION_EV_DATA_READ:
+ h2_session_ev_data_read(session, arg, msg);
+ break;
+ case H2_SESSION_EV_NGH2_DONE:
+ h2_session_ev_ngh2_done(session, arg, msg);
+ break;
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_session(%ld): unknown event %d",
+ session->id, ev);
+ break;
+ }
+
+ if (session->state == H2_SESSION_ST_DONE) {
+ h2_mplx_abort(session->mplx);
+ }
+}
+
+static const int MAX_WAIT_MICROS = 200 * 1000;
+
+apr_status_t h2_session_process(h2_session *session, int async)
+{
+ apr_status_t status = APR_SUCCESS;
+ conn_rec *c = session->c;
+ int rv, have_written, have_read;
+
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
+ "h2_session(%ld): process start, async=%d", session->id, async);
+
+ while (1) {
+ have_read = have_written = 0;
+
+ switch (session->state) {
+ case H2_SESSION_ST_INIT:
+ if (!h2_is_acceptable_connection(c, 1)) {
+ h2_session_shutdown(session, NGHTTP2_INADEQUATE_SECURITY, NULL);
+ }
+ else {
+ ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL);
+ status = h2_session_start(session, &rv);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
+ "h2_session(%ld): started on %s:%d", session->id,
+ session->s->server_hostname,
+ c->local_addr->port);
+ if (status != APR_SUCCESS) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ }
+ dispatch_event(session, H2_SESSION_EV_INIT, 0, NULL);
}
- }
+ break;
+
+ case H2_SESSION_ST_IDLE:
+ h2_filter_cin_timeout_set(session->cin, session->keepalive_secs);
+ ap_update_child_status(c->sbh, SERVER_BUSY_KEEPALIVE, NULL);
+ status = h2_session_read(session, 1, 10);
+ if (status == APR_SUCCESS) {
+ have_read = 1;
+ dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
+ }
+ else if (status == APR_EAGAIN) {
+ /* nothing to read */
+ }
+ else if (APR_STATUS_IS_TIMEUP(status)) {
+ dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL);
+ break;
+ }
+ else {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ }
+ break;
+
+ case H2_SESSION_ST_BUSY:
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ case H2_SESSION_ST_REMOTE_SHUTDOWN:
+ if (nghttp2_session_want_read(session->ngh2)) {
+ ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL);
+ h2_filter_cin_timeout_set(session->cin, session->timeout_secs);
+ status = h2_session_read(session, 0, 10);
+ if (status == APR_SUCCESS) {
+ have_read = 1;
+ dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
+ }
+ else if (status == APR_EAGAIN) {
+ /* nothing to read */
+ }
+ else if (APR_STATUS_IS_TIMEUP(status)) {
+ dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL);
+ break;
+ }
+ else {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ }
+ }
+
+ if (!h2_stream_set_is_empty(session->streams)) {
+ /* resume any streams for which data is available again */
+ h2_session_resume_streams_with_data(session);
+ /* Submit any responses/push_promises that are ready */
+ status = h2_session_submit(session);
+ if (status == APR_SUCCESS) {
+ have_written = 1;
+ }
+ else if (status != APR_EAGAIN) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
+ H2_ERR_INTERNAL_ERROR, "submit error");
+ break;
+ }
+ /* send out window updates for our inputs */
+ status = h2_mplx_in_update_windows(session->mplx);
+ if (status != APR_SUCCESS && status != APR_EAGAIN) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
+ H2_ERR_INTERNAL_ERROR, "window update error");
+ break;
+ }
+ }
+
+ if (nghttp2_session_want_write(session->ngh2)) {
+ status = h2_session_send(session);
+ if (status == APR_SUCCESS) {
+ have_written = 1;
+ }
+ else {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
+ H2_ERR_INTERNAL_ERROR, "writing");
+ break;
+ }
+ }
+
+ if (have_read || have_written) {
+ session->wait_us = 0;
+ }
+ else {
+ dispatch_event(session, H2_SESSION_EV_NO_IO, 0, NULL);
+ }
+ break;
+
+ case H2_SESSION_ST_WAIT:
+ session->wait_us = H2MAX(session->wait_us, 10);
+ if (APLOGctrace1(c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "h2_session: wait for data, %ld micros",
+ (long)session->wait_us);
+ }
+
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE2, status, c,
+ "h2_session(%ld): process -> trywait", session->id);
+ status = h2_mplx_out_trywait(session->mplx, session->wait_us,
+ session->iowait);
+ if (status == APR_SUCCESS) {
+ dispatch_event(session, H2_SESSION_EV_STREAM_READY, 0, NULL);
+ }
+ else if (status == APR_TIMEUP) {
+ /* nothing, increase timer for graceful backup */
+ session->wait_us = H2MIN(session->wait_us*2, MAX_WAIT_MICROS);
+ dispatch_event(session, H2_SESSION_EV_WAIT_TIMEOUT, 0, NULL);
+ }
+ else {
+ h2_session_shutdown(session, H2_ERR_INTERNAL_ERROR, "cond wait error");
+ }
+ break;
+
+ case H2_SESSION_ST_DONE:
+ status = APR_EOF;
+ goto out;
+
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c,
+ "h2_session(%ld): unknown state %d", session->id, session->state);
+ dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, 0, NULL);
+ break;
+ }
+
+ if (have_written) {
+ h2_conn_io_flush(&session->io);
+ }
+ else if (!nghttp2_session_want_read(session->ngh2)
+ && !nghttp2_session_want_write(session->ngh2)) {
+ dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL);
+ }
+ }
+
+out:
+ if (have_written) {
+ h2_conn_io_flush(&session->io);
+ }
+
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
+ "h2_session(%ld): [%s] process returns",
+ session->id, state_name(session->state));
+
+ if ((session->state != H2_SESSION_ST_DONE)
+ && (APR_STATUS_IS_EOF(status)
+ || APR_STATUS_IS_ECONNRESET(status)
+ || APR_STATUS_IS_ECONNABORTED(status))) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ }
+
+ status = (session->state == H2_SESSION_ST_DONE)? APR_EOF : APR_SUCCESS;
+ if (session->state == H2_SESSION_ST_DONE) {
+ if (!session->eoc_written) {
+ session->eoc_written = 1;
+ h2_conn_io_write_eoc(&session->io,
+ h2_bucket_eoc_create(session->c->bucket_alloc, session));
}
-
}
-end_process:
return status;
}