X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=modules%2Fhttp2%2Fh2_session.c;h=34575a6b55a1e031a03d41a48be883cdb9977ccd;hb=17282190d0a2e4563ca2abac6baf614fa2a5f19e;hp=89a2eb1ffeebdca6680b6f027a3369f8c2684bca;hpb=364b3f79c4dd91ef7cc3041a015c6797296a172c;p=apache diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 89a2eb1ffe..34575a6b55 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -22,11 +22,14 @@ #include #include #include +#include #include "h2_private.h" #include "h2_bucket_eoc.h" #include "h2_bucket_eos.h" #include "h2_config.h" +#include "h2_ctx.h" +#include "h2_filter.h" #include "h2_h2.h" #include "h2_mplx.h" #include "h2_push.h" @@ -41,6 +44,7 @@ #include "h2_version.h" #include "h2_workers.h" + static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen); static int h2_session_status_from_apr_status(apr_status_t rv) @@ -57,13 +61,27 @@ static int h2_session_status_from_apr_status(apr_status_t rv) return NGHTTP2_ERR_PROTO; } +static void update_window(void *ctx, int stream_id, apr_off_t bytes_read) +{ + h2_session *session = (h2_session*)ctx; + nghttp2_session_consume(session->ngh2, stream_id, bytes_read); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_session(%ld-%d): consumed %ld bytes", + session->id, stream_id, (long)bytes_read); +} + +static apr_status_t h2_session_receive(void *ctx, + const char *data, apr_size_t len, + apr_size_t *readlen); + +static int is_accepting_streams(h2_session *session); +static void dispatch_event(h2_session *session, h2_session_event_t ev, + int err, const char *msg); + h2_stream *h2_session_open_stream(h2_session *session, int stream_id) { h2_stream * stream; apr_pool_t *stream_pool; - if (session->aborted) { - return NULL; - } if (session->spare) { stream_pool = session->spare; @@ -78,17 +96,14 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id) h2_stream_set_add(session->streams, stream); if (H2_STREAM_CLIENT_INITIATED(stream_id) && stream_id > session->max_stream_received) { + ++session->requests_received; session->max_stream_received = stream->id; } return stream; } -static apr_status_t h2_session_flush(h2_session *session) -{ - session->flush = 0; - return h2_conn_io_flush(&session->io); -} +#ifdef H2_NG2_STREAM_API /** * Determine the importance of streams when scheduling tasks. @@ -143,11 +158,26 @@ static int stream_pri_cmp(int sid1, int sid2, void *ctx) return spri_cmp(sid1, s1, sid2, s2, session); } -static apr_status_t stream_end_headers(h2_session *session, - h2_stream *stream, int eos) +#else /* ifdef H2_NG2_STREAM_API */ + +/* In absence of nghttp2_stream API, which gives information about + * priorities since nghttp2 1.3.x, we just sort the streams by + * their identifier, aka. order of arrival. + */ +static int stream_pri_cmp(int sid1, int sid2, void *ctx) +{ + (void)ctx; + return sid1 - sid2; +} + +#endif /* (ifdef else) H2_NG2_STREAM_API */ + +static apr_status_t stream_schedule(h2_session *session, + h2_stream *stream, int eos) { (void)session; - return h2_stream_schedule(stream, eos, stream_pri_cmp, session); + return h2_stream_schedule(stream, eos, h2_session_push_enabled(session), + stream_pri_cmp, session); } /* @@ -181,16 +211,14 @@ static int on_invalid_frame_recv_cb(nghttp2_session *ngh2, h2_session *session = (h2_session *)userp; (void)ngh2; - if (session->aborted) { - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - if (APLOGctrace2(session->c)) { + if (APLOGcdebug(session->c)) { char buffer[256]; frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_session: callback on_invalid_frame_recv error=%d %s", - error, buffer); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_session(%ld): recv unknown FRAME[%s], frames=%ld/%ld (r/s)", + session->id, buffer, (long)session->frames_received, + (long)session->frames_sent); } return 0; } @@ -199,20 +227,21 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *userp) { - int rv; h2_session *session = (h2_session *)userp; + apr_status_t status = APR_SUCCESS; h2_stream * stream; - apr_status_t status; + int rv; (void)flags; - if (session->aborted) { - return NGHTTP2_ERR_CALLBACK_FAILURE; + if (!is_accepting_streams(session)) { + /* ignore */ + return 0; } + stream = h2_session_get_stream(session, stream_id); if (!stream) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c, - APLOGNO(02919) - "h2_session: stream(%ld-%d): on_data_chunk for unknown stream", + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_stream(%ld-%d): on_data_chunk for unknown stream", session->id, (int)stream_id); rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id, NGHTTP2_INTERNAL_ERROR); @@ -224,9 +253,10 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, status = h2_stream_write_data(stream, (const char *)data, len); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, - "h2_stream(%ld-%d): written DATA, length %d", - session->id, stream_id, (int)len); + "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes", + session->id, stream_id, (long)len); if (status != APR_SUCCESS) { + update_window(session, stream_id, len); rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id, H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR)); if (nghttp2_is_fatal(rv)) { @@ -236,78 +266,12 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, return 0; } -static int before_frame_send_cb(nghttp2_session *ngh2, - const nghttp2_frame *frame, - void *userp) -{ - h2_session *session = (h2_session *)userp; - (void)ngh2; - - if (session->aborted) { - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - /* Set the need to flush output when we have added one of the - * following frame types */ - switch (frame->hd.type) { - case NGHTTP2_RST_STREAM: - case NGHTTP2_PUSH_PROMISE: - case NGHTTP2_GOAWAY: - session->flush = 1; - break; - default: - break; - - } - if (APLOGctrace2(session->c)) { - char buffer[256]; - frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session(%ld): before_frame_send %s", - session->id, buffer); - } - return 0; -} - -static int on_frame_send_cb(nghttp2_session *ngh2, - const nghttp2_frame *frame, - void *userp) -{ - h2_session *session = (h2_session *)userp; - (void)ngh2; - if (APLOGctrace2(session->c)) { - char buffer[256]; - frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session(%ld): on_frame_send %s", - session->id, buffer); - } - return 0; -} - -static int on_frame_not_send_cb(nghttp2_session *ngh2, - const nghttp2_frame *frame, - int lib_error_code, void *userp) -{ - h2_session *session = (h2_session *)userp; - (void)ngh2; - - if (APLOGctrace2(session->c)) { - char buffer[256]; - - frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session: callback on_frame_not_send error=%d %s", - lib_error_code, buffer); - } - return 0; -} - static apr_status_t stream_release(h2_session *session, h2_stream *stream, uint32_t error_code) { if (!error_code) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, "h2_stream(%ld-%d): handled, closing", session->id, (int)stream->id); if (stream->id > session->max_stream_handled) { @@ -334,31 +298,29 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id, h2_stream *stream; (void)ngh2; - if (session->aborted) { - return NGHTTP2_ERR_CALLBACK_FAILURE; - } stream = h2_session_get_stream(session, stream_id); if (stream) { stream_release(session, stream, error_code); } - - if (error_code) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_stream(%ld-%d): closed, error=%d", - session->id, (int)stream_id, error_code); - } - return 0; } static int on_begin_headers_cb(nghttp2_session *ngh2, const nghttp2_frame *frame, void *userp) { + h2_session *session = (h2_session *)userp; h2_stream *s; - /* This starts a new stream. */ + /* We may see HEADERs at the start of a stream or after all DATA + * streams to carry trailers. */ (void)ngh2; - s = h2_session_open_stream((h2_session *)userp, frame->hd.stream_id); + s = h2_session_get_stream(session, frame->hd.stream_id); + if (s) { + /* nop */ + } + else { + s = h2_session_open_stream((h2_session *)userp, frame->hd.stream_id); + } return s? 0 : NGHTTP2_ERR_CALLBACK_FAILURE; } @@ -374,8 +336,9 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame, (void)ngh2; (void)flags; - if (session->aborted) { - return NGHTTP2_ERR_CALLBACK_FAILURE; + if (!is_accepting_streams(session)) { + /* just ignore */ + return 0; } stream = h2_session_get_stream(session, frame->hd.stream_id); @@ -409,21 +372,40 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, apr_status_t status = APR_SUCCESS; h2_stream *stream; - if (session->aborted) { - return NGHTTP2_ERR_CALLBACK_FAILURE; + if (APLOGcdebug(session->c)) { + char buffer[256]; + + frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_session(%ld): recv FRAME[%s], frames=%ld/%ld (r/s)", + session->id, buffer, (long)session->frames_received, + (long)session->frames_sent); } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_session(%ld): on_frame_rcv #%ld, type=%d", session->id, - (long)session->frames_received, frame->hd.type); ++session->frames_received; switch (frame->hd.type) { case NGHTTP2_HEADERS: + /* This can be HEADERS for a new stream, defining the request, + * or HEADER may come after DATA at the end of a stream as in + * trailers */ stream = h2_session_get_stream(session, frame->hd.stream_id); if (stream) { int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM); - status = stream_end_headers(session, stream, eos); + + if (h2_stream_is_scheduled(stream)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_stream(%ld-%d): TRAILER, eos=%d", + session->id, frame->hd.stream_id, eos); + if (eos) { + status = h2_stream_close_input(stream); + } + } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_stream(%ld-%d): HEADER, eos=%d", + session->id, frame->hd.stream_id, eos); + status = stream_schedule(session, stream, eos); + } } else { status = APR_EINVAL; @@ -433,6 +415,10 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, stream = h2_session_get_stream(session, frame->hd.stream_id); if (stream) { int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_stream(%ld-%d): DATA, len=%ld, eos=%d", + session->id, frame->hd.stream_id, + (long)frame->hd.length, eos); if (eos) { status = h2_stream_close_input(stream); } @@ -458,6 +444,22 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, session->id, (int)frame->hd.stream_id, frame->window_update.window_size_increment); break; + case NGHTTP2_RST_STREAM: + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_session(%ld-%d): RST_STREAM by client, errror=%d", + session->id, (int)frame->hd.stream_id, + (int)frame->rst_stream.error_code); + stream = h2_session_get_stream(session, frame->hd.stream_id); + if (stream && stream->initiated_on) { + ++session->pushes_reset; + } + else { + ++session->streams_reset; + } + break; + case NGHTTP2_GOAWAY: + dispatch_event(session, H2_SESSION_EV_REMOTE_GOAWAY, 0, NULL); + break; default: if (APLOGctrace2(session->c)) { char buffer[256]; @@ -513,10 +515,6 @@ static int on_send_data_cb(nghttp2_session *ngh2, (void)ngh2; (void)source; - if (session->aborted) { - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - if (frame->data.padlen > H2_MAX_PADLEN) { return NGHTTP2_ERR_PROTO; } @@ -599,6 +597,24 @@ static int on_send_data_cb(nghttp2_session *ngh2, return h2_session_status_from_apr_status(status); } +static int on_frame_send_cb(nghttp2_session *ngh2, + const nghttp2_frame *frame, + void *user_data) +{ + h2_session *session = user_data; + if (APLOGcdebug(session->c)) { + char buffer[256]; + + frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_session(%ld): sent FRAME[%s], frames=%ld/%ld (r/s)", + session->id, buffer, (long)session->frames_received, + (long)session->frames_sent); + } + ++session->frames_sent; + return 0; +} + #define NGH2_SET_CALLBACK(callbacks, name, fn)\ nghttp2_session_callbacks_set_##name##_callback(callbacks, fn) @@ -616,27 +632,166 @@ static apr_status_t init_callbacks(conn_rec *c, nghttp2_session_callbacks **pcb) NGH2_SET_CALLBACK(*pcb, on_frame_recv, on_frame_recv_cb); NGH2_SET_CALLBACK(*pcb, on_invalid_frame_recv, on_invalid_frame_recv_cb); NGH2_SET_CALLBACK(*pcb, on_data_chunk_recv, on_data_chunk_recv_cb); - NGH2_SET_CALLBACK(*pcb, before_frame_send, before_frame_send_cb); - NGH2_SET_CALLBACK(*pcb, on_frame_send, on_frame_send_cb); - NGH2_SET_CALLBACK(*pcb, on_frame_not_send, on_frame_not_send_cb); NGH2_SET_CALLBACK(*pcb, on_stream_close, on_stream_close_cb); NGH2_SET_CALLBACK(*pcb, on_begin_headers, on_begin_headers_cb); NGH2_SET_CALLBACK(*pcb, on_header, on_header_cb); NGH2_SET_CALLBACK(*pcb, send_data, on_send_data_cb); + NGH2_SET_CALLBACK(*pcb, on_frame_send, on_frame_send_cb); + + return APR_SUCCESS; +} + +static void h2_session_cleanup(h2_session *session) +{ + AP_DEBUG_ASSERT(session); + /* This is an early cleanup of the session that may + * discard what is no longer necessary for *new* streams + * and general HTTP/2 processing. + * At this point, all frames are in transit or somehwere in + * our buffers or passed down output filters. + * h2 streams might still being written out. + */ + if (session->c) { + h2_ctx_clear(session->c); + } + if (session->ngh2) { + nghttp2_session_del(session->ngh2); + session->ngh2 = NULL; + } + if (session->spare) { + apr_pool_destroy(session->spare); + session->spare = NULL; + } +} + +static void h2_session_destroy(h2_session *session) +{ + AP_DEBUG_ASSERT(session); + h2_session_cleanup(session); + + if (APLOGctrace1(session->c)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_session(%ld): destroy, %d streams open", + session->id, (int)h2_stream_set_size(session->streams)); + } + if (session->mplx) { + h2_mplx_set_consumed_cb(session->mplx, NULL, NULL); + h2_mplx_release_and_join(session->mplx, session->iowait); + session->mplx = NULL; + } + if (session->streams) { + h2_stream_set_destroy(session->streams); + session->streams = NULL; + } + if (session->pool) { + apr_pool_destroy(session->pool); + } +} + +static apr_status_t h2_session_shutdown(h2_session *session, int reason, const char *msg) +{ + apr_status_t status = APR_SUCCESS; + const char *err = msg; + + AP_DEBUG_ASSERT(session); + if (!err && reason) { + err = nghttp2_strerror(reason); + } + nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, + h2_mplx_get_max_stream_started(session->mplx), + reason, (uint8_t*)err, err? strlen(err):0); + status = nghttp2_session_send(session->ngh2); + h2_conn_io_flush(&session->io); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "session(%ld): sent GOAWAY, err=%d, msg=%s", + session->id, reason, err? err : ""); + dispatch_event(session, H2_SESSION_EV_LOCAL_GOAWAY, reason, err); + return status; +} + +static apr_status_t session_pool_cleanup(void *data) +{ + h2_session *session = data; + /* On a controlled connection shutdown, this gets never + * called as we deregister and destroy our pool manually. + * However when we have an async mpm, and handed it our idle + * connection, it will just cleanup once the connection is closed + * from the other side (and sometimes even from out side) and + * here we arrive then. + */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "session(%ld): pool_cleanup", session->id); + if (session->state != H2_SESSION_ST_DONE + && session->state != H2_SESSION_ST_LOCAL_SHUTDOWN) { + /* Not good. The connection is being torn down and we have + * not sent a goaway. This is considered a protocol error and + * the client has to assume that any streams "in flight" may have + * been processed and are not safe to retry. + * As clients with idle connection may only learn about a closed + * connection when sending the next request, this has the effect + * that at least this one request will fail. + */ + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, + "session(%ld): connection disappeared without proper " + "goodbye, clients will be confused, should not happen", + session->id); + } + /* keep us from destroying the pool, since that is already ongoing. */ + session->pool = NULL; + h2_session_destroy(session); return APR_SUCCESS; } +static void *session_malloc(size_t size, void *ctx) +{ + h2_session *session = ctx; + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, session->c, + "h2_session(%ld): malloc(%ld)", + session->id, (long)size); + return malloc(size); +} + +static void session_free(void *p, void *ctx) +{ + h2_session *session = ctx; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, session->c, + "h2_session(%ld): free()", + session->id); + free(p); +} + +static void *session_calloc(size_t n, size_t size, void *ctx) +{ + h2_session *session = ctx; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, session->c, + "h2_session(%ld): calloc(%ld, %ld)", + session->id, (long)n, (long)size); + return calloc(n, size); +} + +static void *session_realloc(void *p, size_t size, void *ctx) +{ + h2_session *session = ctx; + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, session->c, + "h2_session(%ld): realloc(%ld)", + session->id, (long)size); + return realloc(p, size); +} + static h2_session *h2_session_create_int(conn_rec *c, request_rec *r, - h2_config *config, + h2_ctx *ctx, h2_workers *workers) { nghttp2_session_callbacks *callbacks = NULL; nghttp2_option *options = NULL; + uint32_t n; apr_pool_t *pool = NULL; - apr_status_t status = apr_pool_create(&pool, r? r->pool : c->pool); + apr_status_t status = apr_pool_create(&pool, c->pool); h2_session *session; if (status != APR_SUCCESS) { return NULL; @@ -645,14 +800,29 @@ static h2_session *h2_session_create_int(conn_rec *c, session = apr_pcalloc(pool, sizeof(h2_session)); if (session) { int rv; + nghttp2_mem *mem; + session->id = c->id; session->c = c; session->r = r; + session->s = h2_ctx_server_get(ctx); + session->config = h2_config_sget(session->s); + + session->state = H2_SESSION_ST_INIT; - session->max_stream_count = h2_config_geti(config, H2_CONF_MAX_STREAMS); - session->max_stream_mem = h2_config_geti(config, H2_CONF_STREAM_MAX_MEM); - session->pool = pool; + apr_pool_pre_cleanup_register(pool, session, session_pool_cleanup); + + session->max_stream_count = h2_config_geti(session->config, H2_CONF_MAX_STREAMS); + session->max_stream_mem = h2_config_geti(session->config, H2_CONF_STREAM_MAX_MEM); + session->timeout_secs = h2_config_geti(session->config, H2_CONF_TIMEOUT_SECS); + if (session->timeout_secs <= 0) { + session->timeout_secs = apr_time_sec(session->s->timeout); + } + session->keepalive_secs = h2_config_geti(session->config, H2_CONF_KEEPALIVE_SECS); + if (session->keepalive_secs <= 0) { + session->keepalive_secs = apr_time_sec(session->s->keep_alive_timeout); + } status = apr_thread_cond_create(&session->iowait, session->pool); if (status != APR_SUCCESS) { @@ -662,9 +832,15 @@ static h2_session *h2_session_create_int(conn_rec *c, session->streams = h2_stream_set_create(session->pool, session->max_stream_count); session->workers = workers; - session->mplx = h2_mplx_create(c, session->pool, workers); + session->mplx = h2_mplx_create(c, session->pool, session->config, workers); - h2_conn_io_init(&session->io, c); + h2_mplx_set_consumed_cb(session->mplx, update_window, session); + + /* Install the connection input filter that feeds the session */ + session->cin = h2_filter_cin_create(session->pool, h2_session_receive, session); + ap_add_input_filter("H2_IN", session->cin, r, c); + + h2_conn_io_init(&session->io, c, session->config, session->pool); session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc); status = init_callbacks(c, &callbacks); @@ -683,16 +859,27 @@ static h2_session *h2_session_create_int(conn_rec *c, h2_session_destroy(session); return NULL; } - nghttp2_option_set_peer_max_concurrent_streams(options, (uint32_t)session->max_stream_count); - /* We need to handle window updates ourself, otherwise we * get flooded by nghttp2. */ nghttp2_option_set_no_auto_window_update(options, 1); - rv = nghttp2_session_server_new2(&session->ngh2, callbacks, - session, options); + if (APLOGctrace6(c)) { + mem = apr_pcalloc(session->pool, sizeof(nghttp2_mem)); + mem->mem_user_data = session; + mem->malloc = session_malloc; + mem->free = session_free; + mem->calloc = session_calloc; + mem->realloc = session_realloc; + + rv = nghttp2_session_server_new3(&session->ngh2, callbacks, + session, options, mem); + } + else { + rv = nghttp2_session_server_new2(&session->ngh2, callbacks, + session, options); + } nghttp2_session_callbacks_del(callbacks); nghttp2_option_del(options); @@ -703,144 +890,56 @@ static h2_session *h2_session_create_int(conn_rec *c, h2_session_destroy(session); return NULL; } + + n = h2_config_geti(session->config, H2_CONF_PUSH_DIARY_SIZE); + session->push_diary = h2_push_diary_create(session->pool, n); + if (APLOGcdebug(c)) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, + "session(%ld) created, timeout=%d, keepalive_timeout=%d, " + "max_streams=%d, stream_mem=%d, push_diary(type=%d,N=%d)", + session->id, session->timeout_secs, session->keepalive_secs, + (int)session->max_stream_count, (int)session->max_stream_mem, + session->push_diary->dtype, + (int)session->push_diary->N); + } } return session; } -h2_session *h2_session_create(conn_rec *c, h2_config *config, - h2_workers *workers) -{ - return h2_session_create_int(c, NULL, config, workers); -} - -h2_session *h2_session_rcreate(request_rec *r, h2_config *config, - h2_workers *workers) +h2_session *h2_session_create(conn_rec *c, h2_ctx *ctx, h2_workers *workers) { - return h2_session_create_int(r->connection, r, config, workers); + return h2_session_create_int(c, NULL, ctx, workers); } -void h2_session_destroy(h2_session *session) +h2_session *h2_session_rcreate(request_rec *r, h2_ctx *ctx, h2_workers *workers) { - AP_DEBUG_ASSERT(session); - if (session->mplx) { - h2_mplx_release_and_join(session->mplx, session->iowait); - session->mplx = NULL; - } - if (session->streams) { - if (!h2_stream_set_is_empty(session->streams)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_session(%ld): destroy, %d streams open", - session->id, (int)h2_stream_set_size(session->streams)); - } - h2_stream_set_destroy(session->streams); - session->streams = NULL; - } - if (session->ngh2) { - nghttp2_session_del(session->ngh2); - session->ngh2 = NULL; - } - if (session->spare) { - apr_pool_destroy(session->spare); - session->spare = NULL; - } - if (session->pool) { - apr_pool_destroy(session->pool); - } + return h2_session_create_int(r->connection, r, ctx, workers); } -void h2_session_cleanup(h2_session *session) +void h2_session_eoc_callback(h2_session *session) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, "session(%ld): cleanup and destroy", session->id); + apr_pool_cleanup_kill(session->pool, session, session_pool_cleanup); h2_session_destroy(session); } -static apr_status_t h2_session_abort_int(h2_session *session, int reason) -{ - AP_DEBUG_ASSERT(session); - if (!session->aborted) { - session->aborted = 1; - - if (session->ngh2) { - if (NGHTTP2_ERR_EOF == reason) { - /* This is our way of indication that the connection is - * gone. No use to send any GOAWAY frames. */ - nghttp2_session_terminate_session(session->ngh2, reason); - } - else if (!reason) { - nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, - session->max_stream_received, - reason, NULL, 0); - nghttp2_session_send(session->ngh2); - } - else { - const char *err = nghttp2_strerror(reason); - - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "session(%ld): aborting session, reason=%d %s", - session->id, reason, err); - - /* The connection might still be there and we shut down - * with GOAWAY and reason information. */ - nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, - session->max_stream_received, - reason, (const uint8_t *)err, - strlen(err)); - nghttp2_session_send(session->ngh2); - } - h2_session_flush(session); - } - h2_mplx_abort(session->mplx); - } - return APR_SUCCESS; -} - -apr_status_t h2_session_abort(h2_session *session, apr_status_t reason, int rv) +static apr_status_t h2_session_start(h2_session *session, int *rv) { - AP_DEBUG_ASSERT(session); - if (rv == 0) { - rv = NGHTTP2_ERR_PROTO; - switch (reason) { - case APR_ENOMEM: - rv = NGHTTP2_ERR_NOMEM; - break; - case APR_SUCCESS: /* all fine, just... */ - case APR_EOF: /* client closed its end... */ - case APR_TIMEUP: /* got bored waiting... */ - rv = 0; /* ...gracefully shut down */ - break; - case APR_EBADF: /* connection unusable, terminate silently */ - default: - if (APR_STATUS_IS_ECONNABORTED(reason) - || APR_STATUS_IS_ECONNRESET(reason) - || APR_STATUS_IS_EBADF(reason)) { - rv = NGHTTP2_ERR_EOF; - } - break; - } - } - return h2_session_abort_int(session, rv); -} - -apr_status_t h2_session_start(h2_session *session, int *rv) -{ - apr_status_t status = APR_SUCCESS; - h2_config *config; - nghttp2_settings_entry settings[3]; - + apr_status_t status = APR_SUCCESS; + nghttp2_settings_entry settings[3]; + size_t slen; + int win_size; + AP_DEBUG_ASSERT(session); /* Start the conversation by submitting our SETTINGS frame */ *rv = 0; - config = h2_config_get(session->c); if (session->r) { const char *s, *cs; apr_size_t dlen; h2_stream * stream; - /* better for vhost matching */ - config = h2_config_rget(session->r); - /* 'h2c' mode: we should have a 'HTTP2-Settings' header with * base64 encoded client settings. */ s = apr_table_get(session->r->headers_in, "HTTP2-Settings"); @@ -884,29 +983,54 @@ apr_status_t h2_session_start(h2_session *session, int *rv) if (status != APR_SUCCESS) { return status; } - status = stream_end_headers(session, stream, 1); + status = stream_schedule(session, stream, 1); if (status != APR_SUCCESS) { return status; } } - settings[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS; - settings[0].value = (uint32_t)session->max_stream_count; - settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; - settings[1].value = h2_config_geti(config, H2_CONF_WIN_SIZE); - settings[2].settings_id = NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE; - settings[2].value = 64*1024; + slen = 0; + settings[slen].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS; + settings[slen].value = (uint32_t)session->max_stream_count; + ++slen; + win_size = h2_config_geti(session->config, H2_CONF_WIN_SIZE); + if (win_size != H2_INITIAL_WINDOW_SIZE) { + settings[slen].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; + settings[slen].value = win_size; + ++slen; + } + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, + "h2_session(%ld): start, INITIAL_WINDOW_SIZE=%ld, " + "MAX_CONCURRENT_STREAMS=%d", + session->id, (long)win_size, (int)session->max_stream_count); *rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, - settings, - sizeof(settings)/sizeof(settings[0])); + settings, slen); if (*rv != 0) { status = APR_EGENERAL; ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c, APLOGNO(02935) "nghttp2_submit_settings: %s", nghttp2_strerror(*rv)); } - + else { + /* use maximum possible value for connection window size. We are only + * interested in per stream flow control. which have the initial window + * size configured above. + * Therefore, for our use, the connection window can only get in the + * way. Example: if we allow 100 streams with a 32KB window each, we + * buffer up to 3.2 MB of data. Unless we do separate connection window + * interim updates, any smaller connection window will lead to blocking + * in DATA flow. + */ + *rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, + 0, NGHTTP2_MAX_WINDOW_SIZE - win_size); + if (*rv != 0) { + status = APR_EGENERAL; + ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c, + APLOGNO(02970) "nghttp2_submit_window_update: %s", + nghttp2_strerror(*rv)); + } + } return status; } @@ -915,7 +1039,8 @@ typedef struct { int resume_count; } resume_ctx; -static int resume_on_data(void *ctx, h2_stream *stream) { +static int resume_on_data(void *ctx, h2_stream *stream) +{ resume_ctx *rctx = (resume_ctx*)ctx; h2_session *session = rctx->session; AP_DEBUG_ASSERT(session); @@ -931,17 +1056,18 @@ static int resume_on_data(void *ctx, h2_stream *stream) { ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)? APLOG_ERR : APLOG_DEBUG, 0, session->c, APLOGNO(02936) - "h2_stream(%ld-%d): resuming stream %s", - session->id, stream->id, nghttp2_strerror(rv)); + "h2_stream(%ld-%d): resuming %s", + session->id, stream->id, rv? nghttp2_strerror(rv) : ""); } } return 1; } -static int h2_session_resume_streams_with_data(h2_session *session) { +static int h2_session_resume_streams_with_data(h2_session *session) +{ AP_DEBUG_ASSERT(session); if (!h2_stream_set_is_empty(session->streams) - && session->mplx && !session->aborted) { + && session->mplx && !session->mplx->aborted) { resume_ctx ctx; ctx.session = session; @@ -955,12 +1081,6 @@ static int h2_session_resume_streams_with_data(h2_session *session) { return 0; } -static void update_window(void *ctx, int stream_id, apr_off_t bytes_read) -{ - h2_session *session = (h2_session*)ctx; - nghttp2_session_consume(session->ngh2, stream_id, bytes_read); -} - h2_stream *h2_session_get_stream(h2_session *session, int stream_id) { if (!session->last_stream || stream_id != session->last_stream->id) { @@ -969,50 +1089,6 @@ h2_stream *h2_session_get_stream(h2_session *session, int stream_id) return session->last_stream; } -/* h2_io_on_read_cb implementation that offers the data read - * directly to the session for consumption. - */ -static apr_status_t session_receive(const char *data, apr_size_t len, - apr_size_t *readlen, int *done, - void *puser) -{ - h2_session *session = (h2_session *)puser; - AP_DEBUG_ASSERT(session); - if (len > 0) { - ssize_t n = nghttp2_session_mem_recv(session->ngh2, - (const uint8_t *)data, len); - if (n < 0) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EGENERAL, - session->c, - "h2_session: nghttp2_session_mem_recv error %d", - (int)n); - if (nghttp2_is_fatal((int)n)) { - *done = 1; - h2_session_abort_int(session, (int)n); - return APR_EGENERAL; - } - } - else { - *readlen = n; - } - } - return APR_SUCCESS; -} - -apr_status_t h2_session_close(h2_session *session) -{ - AP_DEBUG_ASSERT(session); - if (!session->aborted) { - h2_session_abort_int(session, 0); - } - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0,session->c, - "h2_session: closing, writing eoc"); - h2_conn_io_writeb(&session->io, - h2_bucket_eoc_create(session->c->bucket_alloc, - session)); - return h2_session_flush(session); -} - static ssize_t stream_data_cb(nghttp2_session *ng2s, int32_t stream_id, uint8_t *buf, @@ -1032,9 +1108,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, * to find out how much of the requested length we can send without * blocking. * Indicate EOS when we encounter it or DEFERRED if the stream - * should be suspended. - * TODO: for handling of TRAILERS, the EOF indication needs - * to be aware of that. + * should be suspended. Beware of trailers. */ (void)ng2s; @@ -1072,7 +1146,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, nread = 0; h2_stream_set_suspended(stream, 1); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_stream(%ld-%d): suspending stream", + "h2_stream(%ld-%d): suspending", session->id, (int)stream_id); return NGHTTP2_ERR_DEFERRED; @@ -1090,6 +1164,22 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, } if (eos) { + apr_table_t *trailers = h2_stream_get_trailers(stream); + if (trailers && !apr_is_empty_table(trailers)) { + h2_ngheader *nh; + int rv; + + nh = h2_util_ngheader_make(stream->pool, trailers); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_stream(%ld-%d): submit %d trailers", + session->id, (int)stream_id,(int) nh->nvlen); + rv = nghttp2_submit_trailer(ng2s, stream->id, nh->nv, nh->nvlen); + if (rv < 0) { + nread = rv; + } + *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM; + } + *data_flags |= NGHTTP2_DATA_FLAG_EOF; } @@ -1110,18 +1200,19 @@ typedef struct { static apr_status_t submit_response(h2_session *session, h2_stream *stream) { apr_status_t status = APR_SUCCESS; + h2_response *response = h2_stream_get_response(stream); int rv = 0; AP_DEBUG_ASSERT(session); AP_DEBUG_ASSERT(stream); - AP_DEBUG_ASSERT(stream->response || stream->rst_error); + AP_DEBUG_ASSERT(response || stream->rst_error); if (stream->submitted) { rv = NGHTTP2_PROTOCOL_ERROR; } - else if (stream->response && stream->response->header) { + else if (response && response->headers) { nghttp2_data_provider provider; - h2_response *response = stream->response; h2_ngheader *ngh; + const h2_priority *prio; memset(&provider, 0, sizeof(provider)); provider.source.fd = stream->id; @@ -1131,17 +1222,14 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream) "h2_stream(%ld-%d): submit response %d", session->id, stream->id, response->http_status); - ngh = h2_util_ngheader_make_res(stream->pool, response->http_status, - response->header); - rv = nghttp2_submit_response(session->ngh2, response->stream_id, - ngh->nv, ngh->nvlen, &provider); - - /* If the submit worked, - * and this stream is not a pushed one itself, + /* If this stream is not a pushed one itself, * and HTTP/2 server push is enabled here, * and the response is in the range 200-299 *), * and the remote side has pushing enabled, * -> find and perform any pushes on this stream + * *before* we submit the stream response itself. + * This helps clients avoid opening new streams on Link + * headers that get pushed right afterwards. * * *) the response code is relevant, as we do not want to * make pushes on 401 or 403 codes, neiterh on 301/302 @@ -1149,14 +1237,23 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream) * as the client, having this resource in its cache, might * also have the pushed ones as well. */ - if (!rv - && !stream->initiated_on - && h2_config_geti(h2_config_get(session->c), H2_CONF_PUSH) + if (!stream->initiated_on && H2_HTTP_2XX(response->http_status) && h2_session_push_enabled(session)) { h2_stream_submit_pushes(stream); } + + prio = h2_stream_get_priority(stream); + if (prio) { + h2_session_set_prio(session, stream, prio); + /* no showstopper if that fails for some reason */ + } + + ngh = h2_util_ngheader_make_res(stream->pool, response->http_status, + response->headers); + rv = nghttp2_submit_response(session->ngh2, response->stream_id, + ngh->nv, ngh->nvlen, &provider); } else { int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR); @@ -1170,10 +1267,16 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream) } stream->submitted = 1; + if (stream->initiated_on) { + ++session->pushes_submitted; + } + else { + ++session->responses_submitted; + } if (nghttp2_is_fatal(rv)) { status = APR_EGENERAL; - h2_session_abort_int(session, rv); + dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv)); ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c, APLOGNO(02940) "submit_response: %s", nghttp2_strerror(rv)); @@ -1191,38 +1294,38 @@ struct h2_stream *h2_session_push(h2_session *session, h2_stream *is, int nid; ngh = h2_util_ngheader_make_req(is->pool, push->req); - nid = nghttp2_submit_push_promise(session->ngh2, 0, push->initiating_id, + nid = nghttp2_submit_push_promise(session->ngh2, 0, is->id, ngh->nv, ngh->nvlen, NULL); - if (nid <= 0) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, "h2_stream(%ld-%d): submitting push promise fail: %s", - session->id, push->initiating_id, - nghttp2_strerror(nid)); + session->id, is->id, nghttp2_strerror(nid)); return NULL; } + ++session->pushes_promised; ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_stream(%ld-%d): promised new stream %d for %s %s", - session->id, push->initiating_id, nid, - push->req->method, push->req->path); + "h2_stream(%ld-%d): SERVER_PUSH %d for %s %s on %d", + session->id, is->id, nid, + push->req->method, push->req->path, is->id); stream = h2_session_open_stream(session, nid); if (stream) { h2_stream_set_h2_request(stream, is->id, push->req); - status = stream_end_headers(session, stream, 1); + status = stream_schedule(session, stream, 1); if (status != APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, "h2_stream(%ld-%d): scheduling push stream", session->id, stream->id); h2_stream_cleanup(stream); stream = NULL; } + ++session->unsent_promises; } else { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, "h2_stream(%ld-%d): failed to create stream obj %d", - session->id, push->initiating_id, nid); + session->id, is->id, nid); } if (!stream) { @@ -1234,15 +1337,125 @@ struct h2_stream *h2_session_push(h2_session *session, h2_stream *is, return stream; } +static int valid_weight(float f) +{ + int w = (int)f; + return (w < NGHTTP2_MIN_WEIGHT? NGHTTP2_MIN_WEIGHT : + (w > NGHTTP2_MAX_WEIGHT)? NGHTTP2_MAX_WEIGHT : w); +} + +apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream, + const h2_priority *prio) +{ + apr_status_t status = APR_SUCCESS; +#ifdef H2_NG2_CHANGE_PRIO + nghttp2_stream *s_grandpa, *s_parent, *s; + + s = nghttp2_session_find_stream(session->ngh2, stream->id); + if (!s) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_stream(%ld-%d): lookup of nghttp2_stream failed", + session->id, stream->id); + return APR_EINVAL; + } + + s_parent = nghttp2_stream_get_parent(s); + if (s_parent) { + nghttp2_priority_spec ps; + int id_parent, id_grandpa, w_parent, w, rv = 0; + char *ptype = "AFTER"; + h2_dependency dep = prio->dependency; + + id_parent = nghttp2_stream_get_stream_id(s_parent); + s_grandpa = nghttp2_stream_get_parent(s_parent); + if (s_grandpa) { + id_grandpa = nghttp2_stream_get_stream_id(s_grandpa); + } + else { + /* parent of parent does not exist, + * only possible if parent == root */ + dep = H2_DEPENDANT_AFTER; + } + + switch (dep) { + case H2_DEPENDANT_INTERLEAVED: + /* PUSHed stream is to be interleaved with initiating stream. + * It is made a sibling of the initiating stream and gets a + * proportional weight [1, MAX_WEIGHT] of the initiaing + * stream weight. + */ + ptype = "INTERLEAVED"; + w_parent = nghttp2_stream_get_weight(s_parent); + w = valid_weight(w_parent * ((float)prio->weight / NGHTTP2_MAX_WEIGHT)); + nghttp2_priority_spec_init(&ps, id_grandpa, w, 0); + break; + + case H2_DEPENDANT_BEFORE: + /* PUSHed stream os to be sent BEFORE the initiating stream. + * It gets the same weight as the initiating stream, replaces + * that stream in the dependency tree and has the initiating + * stream as child. + */ + ptype = "BEFORE"; + w = w_parent = nghttp2_stream_get_weight(s_parent); + nghttp2_priority_spec_init(&ps, stream->id, w_parent, 0); + id_grandpa = nghttp2_stream_get_stream_id(s_grandpa); + rv = nghttp2_session_change_stream_priority(session->ngh2, id_parent, &ps); + if (rv < 0) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_stream(%ld-%d): PUSH BEFORE, weight=%d, " + "depends=%d, returned=%d", + session->id, id_parent, ps.weight, ps.stream_id, rv); + return APR_EGENERAL; + } + nghttp2_priority_spec_init(&ps, id_grandpa, w, 0); + break; + + case H2_DEPENDANT_AFTER: + /* The PUSHed stream is to be sent after the initiating stream. + * Give if the specified weight and let it depend on the intiating + * stream. + */ + /* fall through, it's the default */ + default: + nghttp2_priority_spec_init(&ps, id_parent, valid_weight(prio->weight), 0); + break; + } + + + rv = nghttp2_session_change_stream_priority(session->ngh2, stream->id, &ps); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_stream(%ld-%d): PUSH %s, weight=%d, " + "depends=%d, returned=%d", + session->id, stream->id, ptype, + ps.weight, ps.stream_id, rv); + status = (rv < 0)? APR_EGENERAL : APR_SUCCESS; + } +#else + (void)session; + (void)stream; + (void)prio; + (void)valid_weight; +#endif + return status; +} + apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream) { apr_pool_t *pool = h2_stream_detach_pool(stream); - h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error); - if (session->last_stream == stream) { - session->last_stream = NULL; + /* this may be called while the session has already freed + * some internal structures. */ + if (session->mplx) { + h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error); + if (session->last_stream == stream) { + session->last_stream = NULL; + } + } + + if (session->streams) { + h2_stream_set_remove(session->streams, stream->id); } - h2_stream_set_remove(session->streams, stream->id); h2_stream_destroy(stream); if (pool) { @@ -1326,7 +1539,7 @@ static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen) } default: return apr_snprintf(buffer, maxlen, - "FRAME[type=%d, length=%d, flags=%d, stream=%d]", + "type=%d[length=%d, flags=%d, stream=%d]", frame->hd.type, (int)frame->hd.length, frame->hd.flags, frame->hd.stream_id); } @@ -1334,178 +1547,597 @@ static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen) int h2_session_push_enabled(h2_session *session) { - return nghttp2_session_get_remote_settings(session->ngh2, - NGHTTP2_SETTINGS_ENABLE_PUSH); + /* iff we can and they can */ + return (h2_config_geti(session->config, H2_CONF_PUSH) + && nghttp2_session_get_remote_settings(session->ngh2, + NGHTTP2_SETTINGS_ENABLE_PUSH)); } +static apr_status_t h2_session_send(h2_session *session) +{ + int rv = nghttp2_session_send(session->ngh2); + if (rv != 0) { + if (nghttp2_is_fatal(rv)) { + dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv)); + return APR_EGENERAL; + } + } + + session->unsent_promises = 0; + session->unsent_submits = 0; + + return APR_SUCCESS; +} -apr_status_t h2_session_process(h2_session *session) +static apr_status_t h2_session_receive(void *ctx, const char *data, + apr_size_t len, apr_size_t *readlen) { - apr_status_t status = APR_SUCCESS; - apr_interval_time_t wait_micros = 0; - static const int MAX_WAIT_MICROS = 200 * 1000; - int got_streams = 0; - - while (!session->aborted && (nghttp2_session_want_read(session->ngh2) - || nghttp2_session_want_write(session->ngh2))) { - int have_written = 0; - int have_read = 0; - - /* Send data as long as we have it and window sizes allow. We are - * a server after all. - */ - if (nghttp2_session_want_write(session->ngh2)) { - int rv; - - rv = nghttp2_session_send(session->ngh2); - if (rv != 0) { - ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session: send: %s", nghttp2_strerror(rv)); - if (nghttp2_is_fatal(rv)) { - h2_session_abort(session, status, rv); - goto end_process; - } - } - else { - have_written = 1; - wait_micros = 0; + h2_session *session = ctx; + ssize_t n; + + if (len > 0) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_session(%ld): feeding %ld bytes to nghttp2", + session->id, (long)len); + n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len); + if (n < 0) { + if (nghttp2_is_fatal((int)n)) { + dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, (int)n, nghttp2_strerror(n)); + return APR_EGENERAL; } } - - if (have_written) { - h2_session_flush(session); - } - - if (wait_micros > 0) { - ap_log_cerror( APLOG_MARK, APLOG_TRACE3, 0, session->c, - "h2_session: wait for data, %ld micros", (long)(wait_micros)); - status = h2_mplx_out_trywait(session->mplx, wait_micros, session->iowait); - - if (status == APR_TIMEUP) { - if (wait_micros < MAX_WAIT_MICROS) { - wait_micros *= 2; - } - } + else { + *readlen = n; + session->io.bytes_read += n; } - - if (nghttp2_session_want_read(session->ngh2)) - { - /* When we - * - and have no streams at all - * - or have streams, but none is suspended or needs submit and - * have nothing written on the last try - * - * or, the other way around - * - have only streams where data can be sent, but could - * not send anything - * - * then we are waiting on frames from the client (for - * example WINDOW_UPDATE or HEADER) and without new frames - * from the client, we cannot make any progress, - * - * and *then* we can safely do a blocking read. - */ - int may_block = (session->frames_received <= 1); - if (!may_block) { - if (got_streams) { - may_block = (!have_written - && !h2_stream_set_has_unsubmitted(session->streams) - && !h2_stream_set_has_suspended(session->streams)); - } - else { - may_block = 1; - } - } - - if (may_block) { - h2_session_flush(session); - if (session->c->cs) { - session->c->cs->state = (got_streams? CONN_STATE_HANDLER - : CONN_STATE_WRITE_COMPLETION); - } - status = h2_conn_io_read(&session->io, APR_BLOCK_READ, - session_receive, session); - } - else { - if (session->c->cs) { - session->c->cs->state = CONN_STATE_HANDLER; - } - status = h2_conn_io_read(&session->io, APR_NONBLOCK_READ, - session_receive, session); - } + } + return APR_SUCCESS; +} - switch (status) { - case APR_SUCCESS: /* successful read, reset our idle timers */ - have_read = 1; - wait_micros = 0; - break; - case APR_EAGAIN: /* non-blocking read, nothing there */ - break; - default: +static apr_status_t h2_session_read(h2_session *session, int block, int loops) +{ + apr_status_t status, rstatus = APR_EAGAIN; + conn_rec *c = session->c; + int i; + + for (i = 0; i < loops; ++i) { + /* H2_IN filter handles all incoming data against the session. + * We just pull at the filter chain to make it happen */ + status = ap_get_brigade(c->input_filters, + session->bbtmp, AP_MODE_READBYTES, + block? APR_BLOCK_READ : APR_NONBLOCK_READ, + APR_BUCKET_BUFF_SIZE); + /* get rid of any possible data we do not expect to get */ + apr_brigade_cleanup(session->bbtmp); + + switch (status) { + case APR_SUCCESS: + /* successful read, reset our idle timers */ + rstatus = APR_SUCCESS; + if (block) { + /* successfull blocked read, try unblocked to + * get more. */ + block = 0; + } + break; + case APR_EAGAIN: + return rstatus; + case APR_TIMEUP: + return status; + default: + if (!i) { + /* first attempt failed */ if (APR_STATUS_IS_ETIMEDOUT(status) || APR_STATUS_IS_ECONNABORTED(status) || APR_STATUS_IS_ECONNRESET(status) || APR_STATUS_IS_EOF(status) || APR_STATUS_IS_EBADF(status)) { /* common status for a client that has left */ - ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, session->c, - "h2_session(%ld): terminating", - session->id); - /* Stolen from mod_reqtimeout to speed up lingering when - * a read timeout happened. - */ - apr_table_setn(session->c->notes, "short-lingering-close", "1"); + ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c, + "h2_session(%ld): input gone", session->id); } else { /* uncommon status, log on INFO so that we see this */ - ap_log_cerror( APLOG_MARK, APLOG_INFO, status, session->c, + ap_log_cerror( APLOG_MARK, APLOG_INFO, status, c, APLOGNO(02950) "h2_session(%ld): error reading, terminating", session->id); } - h2_session_abort(session, status, 0); - goto end_process; + return status; + } + /* subsequent failure after success(es), return initial + * status. */ + return rstatus; + } + if (!is_accepting_streams(session)) { + break; + } + } + return rstatus; +} + +static apr_status_t h2_session_submit(h2_session *session) +{ + apr_status_t status = APR_EAGAIN; + h2_stream *stream; + + if (h2_stream_set_has_unsubmitted(session->streams)) { + /* If we have responses ready, submit them now. */ + while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) { + status = submit_response(session, stream); + ++session->unsent_submits; + + /* Unsent push promises are written immediately, as nghttp2 + * 1.5.0 realizes internal stream data structures only on + * send and we might need them for other submits. + * Also, to conserve memory, we send at least every 10 submits + * so that nghttp2 does not buffer all outbound items too + * long. + */ + if (status == APR_SUCCESS + && (session->unsent_promises || session->unsent_submits > 10)) { + status = h2_session_send(session); + if (status != APR_SUCCESS) { + break; + } } } + } + return status; +} + +static const char *StateNames[] = { + "INIT", /* H2_SESSION_ST_INIT */ + "DONE", /* H2_SESSION_ST_DONE */ + "IDLE", /* H2_SESSION_ST_IDLE */ + "BUSY", /* H2_SESSION_ST_BUSY */ + "WAIT", /* H2_SESSION_ST_WAIT */ + "LSHUTDOWN", /* H2_SESSION_ST_LOCAL_SHUTDOWN */ + "RSHUTDOWN", /* H2_SESSION_ST_REMOTE_SHUTDOWN */ +}; + +static const char *state_name(h2_session_state state) +{ + if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) { + return "unknown"; + } + return StateNames[state]; +} + +static int is_accepting_streams(h2_session *session) +{ + switch (session->state) { + case H2_SESSION_ST_IDLE: + case H2_SESSION_ST_BUSY: + case H2_SESSION_ST_WAIT: + return 1; + default: + return 0; + } +} + +static void transit(h2_session *session, const char *action, h2_session_state nstate) +{ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id, + state_name(session->state), action, state_name(nstate)); + session->state = nstate; +} + +static void h2_session_ev_init(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_INIT: + transit(session, "init", H2_SESSION_ST_BUSY); + break; + + default: + /* nop */ + break; + } +} + +static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_LOCAL_SHUTDOWN: + /* already did that? */ + break; + case H2_SESSION_ST_IDLE: + case H2_SESSION_ST_REMOTE_SHUTDOWN: + /* all done */ + transit(session, "local goaway", H2_SESSION_ST_DONE); + break; + default: + transit(session, "local goaway", H2_SESSION_ST_LOCAL_SHUTDOWN); + break; + } +} + +static void h2_session_ev_remote_goaway(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_REMOTE_SHUTDOWN: + /* already received that? */ + break; + case H2_SESSION_ST_IDLE: + case H2_SESSION_ST_LOCAL_SHUTDOWN: + /* all done */ + transit(session, "remote goaway", H2_SESSION_ST_DONE); + break; + default: + transit(session, "remote goaway", H2_SESSION_ST_REMOTE_SHUTDOWN); + break; + } +} + +static void h2_session_ev_conn_error(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_INIT: + case H2_SESSION_ST_DONE: + case H2_SESSION_ST_LOCAL_SHUTDOWN: + /* just leave */ + transit(session, "conn error", H2_SESSION_ST_DONE); + break; - got_streams = !h2_stream_set_is_empty(session->streams); - if (got_streams) { - h2_stream *stream; - - if (session->reprioritize) { - h2_mplx_reprioritize(session->mplx, stream_pri_cmp, session); - session->reprioritize = 0; + default: + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_session(%ld): conn error -> shutdown", session->id); + h2_session_shutdown(session, arg, msg); + break; + } +} + +static void h2_session_ev_proto_error(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_DONE: + case H2_SESSION_ST_LOCAL_SHUTDOWN: + /* just leave */ + transit(session, "proto error", H2_SESSION_ST_DONE); + break; + + default: + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_session(%ld): proto error -> shutdown", session->id); + h2_session_shutdown(session, arg, msg); + break; + } +} + +static void h2_session_ev_conn_timeout(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_LOCAL_SHUTDOWN: + transit(session, "conn timeout", H2_SESSION_ST_DONE); + break; + default: + h2_session_shutdown(session, arg, msg); + transit(session, "conn timeout", H2_SESSION_ST_DONE); + break; + } +} + +static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_BUSY: + /* nothing for input and output to do. If we remain + * in this state, we go into a tight loop and suck up + * CPU cycles. Ideally, we'd like to do a blocking read, but that + * is not possible if we have scheduled tasks and wait + * for them to produce something. */ + if (h2_stream_set_is_empty(session->streams)) { + /* When we have no streams, no task event are possible, + * switch to blocking reads */ + transit(session, "no io", H2_SESSION_ST_IDLE); } - - if (!have_read && !have_written) { - /* Nothing read or written. That means no data yet ready to - * be send out. Slowly back off... - */ - if (wait_micros == 0) { - wait_micros = 10; - } + else if (!h2_stream_set_has_unsubmitted(session->streams) + && !h2_stream_set_has_suspended(session->streams)) { + /* none of our streams is waiting for a response or + * new output data from task processing, + * switch to blocking reads. */ + transit(session, "no io", H2_SESSION_ST_IDLE); } - - if (h2_stream_set_has_open_input(session->streams)) { - /* Check that any pending window updates are sent. */ - status = h2_mplx_in_update_windows(session->mplx, update_window, session); - if (APR_STATUS_IS_EAGAIN(status)) { - status = APR_SUCCESS; - } + else { + /* Unable to do blocking reads, as we wait on events from + * task processing in other threads. Do a busy wait with + * backoff timer. */ + transit(session, "no io", H2_SESSION_ST_WAIT); } - - h2_session_resume_streams_with_data(session); - - if (h2_stream_set_has_unsubmitted(session->streams)) { - /* If we have responses ready, submit them now. */ - while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) { - status = submit_response(session, stream); + break; + default: + /* nop */ + break; + } +} + +static void h2_session_ev_wait_timeout(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_WAIT: + transit(session, "wait timeout", H2_SESSION_ST_BUSY); + break; + default: + /* nop */ + break; + } +} + +static void h2_session_ev_stream_ready(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_WAIT: + transit(session, "stream ready", H2_SESSION_ST_BUSY); + break; + default: + /* nop */ + break; + } +} + +static void h2_session_ev_data_read(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_IDLE: + transit(session, "data read", H2_SESSION_ST_BUSY); + break; + /* fall through */ + default: + /* nop */ + break; + } +} + +static void h2_session_ev_ngh2_done(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_DONE: + /* nop */ + break; + default: + transit(session, "nghttp2 done", H2_SESSION_ST_DONE); + break; + } +} + +static void dispatch_event(h2_session *session, h2_session_event_t ev, + int arg, const char *msg) +{ + switch (ev) { + case H2_SESSION_EV_INIT: + h2_session_ev_init(session, arg, msg); + break; + case H2_SESSION_EV_LOCAL_GOAWAY: + h2_session_ev_local_goaway(session, arg, msg); + break; + case H2_SESSION_EV_REMOTE_GOAWAY: + h2_session_ev_remote_goaway(session, arg, msg); + break; + case H2_SESSION_EV_CONN_ERROR: + h2_session_ev_conn_error(session, arg, msg); + break; + case H2_SESSION_EV_PROTO_ERROR: + h2_session_ev_proto_error(session, arg, msg); + break; + case H2_SESSION_EV_CONN_TIMEOUT: + h2_session_ev_conn_timeout(session, arg, msg); + break; + case H2_SESSION_EV_NO_IO: + h2_session_ev_no_io(session, arg, msg); + break; + case H2_SESSION_EV_WAIT_TIMEOUT: + h2_session_ev_wait_timeout(session, arg, msg); + break; + case H2_SESSION_EV_STREAM_READY: + h2_session_ev_stream_ready(session, arg, msg); + break; + case H2_SESSION_EV_DATA_READ: + h2_session_ev_data_read(session, arg, msg); + break; + case H2_SESSION_EV_NGH2_DONE: + h2_session_ev_ngh2_done(session, arg, msg); + break; + default: + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_session(%ld): unknown event %d", + session->id, ev); + break; + } + + if (session->state == H2_SESSION_ST_DONE) { + h2_mplx_abort(session->mplx); + } +} + +static const int MAX_WAIT_MICROS = 200 * 1000; + +apr_status_t h2_session_process(h2_session *session, int async) +{ + apr_status_t status = APR_SUCCESS; + conn_rec *c = session->c; + int rv, have_written, have_read; + + ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c, + "h2_session(%ld): process start, async=%d", session->id, async); + + while (1) { + have_read = have_written = 0; + + switch (session->state) { + case H2_SESSION_ST_INIT: + if (!h2_is_acceptable_connection(c, 1)) { + h2_session_shutdown(session, NGHTTP2_INADEQUATE_SECURITY, NULL); + } + else { + ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL); + status = h2_session_start(session, &rv); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, + "h2_session(%ld): started on %s:%d", session->id, + session->s->server_hostname, + c->local_addr->port); + if (status != APR_SUCCESS) { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); + } + dispatch_event(session, H2_SESSION_EV_INIT, 0, NULL); } - } + break; + + case H2_SESSION_ST_IDLE: + h2_filter_cin_timeout_set(session->cin, session->keepalive_secs); + ap_update_child_status(c->sbh, SERVER_BUSY_KEEPALIVE, NULL); + status = h2_session_read(session, 1, 10); + if (status == APR_SUCCESS) { + have_read = 1; + dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL); + } + else if (status == APR_EAGAIN) { + /* nothing to read */ + } + else if (APR_STATUS_IS_TIMEUP(status)) { + dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL); + break; + } + else { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); + } + break; + + case H2_SESSION_ST_BUSY: + case H2_SESSION_ST_LOCAL_SHUTDOWN: + case H2_SESSION_ST_REMOTE_SHUTDOWN: + if (nghttp2_session_want_read(session->ngh2)) { + ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL); + h2_filter_cin_timeout_set(session->cin, session->timeout_secs); + status = h2_session_read(session, 0, 10); + if (status == APR_SUCCESS) { + have_read = 1; + dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL); + } + else if (status == APR_EAGAIN) { + /* nothing to read */ + } + else if (APR_STATUS_IS_TIMEUP(status)) { + dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL); + break; + } + else { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); + } + } + + if (!h2_stream_set_is_empty(session->streams)) { + /* resume any streams for which data is available again */ + h2_session_resume_streams_with_data(session); + /* Submit any responses/push_promises that are ready */ + status = h2_session_submit(session); + if (status == APR_SUCCESS) { + have_written = 1; + } + else if (status != APR_EAGAIN) { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, + H2_ERR_INTERNAL_ERROR, "submit error"); + break; + } + /* send out window updates for our inputs */ + status = h2_mplx_in_update_windows(session->mplx); + if (status != APR_SUCCESS && status != APR_EAGAIN) { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, + H2_ERR_INTERNAL_ERROR, "window update error"); + break; + } + } + + if (nghttp2_session_want_write(session->ngh2)) { + status = h2_session_send(session); + if (status == APR_SUCCESS) { + have_written = 1; + } + else { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, + H2_ERR_INTERNAL_ERROR, "writing"); + break; + } + } + + if (have_read || have_written) { + session->wait_us = 0; + } + else { + dispatch_event(session, H2_SESSION_EV_NO_IO, 0, NULL); + } + break; + + case H2_SESSION_ST_WAIT: + session->wait_us = H2MAX(session->wait_us, 10); + if (APLOGctrace1(c)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + "h2_session: wait for data, %ld micros", + (long)session->wait_us); + } + + ap_log_cerror( APLOG_MARK, APLOG_TRACE2, status, c, + "h2_session(%ld): process -> trywait", session->id); + status = h2_mplx_out_trywait(session->mplx, session->wait_us, + session->iowait); + if (status == APR_SUCCESS) { + dispatch_event(session, H2_SESSION_EV_STREAM_READY, 0, NULL); + } + else if (status == APR_TIMEUP) { + /* nothing, increase timer for graceful backup */ + session->wait_us = H2MIN(session->wait_us*2, MAX_WAIT_MICROS); + dispatch_event(session, H2_SESSION_EV_WAIT_TIMEOUT, 0, NULL); + } + else { + h2_session_shutdown(session, H2_ERR_INTERNAL_ERROR, "cond wait error"); + } + break; + + case H2_SESSION_ST_DONE: + status = APR_EOF; + goto out; + + default: + ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c, + "h2_session(%ld): unknown state %d", session->id, session->state); + dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, 0, NULL); + break; + } + + if (have_written) { + h2_conn_io_flush(&session->io); + } + else if (!nghttp2_session_want_read(session->ngh2) + && !nghttp2_session_want_write(session->ngh2)) { + dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL); + } + } + +out: + if (have_written) { + h2_conn_io_flush(&session->io); + } + + ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c, + "h2_session(%ld): [%s] process returns", + session->id, state_name(session->state)); + + if ((session->state != H2_SESSION_ST_DONE) + && (APR_STATUS_IS_EOF(status) + || APR_STATUS_IS_ECONNRESET(status) + || APR_STATUS_IS_ECONNABORTED(status))) { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); + } + + status = (session->state == H2_SESSION_ST_DONE)? APR_EOF : APR_SUCCESS; + if (session->state == H2_SESSION_ST_DONE) { + if (!session->eoc_written) { + session->eoc_written = 1; + h2_conn_io_write_eoc(&session->io, + h2_bucket_eoc_create(session->c->bucket_alloc, session)); } - } -end_process: return status; }