*/
#include <assert.h>
+#include <stddef.h>
#include <apr_thread_cond.h>
#include <apr_base64.h>
#include <apr_strings.h>
+#include <ap_mpm.h>
+
#include <httpd.h>
#include <http_core.h>
#include <http_config.h>
#include <scoreboard.h>
#include "h2_private.h"
+#include "h2.h"
#include "h2_bucket_eoc.h"
#include "h2_bucket_eos.h"
#include "h2_config.h"
#include "h2_request.h"
#include "h2_response.h"
#include "h2_stream.h"
-#include "h2_stream_set.h"
#include "h2_from_h1.h"
#include "h2_task.h"
#include "h2_session.h"
#include "h2_version.h"
#include "h2_workers.h"
-#define H2MAX(x,y) ((x) > (y) ? (x) : (y))
-#define H2MIN(x,y) ((x) < (y) ? (x) : (y))
-
-static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen);
static int h2_session_status_from_apr_status(apr_status_t rv)
{
return NGHTTP2_ERR_WOULDBLOCK;
}
else if (APR_STATUS_IS_EOF(rv)) {
- return NGHTTP2_ERR_EOF;
+ return NGHTTP2_ERR_EOF;
}
return NGHTTP2_ERR_PROTO;
}
const char *data, apr_size_t len,
apr_size_t *readlen);
-h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
+static int is_accepting_streams(h2_session *session);
+static void dispatch_event(h2_session *session, h2_session_event_t ev,
+ int err, const char *msg);
+
+apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
{
- h2_stream * stream;
- apr_pool_t *stream_pool;
- if (session->aborted) {
- return NULL;
- }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): EOS bucket cleanup -> done",
+ session->id, stream->id);
+ h2_ihash_remove(session->streams, stream->id);
+ h2_mplx_stream_done(session->mplx, stream);
- if (session->spare) {
- stream_pool = session->spare;
- session->spare = NULL;
+ dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
+ return APR_SUCCESS;
+}
+
+typedef struct stream_sel_ctx {
+ h2_session *session;
+ h2_stream *candidate;
+} stream_sel_ctx;
+
+static int find_cleanup_stream(void *udata, void *sdata)
+{
+ stream_sel_ctx *ctx = udata;
+ h2_stream *stream = sdata;
+ if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
+ if (!ctx->session->local.accepting
+ && stream->id > ctx->session->local.accepted_max) {
+ ctx->candidate = stream;
+ return 0;
+ }
}
else {
- apr_pool_create(&stream_pool, session->pool);
+ if (!ctx->session->remote.accepting
+ && stream->id > ctx->session->remote.accepted_max) {
+ ctx->candidate = stream;
+ return 0;
+ }
}
+ return 1;
+}
+
+static void cleanup_streams(h2_session *session)
+{
+ stream_sel_ctx ctx;
+ ctx.session = session;
+ ctx.candidate = NULL;
+ while (1) {
+ h2_ihash_iter(session->streams, find_cleanup_stream, &ctx);
+ if (ctx.candidate) {
+ h2_session_stream_done(session, ctx.candidate);
+ ctx.candidate = NULL;
+ }
+ else {
+ break;
+ }
+ }
+}
+
+h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
+ int initiated_on, const h2_request *req)
+{
+ h2_stream * stream;
+ apr_pool_t *stream_pool;
+
+ apr_pool_create(&stream_pool, session->pool);
+ apr_pool_tag(stream_pool, "h2_stream");
- stream = h2_stream_open(stream_id, stream_pool, session);
+ stream = h2_stream_open(stream_id, stream_pool, session,
+ initiated_on, req);
+ nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
+ h2_ihash_add(session->streams, stream);
- h2_stream_set_add(session->streams, stream);
- if (H2_STREAM_CLIENT_INITIATED(stream_id)
- && stream_id > session->max_stream_received) {
- session->max_stream_received = stream->id;
+ if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
+ if (stream_id > session->remote.emitted_max) {
+ ++session->remote.emitted_count;
+ session->remote.emitted_max = stream->id;
+ session->local.accepted_max = stream->id;
+ }
+ }
+ else {
+ if (stream_id > session->local.emitted_max) {
+ ++session->local.emitted_count;
+ session->remote.emitted_max = stream->id;
+ }
}
+ dispatch_event(session, H2_SESSION_EV_STREAM_OPEN, 0, NULL);
return stream;
}
-#ifdef H2_NG2_STREAM_API
-
/**
* Determine the importance of streams when scheduling tasks.
* - if both stream depend on the same one, compare weights
return spri_cmp(sid1, s1, sid2, s2, session);
}
-#else /* ifdef H2_NG2_STREAM_API */
-
-/* In absence of nghttp2_stream API, which gives information about
- * priorities since nghttp2 1.3.x, we just sort the streams by
- * their identifier, aka. order of arrival.
- */
-static int stream_pri_cmp(int sid1, int sid2, void *ctx)
-{
- (void)ctx;
- return sid1 - sid2;
-}
-
-#endif /* (ifdef else) H2_NG2_STREAM_API */
-
static apr_status_t stream_schedule(h2_session *session,
h2_stream *stream, int eos)
{
(void)session;
- ++session->requests_received;
return h2_stream_schedule(stream, eos, h2_session_push_enabled(session),
stream_pri_cmp, session);
}
if (APR_STATUS_IS_EAGAIN(status)) {
return NGHTTP2_ERR_WOULDBLOCK;
}
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03062)
"h2_session: send error");
return h2_session_status_from_apr_status(status);
}
h2_session *session = (h2_session *)userp;
(void)ngh2;
- if (session->aborted) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
- }
- if (APLOGctrace2(session->c)) {
+ if (APLOGcdebug(session->c)) {
char buffer[256];
- frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_session: callback on_invalid_frame_recv error=%d %s",
- error, buffer);
+ h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03063)
+ "h2_session(%ld): recv invalid FRAME[%s], frames=%ld/%ld (r/s)",
+ session->id, buffer, (long)session->frames_received,
+ (long)session->frames_sent);
}
return 0;
}
+static h2_stream *get_stream(h2_session *session, int stream_id)
+{
+ return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+}
+
static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
int32_t stream_id,
const uint8_t *data, size_t len, void *userp)
int rv;
(void)flags;
- if (session->aborted) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
+ if (!is_accepting_streams(session)) {
+ /* ignore */
+ return 0;
}
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (!stream) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064)
"h2_stream(%ld-%d): on_data_chunk for unknown stream",
session->id, (int)stream_id);
rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
}
return 0;
}
-
- status = h2_stream_write_data(stream, (const char *)data, len);
+
+ /* FIXME: enabling setting EOS this way seems to break input handling
+ * in mod_proxy_http2. why? */
+ status = h2_stream_write_data(stream, (const char *)data, len,
+ 0 /*flags & NGHTTP2_FLAG_END_STREAM*/);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
session->id, stream_id, (long)len);
h2_stream *stream,
uint32_t error_code)
{
+ conn_rec *c = session->c;
+ apr_bucket *b;
+ apr_status_t status;
+
if (!error_code) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): handled, closing",
session->id, (int)stream->id);
- if (stream->id > session->max_stream_handled) {
- session->max_stream_handled = stream->id;
+ if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
+ if (stream->id > session->local.completed_max) {
+ session->local.completed_max = stream->id;
+ }
}
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03065)
"h2_stream(%ld-%d): closing with err=%d %s",
session->id, (int)stream->id, (int)error_code,
h2_h2_err_description(error_code));
h2_stream_rst(stream, error_code);
}
- return h2_conn_io_writeb(&session->io,
- h2_bucket_eos_create(session->c->bucket_alloc,
- stream));
+ b = h2_bucket_eos_create(c->bucket_alloc, stream);
+ APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+ status = h2_conn_io_pass(&session->io, session->bbtmp);
+ apr_brigade_cleanup(session->bbtmp);
+ return status;
}
static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
h2_stream *stream;
(void)ngh2;
- if (session->aborted) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
- }
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (stream) {
stream_release(session, stream, error_code);
}
/* We may see HEADERs at the start of a stream or after all DATA
* streams to carry trailers. */
(void)ngh2;
- s = h2_session_get_stream(session, frame->hd.stream_id);
+ s = get_stream(session, frame->hd.stream_id);
if (s) {
/* nop */
}
else {
- s = h2_session_open_stream((h2_session *)userp, frame->hd.stream_id);
+ s = h2_session_open_stream(userp, frame->hd.stream_id, 0, NULL);
}
- return s? 0 : NGHTTP2_ERR_CALLBACK_FAILURE;
+ return s? 0 : NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
}
static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
h2_stream * stream;
apr_status_t status;
- (void)ngh2;
(void)flags;
- if (session->aborted) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
+ if (!is_accepting_streams(session)) {
+ /* just ignore */
+ return 0;
}
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
APLOGNO(02920)
- "h2_session: stream(%ld-%d): on_header for unknown stream",
+ "h2_session: stream(%ld-%d): on_header unknown stream",
session->id, (int)frame->hd.stream_id);
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
status = h2_stream_add_header(stream, (const char *)name, namelen,
(const char *)value, valuelen);
-
- if (status != APR_SUCCESS) {
+ if (status != APR_SUCCESS && !stream->response) {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
return 0;
apr_status_t status = APR_SUCCESS;
h2_stream *stream;
- if (session->aborted) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
- }
-
if (APLOGcdebug(session->c)) {
char buffer[256];
- frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03066)
"h2_session(%ld): recv FRAME[%s], frames=%ld/%ld (r/s)",
session->id, buffer, (long)session->frames_received,
(long)session->frames_sent);
/* This can be HEADERS for a new stream, defining the request,
* or HEADER may come after DATA at the end of a stream as in
* trailers */
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
}
break;
case NGHTTP2_DATA:
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
frame->window_update.window_size_increment);
break;
case NGHTTP2_RST_STREAM:
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03067)
"h2_session(%ld-%d): RST_STREAM by client, errror=%d",
session->id, (int)frame->hd.stream_id,
(int)frame->rst_stream.error_code);
- ++session->streams_reset;
+ stream = get_stream(session, frame->hd.stream_id);
+ if (stream && stream->request && stream->request->initiated_on) {
+ ++session->pushes_reset;
+ }
+ else {
+ ++session->streams_reset;
+ }
break;
case NGHTTP2_GOAWAY:
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_session(%ld): GOAWAY errror=%d",
- session->id, (int)frame->goaway.error_code);
- session->client_goaway = 1;
+ session->remote.accepted_max = frame->goaway.last_stream_id;
+ session->remote.error = frame->goaway.error_code;
+ dispatch_event(session, H2_SESSION_EV_REMOTE_GOAWAY, 0, NULL);
break;
default:
if (APLOGctrace2(session->c)) {
char buffer[256];
- frame_print(frame, buffer,
- sizeof(buffer)/sizeof(buffer[0]));
+ h2_util_frame_print(frame, buffer,
+ sizeof(buffer)/sizeof(buffer[0]));
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_session: on_frame_rcv %s", buffer);
}
return 0;
}
-static apr_status_t pass_data(void *ctx,
- const char *data, apr_off_t length)
-{
- return h2_conn_io_write(&((h2_session*)ctx)->io, data, length);
-}
-
-
static char immortal_zeros[H2_MAX_PADLEN];
static int on_send_data_cb(nghttp2_session *ngh2,
unsigned char padlen;
int eos;
h2_stream *stream;
+ apr_bucket *b;
+ apr_off_t len = length;
(void)ngh2;
(void)source;
- if (session->aborted) {
- return NGHTTP2_ERR_CALLBACK_FAILURE;
- }
-
if (frame->data.padlen > H2_MAX_PADLEN) {
return NGHTTP2_ERR_PROTO;
}
padlen = (unsigned char)frame->data.padlen;
- stream = h2_session_get_stream(session, stream_id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): send_data_cb for %ld bytes",
+ session->id, (int)stream_id, (long)length);
+
+ stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
APLOGNO(02924)
- "h2_stream(%ld-%d): send_data",
+ "h2_stream(%ld-%d): send_data, lookup stream",
session->id, (int)stream_id);
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): send_data_cb for %ld bytes",
- session->id, (int)stream_id, (long)length);
-
- if (h2_conn_io_is_buffered(&session->io)) {
- status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
- if (status == APR_SUCCESS) {
- if (padlen) {
- status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
- }
-
- if (status == APR_SUCCESS) {
- apr_off_t len = length;
- status = h2_stream_readx(stream, pass_data, session, &len, &eos);
- if (status == APR_SUCCESS && len != length) {
- status = APR_EINVAL;
- }
- }
-
- if (status == APR_SUCCESS && padlen) {
- if (padlen) {
- status = h2_conn_io_write(&session->io, immortal_zeros, padlen);
- }
- }
- }
+ status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
+ if (padlen && status == APR_SUCCESS) {
+ status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
}
- else {
- apr_bucket *b;
- char *header = apr_pcalloc(stream->pool, 10);
- memcpy(header, (const char *)framehd, 9);
- if (padlen) {
- header[9] = (char)padlen;
- }
- b = apr_bucket_pool_create(header, padlen? 10 : 9,
- stream->pool, session->c->bucket_alloc);
- status = h2_conn_io_writeb(&session->io, b);
-
- if (status == APR_SUCCESS) {
- apr_off_t len = length;
- status = h2_stream_read_to(stream, session->io.output, &len, &eos);
- if (status == APR_SUCCESS && len != length) {
- status = APR_EINVAL;
- }
- }
-
- if (status == APR_SUCCESS && padlen) {
- b = apr_bucket_immortal_create(immortal_zeros, padlen,
- session->c->bucket_alloc);
- status = h2_conn_io_writeb(&session->io, b);
- }
+
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
+ "h2_stream(%ld-%d): writing frame header",
+ session->id, (int)stream_id);
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
}
+ status = h2_stream_read_to(stream, session->bbtmp, &len, &eos);
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
+ "h2_stream(%ld-%d): send_data_cb, reading stream",
+ session->id, (int)stream_id);
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+ else if (len != length) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
+ "h2_stream(%ld-%d): send_data_cb, wanted %ld bytes, "
+ "got %ld from stream",
+ session->id, (int)stream_id, (long)length, (long)len);
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+
+ if (padlen) {
+ b = apr_bucket_immortal_create(immortal_zeros, padlen,
+ session->c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+ }
+ status = h2_conn_io_pass(&session->io, session->bbtmp);
+
+ apr_brigade_cleanup(session->bbtmp);
if (status == APR_SUCCESS) {
- stream->data_frames_sent++;
- h2_conn_io_consider_flush(&session->io);
+ stream->out_data_frames++;
+ stream->out_data_octets += length;
return 0;
}
else {
APLOGNO(02925)
"h2_stream(%ld-%d): failed send_data_cb",
session->id, (int)stream_id);
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
}
-
- return h2_session_status_from_apr_status(status);
}
static int on_frame_send_cb(nghttp2_session *ngh2,
if (APLOGcdebug(session->c)) {
char buffer[256];
- frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03068)
"h2_session(%ld): sent FRAME[%s], frames=%ld/%ld (r/s)",
session->id, buffer, (long)session->frames_received,
(long)session->frames_sent);
return APR_SUCCESS;
}
-static void h2_session_cleanup(h2_session *session)
+static void h2_session_destroy(h2_session *session)
{
- AP_DEBUG_ASSERT(session);
- /* This is an early cleanup of the session that may
- * discard what is no longer necessary for *new* streams
- * and general HTTP/2 processing.
- * At this point, all frames are in transit or somehwere in
- * our buffers or passed down output filters.
- * h2 streams might still being written out.
- */
- if (session->c) {
- h2_ctx_clear(session->c);
+ AP_DEBUG_ASSERT(session);
+
+ h2_ihash_clear(session->streams);
+ if (session->mplx) {
+ h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
+ h2_mplx_release_and_join(session->mplx, session->iowait);
+ session->mplx = NULL;
}
+
+ ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
+ session->c->input_filters), "H2_IN");
if (session->ngh2) {
nghttp2_session_del(session->ngh2);
session->ngh2 = NULL;
}
- if (session->spare) {
- apr_pool_destroy(session->spare);
- session->spare = NULL;
+ if (session->c) {
+ h2_ctx_clear(session->c);
+ }
+
+ if (APLOGctrace1(session->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_session(%ld): destroy", session->id);
+ }
+ if (session->pool) {
+ apr_pool_destroy(session->pool);
}
}
-static void h2_session_destroy(h2_session *session)
+static apr_status_t h2_session_shutdown(h2_session *session, int error,
+ const char *msg, int force_close)
{
+ apr_status_t status = APR_SUCCESS;
+
AP_DEBUG_ASSERT(session);
- h2_session_cleanup(session);
-
- if (APLOGctrace1(session->c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- "h2_session(%ld): destroy, %d streams open",
- session->id, (int)h2_stream_set_size(session->streams));
+ if (!msg && error) {
+ msg = nghttp2_strerror(error);
}
- if (session->mplx) {
- h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
- h2_mplx_release_and_join(session->mplx, session->iowait);
- session->mplx = NULL;
+
+ if (error || force_close) {
+ /* not a graceful shutdown, we want to leave...
+ * Do not start further streams that are waiting to be scheduled.
+ * Find out the max stream id that we habe been processed or
+ * are still actively working on.
+ * Remove all streams greater than this number without submitting
+ * a RST_STREAM frame, since that should be clear from the GOAWAY
+ * we send. */
+ session->local.accepted_max = h2_mplx_shutdown(session->mplx);
+ session->local.error = error;
}
- if (session->streams) {
- h2_stream_set_destroy(session->streams);
- session->streams = NULL;
+ else {
+ /* graceful shutdown. we will continue processing all streams
+ * we have, but no longer accept new ones. Report the max stream
+ * we have received and discard all new ones. */
+ }
+ nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE,
+ session->local.accepted_max,
+ error, (uint8_t*)msg, msg? strlen(msg):0);
+ status = nghttp2_session_send(session->ngh2);
+ if (status == APR_SUCCESS) {
+ status = h2_conn_io_flush(&session->io);
}
- if (session->pool) {
- apr_pool_destroy(session->pool);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03069)
+ "session(%ld): sent GOAWAY, err=%d, msg=%s",
+ session->id, error, msg? msg : "");
+ dispatch_event(session, H2_SESSION_EV_LOCAL_GOAWAY, error, msg);
+
+ if (force_close) {
+ h2_mplx_abort(session->mplx);
}
+
+ return status;
}
static apr_status_t session_pool_cleanup(void *data)
*/
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"session(%ld): pool_cleanup", session->id);
+
+ if (session->state != H2_SESSION_ST_DONE
+ && session->state != H2_SESSION_ST_LOCAL_SHUTDOWN) {
+ /* Not good. The connection is being torn down and we have
+ * not sent a goaway. This is considered a protocol error and
+ * the client has to assume that any streams "in flight" may have
+ * been processed and are not safe to retry.
+ * As clients with idle connection may only learn about a closed
+ * connection when sending the next request, this has the effect
+ * that at least this one request will fail.
+ */
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, APLOGNO(03199)
+ "session(%ld): connection disappeared without proper "
+ "goodbye, clients will be confused, should not happen",
+ session->id);
+ }
/* keep us from destroying the pool, since that is already ongoing. */
session->pool = NULL;
h2_session_destroy(session);
{
nghttp2_session_callbacks *callbacks = NULL;
nghttp2_option *options = NULL;
+ uint32_t n;
apr_pool_t *pool = NULL;
apr_status_t status = apr_pool_create(&pool, c->pool);
if (status != APR_SUCCESS) {
return NULL;
}
+ apr_pool_tag(pool, "h2_session");
session = apr_pcalloc(pool, sizeof(h2_session));
if (session) {
session->c = c;
session->r = r;
session->s = h2_ctx_server_get(ctx);
+ session->pool = pool;
session->config = h2_config_sget(session->s);
+ session->workers = workers;
session->state = H2_SESSION_ST_INIT;
+ session->local.accepting = 1;
+ session->remote.accepting = 1;
- session->pool = pool;
apr_pool_pre_cleanup_register(pool, session, session_pool_cleanup);
- session->max_stream_count = h2_config_geti(session->config, H2_CONF_MAX_STREAMS);
- session->max_stream_mem = h2_config_geti(session->config, H2_CONF_STREAM_MAX_MEM);
- session->timeout_secs = h2_config_geti(session->config, H2_CONF_TIMEOUT_SECS);
- if (session->timeout_secs <= 0) {
- session->timeout_secs = apr_time_sec(session->s->timeout);
- }
- session->keepalive_secs = h2_config_geti(session->config, H2_CONF_KEEPALIVE_SECS);
- if (session->keepalive_secs <= 0) {
- session->keepalive_secs = apr_time_sec(session->s->keep_alive_timeout);
- }
-
+ session->max_stream_count = h2_config_geti(session->config,
+ H2_CONF_MAX_STREAMS);
+ session->max_stream_mem = h2_config_geti(session->config,
+ H2_CONF_STREAM_MAX_MEM);
+
status = apr_thread_cond_create(&session->iowait, session->pool);
if (status != APR_SUCCESS) {
return NULL;
}
- session->streams = h2_stream_set_create(session->pool, session->max_stream_count);
-
- session->workers = workers;
- session->mplx = h2_mplx_create(c, session->pool, session->config, workers);
+ session->streams = h2_ihash_create(session->pool,
+ offsetof(h2_stream, id));
+ session->mplx = h2_mplx_create(c, session->pool, session->config,
+ session->s->timeout, workers);
h2_mplx_set_consumed_cb(session->mplx, update_window, session);
/* Install the connection input filter that feeds the session */
- session->cin = h2_filter_cin_create(session->pool, h2_session_receive, session);
+ session->cin = h2_filter_cin_create(session->pool,
+ h2_session_receive, session);
ap_add_input_filter("H2_IN", session->cin, r, c);
- h2_conn_io_init(&session->io, c, session->config, session->pool);
+ h2_conn_io_init(&session->io, c, session->config);
session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc);
status = init_callbacks(c, &callbacks);
h2_session_destroy(session);
return NULL;
}
- nghttp2_option_set_peer_max_concurrent_streams(options,
- (uint32_t)session->max_stream_count);
+ nghttp2_option_set_peer_max_concurrent_streams(
+ options, (uint32_t)session->max_stream_count);
/* We need to handle window updates ourself, otherwise we
* get flooded by nghttp2. */
nghttp2_option_set_no_auto_window_update(options, 1);
h2_session_destroy(session);
return NULL;
}
-
+
+ n = h2_config_geti(session->config, H2_CONF_PUSH_DIARY_SIZE);
+ session->push_diary = h2_push_diary_create(session->pool, n);
+
if (APLOGcdebug(c)) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c,
- "session(%ld) created, timeout=%d, keepalive_timeout=%d, "
- "max_streams=%d, stream_mem=%d",
- session->id, session->timeout_secs, session->keepalive_secs,
- (int)session->max_stream_count, (int)session->max_stream_mem);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03200)
+ "h2_session(%ld) created, max_streams=%d, "
+ "stream_mem=%d, workers_limit=%d, workers_max=%d, "
+ "push_diary(type=%d,N=%d)",
+ session->id, (int)session->max_stream_count,
+ (int)session->max_stream_mem,
+ session->mplx->workers_limit,
+ session->mplx->workers_max,
+ session->push_diary->dtype,
+ (int)session->push_diary->N);
}
}
return session;
h2_session_destroy(session);
}
-static apr_status_t h2_session_shutdown(h2_session *session, int reason)
-{
- AP_DEBUG_ASSERT(session);
- session->aborted = 1;
- if (session->state != H2_SESSION_ST_CLOSING
- && session->state != H2_SESSION_ST_ABORTED) {
- if (session->client_goaway) {
- /* client sent us a GOAWAY, just terminate */
- nghttp2_session_terminate_session(session->ngh2, NGHTTP2_ERR_EOF);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "session(%ld): shutdown, GOAWAY from client", session->id);
- }
- else if (!reason) {
- nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE,
- session->max_stream_received,
- reason, NULL, 0);
- nghttp2_session_send(session->ngh2);
- session->server_goaway = 1;
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "session(%ld): shutdown, no err", session->id);
- }
- else {
- const char *err = nghttp2_strerror(reason);
- nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE,
- session->max_stream_received,
- reason, (const uint8_t *)err,
- strlen(err));
- nghttp2_session_send(session->ngh2);
- session->server_goaway = 1;
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "session(%ld): shutdown, err=%d '%s'",
- session->id, reason, err);
- }
- session->state = H2_SESSION_ST_CLOSING;
- h2_mplx_abort(session->mplx);
- }
- return APR_SUCCESS;
-}
-
-void h2_session_abort(h2_session *session, apr_status_t status)
-{
- AP_DEBUG_ASSERT(session);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
- "h2_session(%ld): aborting", session->id);
- session->state = H2_SESSION_ST_ABORTED;
- session->aborted = 1;
-}
-
static apr_status_t h2_session_start(h2_session *session, int *rv)
{
apr_status_t status = APR_SUCCESS;
if (APLOGrdebug(session->r)) {
char buffer[128];
h2_util_hex_dump(buffer, 128, (char*)cs, dlen);
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, session->r,
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, session->r, APLOGNO(03070)
"upgrading h2c session with HTTP2-Settings: %s -> %s (%d)",
s, buffer, (int)dlen);
}
}
/* Now we need to auto-open stream 1 for the request we got. */
- stream = h2_session_open_stream(session, 1);
+ stream = h2_session_open_stream(session, 1, 0, NULL);
if (!stream) {
status = APR_EGENERAL;
ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r,
++slen;
}
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03201)
+ "h2_session(%ld): start, INITIAL_WINDOW_SIZE=%ld, "
+ "MAX_CONCURRENT_STREAMS=%d",
+ session->id, (long)win_size, (int)session->max_stream_count);
*rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE,
settings, slen);
if (*rv != 0) {
nghttp2_strerror(*rv));
}
}
- return status;
-}
-
-typedef struct {
- h2_session *session;
- int resume_count;
-} resume_ctx;
-
-static int resume_on_data(void *ctx, h2_stream *stream)
-{
- resume_ctx *rctx = (resume_ctx*)ctx;
- h2_session *session = rctx->session;
- AP_DEBUG_ASSERT(session);
- AP_DEBUG_ASSERT(stream);
-
- if (h2_stream_is_suspended(stream)) {
- if (h2_mplx_out_has_data_for(stream->session->mplx, stream->id)) {
- int rv;
- h2_stream_set_suspended(stream, 0);
- ++rctx->resume_count;
-
- rv = nghttp2_session_resume_data(session->ngh2, stream->id);
- ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
- APLOG_ERR : APLOG_DEBUG, 0, session->c,
- APLOGNO(02936)
- "h2_stream(%ld-%d): resuming %s",
- session->id, stream->id, rv? nghttp2_strerror(rv) : "");
- }
- }
- return 1;
-}
-
-static int h2_session_resume_streams_with_data(h2_session *session)
-{
- AP_DEBUG_ASSERT(session);
- if (!h2_stream_set_is_empty(session->streams)
- && session->mplx && !session->aborted) {
- resume_ctx ctx;
-
- ctx.session = session;
- ctx.resume_count = 0;
-
- /* Resume all streams where we have data in the out queue and
- * which had been suspended before. */
- h2_stream_set_iter(session->streams, resume_on_data, &ctx);
- return ctx.resume_count;
- }
- return 0;
-}
-
-h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
-{
- if (!session->last_stream || stream_id != session->last_stream->id) {
- session->last_stream = h2_stream_set_get(session->streams, stream_id);
- }
- return session->last_stream;
-}
-
-void h2_session_close(h2_session *session)
-{
- apr_bucket *b;
- conn_rec *c = session->c;
- apr_status_t status;
- AP_DEBUG_ASSERT(session);
- if (!session->aborted) {
- h2_session_shutdown(session, 0);
- }
- h2_session_cleanup(session);
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c,
- "h2_session(%ld): writing eoc", c->id);
- b = h2_bucket_eoc_create(c->bucket_alloc, session);
- status = h2_conn_io_write_eoc(&session->io, b);
- if (status != APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, status, c,
- "h2_session(%ld): flushed eoc bucket", c->id);
- }
- /* and all is or will be destroyed */
+ return status;
}
static ssize_t stream_data_cb(nghttp2_session *ng2s,
(void)ng2s;
(void)buf;
(void)source;
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
APLOGNO(02937)
AP_DEBUG_ASSERT(!h2_stream_is_suspended(stream));
- status = h2_stream_prep_read(stream, &nread, &eos);
+ status = h2_stream_out_prepare(stream, &nread, &eos);
if (nread) {
*data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
}
* it. Remember at our h2_stream that we need to do this.
*/
nread = 0;
- h2_stream_set_suspended(stream, 1);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ h2_mplx_suspend_stream(session->mplx, stream->id);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071)
"h2_stream(%ld-%d): suspending",
session->id, (int)stream_id);
return NGHTTP2_ERR_DEFERRED;
- case APR_EOF:
- nread = 0;
- eos = 1;
- break;
-
default:
nread = 0;
ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
int rv;
nh = h2_util_ngheader_make(stream->pool, trailers);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03072)
"h2_stream(%ld-%d): submit %d trailers",
session->id, (int)stream_id,(int) nh->nvlen);
rv = nghttp2_submit_trailer(ng2s, stream->id, nh->nv, nh->nvlen);
size_t offset;
} nvctx_t;
-/**
- * Start submitting the response to a stream request. This is possible
- * once we have all the response headers. The response body will be
- * read by the session using the callback we supply.
- */
-static apr_status_t submit_response(h2_session *session, h2_stream *stream)
+struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
+ h2_push *push)
{
- apr_status_t status = APR_SUCCESS;
- int rv = 0;
- AP_DEBUG_ASSERT(session);
- AP_DEBUG_ASSERT(stream);
- AP_DEBUG_ASSERT(stream->response || stream->rst_error);
+ apr_status_t status;
+ h2_stream *stream;
+ h2_ngheader *ngh;
+ int nid;
- if (stream->submitted) {
- rv = NGHTTP2_PROTOCOL_ERROR;
- }
- else if (stream->response && stream->response->headers) {
- nghttp2_data_provider provider;
- h2_response *response = stream->response;
- h2_ngheader *ngh;
- const h2_priority *prio;
-
- memset(&provider, 0, sizeof(provider));
- provider.source.fd = stream->id;
- provider.read_callback = stream_data_cb;
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_stream(%ld-%d): submit response %d",
- session->id, stream->id, response->http_status);
-
- /* If this stream is not a pushed one itself,
- * and HTTP/2 server push is enabled here,
- * and the response is in the range 200-299 *),
- * and the remote side has pushing enabled,
- * -> find and perform any pushes on this stream
- * *before* we submit the stream response itself.
- * This helps clients avoid opening new streams on Link
- * headers that get pushed right afterwards.
- *
- * *) the response code is relevant, as we do not want to
- * make pushes on 401 or 403 codes, neiterh on 301/302
- * and friends. And if we see a 304, we do not push either
- * as the client, having this resource in its cache, might
- * also have the pushed ones as well.
- */
- if (!stream->initiated_on
- && H2_HTTP_2XX(response->http_status)
- && h2_session_push_enabled(session)) {
-
- h2_stream_submit_pushes(stream);
- }
-
- prio = h2_stream_get_priority(stream);
- if (prio) {
- h2_session_set_prio(session, stream, prio);
- /* no showstopper if that fails for some reason */
- }
-
- ngh = h2_util_ngheader_make_res(stream->pool, response->http_status,
- response->headers);
- rv = nghttp2_submit_response(session->ngh2, response->stream_id,
- ngh->nv, ngh->nvlen, &provider);
- ++session->responses_sent;
- }
- else {
- int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_stream(%ld-%d): RST_STREAM, err=%d",
- session->id, stream->id, err);
-
- rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
- stream->id, err);
- ++session->responses_sent;
+ ngh = h2_util_ngheader_make_req(is->pool, push->req);
+ nid = nghttp2_submit_push_promise(session->ngh2, 0, is->id,
+ ngh->nv, ngh->nvlen, NULL);
+ if (nid <= 0) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03075)
+ "h2_stream(%ld-%d): submitting push promise fail: %s",
+ session->id, is->id, nghttp2_strerror(nid));
+ return NULL;
}
+ ++session->pushes_promised;
- stream->submitted = 1;
-
- if (nghttp2_is_fatal(rv)) {
- status = APR_EGENERAL;
- h2_session_shutdown(session, rv);
- ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
- APLOGNO(02940) "submit_response: %s",
- nghttp2_strerror(rv));
- }
-
- return status;
-}
-
-struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
- h2_push *push)
-{
- apr_status_t status;
- h2_stream *stream;
- h2_ngheader *ngh;
- int nid;
-
- ngh = h2_util_ngheader_make_req(is->pool, push->req);
- nid = nghttp2_submit_push_promise(session->ngh2, 0, is->id,
- ngh->nv, ngh->nvlen, NULL);
- if (nid <= 0) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_stream(%ld-%d): submitting push promise fail: %s",
- session->id, is->id, nghttp2_strerror(nid));
- return NULL;
- }
- ++session->streams_pushed;
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03076)
"h2_stream(%ld-%d): SERVER_PUSH %d for %s %s on %d",
session->id, is->id, nid,
push->req->method, push->req->path, is->id);
- stream = h2_session_open_stream(session, nid);
+ stream = h2_session_open_stream(session, nid, is->id, push->req);
if (stream) {
- h2_stream_set_h2_request(stream, is->id, push->req);
status = stream_schedule(session, stream, 1);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): scheduling push stream",
session->id, stream->id);
- h2_stream_cleanup(stream);
stream = NULL;
}
++session->unsent_promises;
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03077)
"h2_stream(%ld-%d): failed to create stream obj %d",
session->id, is->id, nid);
}
id_grandpa = nghttp2_stream_get_stream_id(s_grandpa);
rv = nghttp2_session_change_stream_priority(session->ngh2, id_parent, &ps);
if (rv < 0) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03202)
"h2_stream(%ld-%d): PUSH BEFORE, weight=%d, "
"depends=%d, returned=%d",
session->id, id_parent, ps.weight, ps.stream_id, rv);
rv = nghttp2_session_change_stream_priority(session->ngh2, stream->id, &ps);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03203)
"h2_stream(%ld-%d): PUSH %s, weight=%d, "
"depends=%d, returned=%d",
session->id, stream->id, ptype,
return status;
}
-apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream)
+int h2_session_push_enabled(h2_session *session)
{
- apr_pool_t *pool = h2_stream_detach_pool(stream);
+ /* iff we can and they can and want */
+ return (session->remote.accepting /* remote GOAWAY received */
+ && h2_config_geti(session->config, H2_CONF_PUSH)
+ && nghttp2_session_get_remote_settings(session->ngh2,
+ NGHTTP2_SETTINGS_ENABLE_PUSH));
+}
- /* this may be called while the session has already freed
- * some internal structures. */
- if (session->mplx) {
- h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
- if (session->last_stream == stream) {
- session->last_stream = NULL;
- }
- }
+static apr_status_t h2_session_send(h2_session *session)
+{
+ apr_interval_time_t saved_timeout;
+ int rv;
+ apr_socket_t *socket;
- if (session->streams) {
- h2_stream_set_remove(session->streams, stream->id);
+ socket = ap_get_conn_socket(session->c);
+ if (socket) {
+ apr_socket_timeout_get(socket, &saved_timeout);
+ apr_socket_timeout_set(socket, session->s->timeout);
}
- h2_stream_destroy(stream);
- if (pool) {
- apr_pool_clear(pool);
- if (session->spare) {
- apr_pool_destroy(session->spare);
+ rv = nghttp2_session_send(session->ngh2);
+
+ if (socket) {
+ apr_socket_timeout_set(socket, saved_timeout);
+ }
+ session->have_written = 1;
+ if (rv != 0) {
+ if (nghttp2_is_fatal(rv)) {
+ dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
+ return APR_EGENERAL;
}
- session->spare = pool;
}
+
+ session->unsent_promises = 0;
+ session->unsent_submits = 0;
+
return APR_SUCCESS;
}
-static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen)
+/**
+ * A stream was resumed as new output data arrived.
+ */
+static apr_status_t on_stream_resume(void *ctx, int stream_id)
+{
+ h2_session *session = ctx;
+ h2_stream *stream = get_stream(session, stream_id);
+ apr_status_t status = APR_SUCCESS;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): on_resume", session->id, stream_id);
+ if (stream) {
+ int rv = nghttp2_session_resume_data(session->ngh2, stream_id);
+ session->have_written = 1;
+ ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
+ APLOG_ERR : APLOG_DEBUG, 0, session->c,
+ APLOGNO(02936)
+ "h2_stream(%ld-%d): resuming %s",
+ session->id, stream->id, rv? nghttp2_strerror(rv) : "");
+ }
+ return status;
+}
+
+/**
+ * A response for the stream is ready.
+ */
+static apr_status_t on_stream_response(void *ctx, int stream_id)
{
- char scratch[128];
- size_t s_len = sizeof(scratch)/sizeof(scratch[0]);
+ h2_session *session = ctx;
+ h2_stream *stream = get_stream(session, stream_id);
+ apr_status_t status = APR_SUCCESS;
+ h2_response *response;
+ int rv = 0;
+
+ AP_DEBUG_ASSERT(session);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): on_response", session->id, stream_id);
+ if (!stream) {
+ return APR_NOTFOUND;
+ }
+ else if (!stream->response) {
+ int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
+
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
+ "h2_stream(%ld-%d): RST_STREAM, err=%d",
+ session->id, stream->id, err);
+
+ rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
+ stream->id, err);
+ goto leave;
+ }
- switch (frame->hd.type) {
- case NGHTTP2_DATA: {
- return apr_snprintf(buffer, maxlen,
- "DATA[length=%d, flags=%d, stream=%d, padlen=%d]",
- (int)frame->hd.length, frame->hd.flags,
- frame->hd.stream_id, (int)frame->data.padlen);
- }
- case NGHTTP2_HEADERS: {
- return apr_snprintf(buffer, maxlen,
- "HEADERS[length=%d, hend=%d, stream=%d, eos=%d]",
- (int)frame->hd.length,
- !!(frame->hd.flags & NGHTTP2_FLAG_END_HEADERS),
- frame->hd.stream_id,
- !!(frame->hd.flags & NGHTTP2_FLAG_END_STREAM));
- }
- case NGHTTP2_PRIORITY: {
- return apr_snprintf(buffer, maxlen,
- "PRIORITY[length=%d, flags=%d, stream=%d]",
- (int)frame->hd.length,
- frame->hd.flags, frame->hd.stream_id);
- }
- case NGHTTP2_RST_STREAM: {
- return apr_snprintf(buffer, maxlen,
- "RST_STREAM[length=%d, flags=%d, stream=%d]",
- (int)frame->hd.length,
- frame->hd.flags, frame->hd.stream_id);
+ while ((response = h2_stream_get_unsent_response(stream)) != NULL) {
+ nghttp2_data_provider provider, *pprovider = NULL;
+ h2_ngheader *ngh;
+ const h2_priority *prio;
+
+ if (stream->submitted) {
+ rv = NGHTTP2_PROTOCOL_ERROR;
+ goto leave;
}
- case NGHTTP2_SETTINGS: {
- if (frame->hd.flags & NGHTTP2_FLAG_ACK) {
- return apr_snprintf(buffer, maxlen,
- "SETTINGS[ack=1, stream=%d]",
- frame->hd.stream_id);
- }
- return apr_snprintf(buffer, maxlen,
- "SETTINGS[length=%d, stream=%d]",
- (int)frame->hd.length, frame->hd.stream_id);
+
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
+ "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
+ session->id, stream->id, response->http_status,
+ (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
+
+ if (response->content_length != 0) {
+ memset(&provider, 0, sizeof(provider));
+ provider.source.fd = stream->id;
+ provider.read_callback = stream_data_cb;
+ pprovider = &provider;
}
- case NGHTTP2_PUSH_PROMISE: {
- return apr_snprintf(buffer, maxlen,
- "PUSH_PROMISE[length=%d, hend=%d, stream=%d]",
- (int)frame->hd.length,
- !!(frame->hd.flags & NGHTTP2_FLAG_END_HEADERS),
- frame->hd.stream_id);
+
+ /* If this stream is not a pushed one itself,
+ * and HTTP/2 server push is enabled here,
+ * and the response is in the range 200-299 *),
+ * and the remote side has pushing enabled,
+ * -> find and perform any pushes on this stream
+ * *before* we submit the stream response itself.
+ * This helps clients avoid opening new streams on Link
+ * headers that get pushed right afterwards.
+ *
+ * *) the response code is relevant, as we do not want to
+ * make pushes on 401 or 403 codes, neiterh on 301/302
+ * and friends. And if we see a 304, we do not push either
+ * as the client, having this resource in its cache, might
+ * also have the pushed ones as well.
+ */
+ if (stream->request
+ && !stream->request->initiated_on
+ && h2_response_is_final(response)
+ && H2_HTTP_2XX(response->http_status)
+ && h2_session_push_enabled(session)) {
+
+ h2_stream_submit_pushes(stream);
}
- case NGHTTP2_PING: {
- return apr_snprintf(buffer, maxlen,
- "PING[length=%d, ack=%d, stream=%d]",
- (int)frame->hd.length,
- frame->hd.flags&NGHTTP2_FLAG_ACK,
- frame->hd.stream_id);
+
+ prio = h2_stream_get_priority(stream);
+ if (prio) {
+ h2_session_set_prio(session, stream, prio);
}
- case NGHTTP2_GOAWAY: {
- size_t len = (frame->goaway.opaque_data_len < s_len)?
- frame->goaway.opaque_data_len : s_len-1;
- memcpy(scratch, frame->goaway.opaque_data, len);
- scratch[len+1] = '\0';
- return apr_snprintf(buffer, maxlen, "GOAWAY[error=%d, reason='%s']",
- frame->goaway.error_code, scratch);
+
+ ngh = h2_util_ngheader_make_res(stream->pool, response->http_status,
+ response->headers);
+ rv = nghttp2_submit_response(session->ngh2, response->stream_id,
+ ngh->nv, ngh->nvlen, pprovider);
+ stream->submitted = h2_response_is_final(response);
+ session->have_written = 1;
+
+ if (stream->request && stream->request->initiated_on) {
+ ++session->pushes_submitted;
}
- case NGHTTP2_WINDOW_UPDATE: {
- return apr_snprintf(buffer, maxlen,
- "WINDOW_UPDATE[length=%d, stream=%d]",
- (int)frame->hd.length, frame->hd.stream_id);
+ else {
+ ++session->responses_submitted;
}
- default:
- return apr_snprintf(buffer, maxlen,
- "type=%d[length=%d, flags=%d, stream=%d]",
- frame->hd.type, (int)frame->hd.length,
- frame->hd.flags, frame->hd.stream_id);
}
-}
-
-int h2_session_push_enabled(h2_session *session)
-{
- /* iff we can and they can */
- return (h2_config_geti(session->config, H2_CONF_PUSH)
- && nghttp2_session_get_remote_settings(session->ngh2,
- NGHTTP2_SETTINGS_ENABLE_PUSH));
-}
-
-static apr_status_t h2_session_send(h2_session *session)
-{
- int rv = nghttp2_session_send(session->ngh2);
- if (rv != 0) {
- if (nghttp2_is_fatal(rv)) {
- ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "h2_session: send gave error=%s", nghttp2_strerror(rv));
- h2_session_shutdown(session, rv);
- return APR_EGENERAL;
- }
+
+leave:
+ if (nghttp2_is_fatal(rv)) {
+ status = APR_EGENERAL;
+ dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
+ APLOGNO(02940) "submit_response: %s",
+ nghttp2_strerror(rv));
}
- session->unsent_promises = 0;
- session->unsent_submits = 0;
+ ++session->unsent_submits;
- return APR_SUCCESS;
+ /* Unsent push promises are written immediately, as nghttp2
+ * 1.5.0 realizes internal stream data structures only on
+ * send and we might need them for other submits.
+ * Also, to conserve memory, we send at least every 10 submits
+ * so that nghttp2 does not buffer all outbound items too
+ * long.
+ */
+ if (status == APR_SUCCESS
+ && (session->unsent_promises || session->unsent_submits > 10)) {
+ status = h2_session_send(session);
+ }
+ return status;
}
static apr_status_t h2_session_receive(void *ctx, const char *data,
apr_size_t len, apr_size_t *readlen)
{
h2_session *session = ctx;
+ ssize_t n;
+
if (len > 0) {
- ssize_t n = nghttp2_session_mem_recv(session->ngh2,
- (const uint8_t *)data, len);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_session(%ld): feeding %ld bytes to nghttp2",
+ session->id, (long)len);
+ n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len);
if (n < 0) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EGENERAL,
- session->c,
- "h2_session: nghttp2_session_mem_recv error=%d",
- (int)n);
if (nghttp2_is_fatal((int)n)) {
- h2_session_shutdown(session, (int)n);
+ dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, (int)n, nghttp2_strerror(n));
return APR_EGENERAL;
}
}
else {
*readlen = n;
+ session->io.bytes_read += n;
}
}
return APR_SUCCESS;
}
-static apr_status_t h2_session_read(h2_session *session, int block, int loops)
+static apr_status_t h2_session_read(h2_session *session, int block)
{
apr_status_t status, rstatus = APR_EAGAIN;
conn_rec *c = session->c;
- int i;
+ apr_off_t read_start = session->io.bytes_read;
- for (i = 0; i < loops; ++i) {
+ while (1) {
/* H2_IN filter handles all incoming data against the session.
* We just pull at the filter chain to make it happen */
status = ap_get_brigade(c->input_filters,
/* successful read, reset our idle timers */
rstatus = APR_SUCCESS;
if (block) {
- /* successfull blocked read, try unblocked to
+ /* successful blocked read, try unblocked to
* get more. */
block = 0;
}
case APR_TIMEUP:
return status;
default:
- if (!i) {
+ if (session->io.bytes_read == read_start) {
/* first attempt failed */
if (APR_STATUS_IS_ETIMEDOUT(status)
|| APR_STATUS_IS_ECONNABORTED(status)
}
else {
/* uncommon status, log on INFO so that we see this */
- ap_log_cerror( APLOG_MARK, APLOG_INFO, status, c,
+ ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, c,
APLOGNO(02950)
"h2_session(%ld): error reading, terminating",
session->id);
* status. */
return rstatus;
}
+ if (!is_accepting_streams(session)) {
+ break;
+ }
+ if ((session->io.bytes_read - read_start) > (64*1024)) {
+ /* read enough in one go, give write a chance */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
+ "h2_session(%ld): read 64k, returning", session->id);
+ break;
+ }
}
return rstatus;
}
-static apr_status_t h2_session_submit(h2_session *session)
+static int unsubmitted_iter(void *ctx, void *val)
{
- apr_status_t status = APR_EAGAIN;
- h2_stream *stream;
-
- if (h2_stream_set_has_unsubmitted(session->streams)) {
- /* If we have responses ready, submit them now. */
- while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) {
- status = submit_response(session, stream);
- ++session->unsent_submits;
-
- /* Unsent push promises are written immediately, as nghttp2
- * 1.5.0 realizes internal stream data structures only on
- * send and we might need them for other submits.
- * Also, to conserve memory, we send at least every 10 submits
- * so that nghttp2 does not buffer all outbound items too
- * long.
+ h2_stream *stream = val;
+ if (h2_stream_needs_submit(stream)) {
+ *((int *)ctx) = 1;
+ return 0;
+ }
+ return 1;
+}
+
+static int has_unsubmitted_streams(h2_session *session)
+{
+ int has_unsubmitted = 0;
+ h2_ihash_iter(session->streams, unsubmitted_iter, &has_unsubmitted);
+ return has_unsubmitted;
+}
+
+static int suspended_iter(void *ctx, void *val)
+{
+ h2_stream *stream = val;
+ if (h2_stream_is_suspended(stream)) {
+ *((int *)ctx) = 1;
+ return 0;
+ }
+ return 1;
+}
+
+static int has_suspended_streams(h2_session *session)
+{
+ int has_suspended = 0;
+ h2_ihash_iter(session->streams, suspended_iter, &has_suspended);
+ return has_suspended;
+}
+
+static const char *StateNames[] = {
+ "INIT", /* H2_SESSION_ST_INIT */
+ "DONE", /* H2_SESSION_ST_DONE */
+ "IDLE", /* H2_SESSION_ST_IDLE */
+ "BUSY", /* H2_SESSION_ST_BUSY */
+ "WAIT", /* H2_SESSION_ST_WAIT */
+ "LSHUTDOWN", /* H2_SESSION_ST_LOCAL_SHUTDOWN */
+ "RSHUTDOWN", /* H2_SESSION_ST_REMOTE_SHUTDOWN */
+};
+
+static const char *state_name(h2_session_state state)
+{
+ if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
+ return "unknown";
+ }
+ return StateNames[state];
+}
+
+static int is_accepting_streams(h2_session *session)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_IDLE:
+ case H2_SESSION_ST_BUSY:
+ case H2_SESSION_ST_WAIT:
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+static void update_child_status(h2_session *session, int status, const char *msg)
+{
+ /* Assume that we also change code/msg when something really happened and
+ * avoid updating the scoreboard in between */
+ if (session->last_status_code != status
+ || session->last_status_msg != msg) {
+ apr_snprintf(session->status, sizeof(session->status),
+ "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)",
+ msg? msg : "-",
+ (int)session->open_streams,
+ (int)session->remote.emitted_count,
+ (int)session->responses_submitted,
+ (int)session->pushes_submitted,
+ (int)session->pushes_reset + session->streams_reset);
+ ap_update_child_status_descr(session->c->sbh, status, session->status);
+ }
+}
+
+static void transit(h2_session *session, const char *action, h2_session_state nstate)
+{
+ if (session->state != nstate) {
+ int loglvl = APLOG_DEBUG;
+ if ((session->state == H2_SESSION_ST_BUSY && nstate == H2_SESSION_ST_WAIT)
+ || (session->state == H2_SESSION_ST_WAIT && nstate == H2_SESSION_ST_BUSY)){
+ loglvl = APLOG_TRACE1;
+ }
+ ap_log_cerror(APLOG_MARK, loglvl, 0, session->c, APLOGNO(03078)
+ "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id,
+ state_name(session->state), action, state_name(nstate));
+ session->state = nstate;
+ switch (session->state) {
+ case H2_SESSION_ST_IDLE:
+ update_child_status(session, (session->open_streams == 0?
+ SERVER_BUSY_KEEPALIVE
+ : SERVER_BUSY_READ), "idle");
+ break;
+ case H2_SESSION_ST_REMOTE_SHUTDOWN:
+ update_child_status(session, SERVER_CLOSING, "remote goaway");
+ break;
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ update_child_status(session, SERVER_CLOSING, "local goaway");
+ break;
+ case H2_SESSION_ST_DONE:
+ update_child_status(session, SERVER_CLOSING, "done");
+ break;
+ default:
+ /* nop */
+ break;
+ }
+ }
+}
+
+static void h2_session_ev_init(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_INIT:
+ transit(session, "init", H2_SESSION_ST_BUSY);
+ break;
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg)
+{
+ session->local.accepting = 0;
+ cleanup_streams(session);
+ switch (session->state) {
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ /* already did that? */
+ break;
+ case H2_SESSION_ST_IDLE:
+ case H2_SESSION_ST_REMOTE_SHUTDOWN:
+ /* all done */
+ transit(session, "local goaway", H2_SESSION_ST_DONE);
+ break;
+ default:
+ transit(session, "local goaway", H2_SESSION_ST_LOCAL_SHUTDOWN);
+ break;
+ }
+}
+
+static void h2_session_ev_remote_goaway(h2_session *session, int arg, const char *msg)
+{
+ session->remote.accepting = 0;
+ cleanup_streams(session);
+ switch (session->state) {
+ case H2_SESSION_ST_REMOTE_SHUTDOWN:
+ /* already received that? */
+ break;
+ case H2_SESSION_ST_IDLE:
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ /* all done */
+ transit(session, "remote goaway", H2_SESSION_ST_DONE);
+ break;
+ default:
+ transit(session, "remote goaway", H2_SESSION_ST_REMOTE_SHUTDOWN);
+ break;
+ }
+}
+
+static void h2_session_ev_conn_error(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_INIT:
+ case H2_SESSION_ST_DONE:
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ /* just leave */
+ transit(session, "conn error", H2_SESSION_ST_DONE);
+ break;
+
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03401)
+ "h2_session(%ld): conn error -> shutdown", session->id);
+ h2_session_shutdown(session, arg, msg, 0);
+ break;
+ }
+}
+
+static void h2_session_ev_proto_error(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_DONE:
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ /* just leave */
+ transit(session, "proto error", H2_SESSION_ST_DONE);
+ break;
+
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03402)
+ "h2_session(%ld): proto error -> shutdown", session->id);
+ h2_session_shutdown(session, arg, msg, 0);
+ break;
+ }
+}
+
+static void h2_session_ev_conn_timeout(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ transit(session, "conn timeout", H2_SESSION_ST_DONE);
+ break;
+ default:
+ h2_session_shutdown(session, arg, msg, 1);
+ transit(session, "conn timeout", H2_SESSION_ST_DONE);
+ break;
+ }
+}
+
+static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_BUSY:
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ case H2_SESSION_ST_REMOTE_SHUTDOWN:
+ /* Nothing to READ, nothing to WRITE on the master connection.
+ * Possible causes:
+ * - we wait for the client to send us sth
+ * - we wait for started tasks to produce output
+ * - we have finished all streams and the client has sent GO_AWAY
*/
- if (status == APR_SUCCESS
- && (session->unsent_promises || session->unsent_submits > 10)) {
- status = h2_session_send(session);
- if (status != APR_SUCCESS) {
- break;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_session(%ld): NO_IO event, %d streams open",
+ session->id, session->open_streams);
+ if (session->open_streams > 0) {
+ if (has_unsubmitted_streams(session)
+ || has_suspended_streams(session)) {
+ /* waiting for at least one stream to produce data */
+ transit(session, "no io", H2_SESSION_ST_WAIT);
+ }
+ else {
+ /* we have streams open, and all are submitted and none
+ * is suspended. The only thing keeping us from WRITEing
+ * more must be the flow control.
+ * This means we only wait for WINDOW_UPDATE from the
+ * client and can block on READ. */
+ transit(session, "no io (flow wait)", H2_SESSION_ST_IDLE);
+ session->idle_until = apr_time_now() + session->s->timeout;
+ session->keep_sync_until = session->idle_until;
+ /* Make sure we have flushed all previously written output
+ * so that the client will react. */
+ if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ return;
+ }
}
}
- }
+ else if (is_accepting_streams(session)) {
+ /* When we have no streams, but accept new, switch to idle */
+ apr_time_t now = apr_time_now();
+ transit(session, "no io (keepalive)", H2_SESSION_ST_IDLE);
+ session->idle_until = (session->remote.emitted_count?
+ session->s->keep_alive_timeout :
+ session->s->timeout) + now;
+ session->keep_sync_until = now + apr_time_from_sec(1);
+ }
+ else {
+ /* We are no longer accepting new streams and there are
+ * none left. Time to leave. */
+ h2_session_shutdown(session, arg, msg, 0);
+ transit(session, "no io", H2_SESSION_ST_DONE);
+ }
+ break;
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void h2_session_ev_stream_ready(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_WAIT:
+ transit(session, "stream ready", H2_SESSION_ST_BUSY);
+ break;
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void h2_session_ev_data_read(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_IDLE:
+ case H2_SESSION_ST_WAIT:
+ transit(session, "data read", H2_SESSION_ST_BUSY);
+ break;
+ default:
+ /* nop */
+ break;
+ }
+}
+
+static void h2_session_ev_ngh2_done(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_DONE:
+ /* nop */
+ break;
+ default:
+ transit(session, "nghttp2 done", H2_SESSION_ST_DONE);
+ break;
+ }
+}
+
+static void h2_session_ev_mpm_stopping(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_DONE:
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ /* nop */
+ break;
+ default:
+ h2_session_shutdown(session, arg, msg, 0);
+ break;
+ }
+}
+
+static void h2_session_ev_pre_close(h2_session *session, int arg, const char *msg)
+{
+ switch (session->state) {
+ case H2_SESSION_ST_DONE:
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ /* nop */
+ break;
+ default:
+ h2_session_shutdown(session, arg, msg, 1);
+ break;
+ }
+}
+
+static void h2_session_ev_stream_open(h2_session *session, int arg, const char *msg)
+{
+ ++session->open_streams;
+ switch (session->state) {
+ case H2_SESSION_ST_IDLE:
+ if (session->open_streams == 1) {
+ /* enter tiomeout, since we have a stream again */
+ session->idle_until = (session->s->timeout + apr_time_now());
+ }
+ break;
+ default:
+ break;
+ }
+}
+
+static void h2_session_ev_stream_done(h2_session *session, int arg, const char *msg)
+{
+ --session->open_streams;
+ switch (session->state) {
+ case H2_SESSION_ST_IDLE:
+ if (session->open_streams == 0) {
+ /* enter keepalive timeout, since we no longer have streams */
+ session->idle_until = (session->s->keep_alive_timeout
+ + apr_time_now());
+ }
+ break;
+ default:
+ break;
+ }
+}
+
+static void dispatch_event(h2_session *session, h2_session_event_t ev,
+ int arg, const char *msg)
+{
+ switch (ev) {
+ case H2_SESSION_EV_INIT:
+ h2_session_ev_init(session, arg, msg);
+ break;
+ case H2_SESSION_EV_LOCAL_GOAWAY:
+ h2_session_ev_local_goaway(session, arg, msg);
+ break;
+ case H2_SESSION_EV_REMOTE_GOAWAY:
+ h2_session_ev_remote_goaway(session, arg, msg);
+ break;
+ case H2_SESSION_EV_CONN_ERROR:
+ h2_session_ev_conn_error(session, arg, msg);
+ break;
+ case H2_SESSION_EV_PROTO_ERROR:
+ h2_session_ev_proto_error(session, arg, msg);
+ break;
+ case H2_SESSION_EV_CONN_TIMEOUT:
+ h2_session_ev_conn_timeout(session, arg, msg);
+ break;
+ case H2_SESSION_EV_NO_IO:
+ h2_session_ev_no_io(session, arg, msg);
+ break;
+ case H2_SESSION_EV_STREAM_READY:
+ h2_session_ev_stream_ready(session, arg, msg);
+ break;
+ case H2_SESSION_EV_DATA_READ:
+ h2_session_ev_data_read(session, arg, msg);
+ break;
+ case H2_SESSION_EV_NGH2_DONE:
+ h2_session_ev_ngh2_done(session, arg, msg);
+ break;
+ case H2_SESSION_EV_MPM_STOPPING:
+ h2_session_ev_mpm_stopping(session, arg, msg);
+ break;
+ case H2_SESSION_EV_PRE_CLOSE:
+ h2_session_ev_pre_close(session, arg, msg);
+ break;
+ case H2_SESSION_EV_STREAM_OPEN:
+ h2_session_ev_stream_open(session, arg, msg);
+ break;
+ case H2_SESSION_EV_STREAM_DONE:
+ h2_session_ev_stream_done(session, arg, msg);
+ break;
+ default:
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_session(%ld): unknown event %d",
+ session->id, ev);
+ break;
+ }
+
+ if (session->state == H2_SESSION_ST_DONE) {
+ h2_mplx_abort(session->mplx);
}
- return status;
}
static const int MAX_WAIT_MICROS = 200 * 1000;
{
apr_status_t status = APR_SUCCESS;
conn_rec *c = session->c;
- int rv, have_written, have_read, remain_secs;
- const char *reason = "";
+ int rv, mpm_state, trace = APLOGctrace3(c);
- ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): process start, async=%d", session->id, async);
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): process start, async=%d",
+ session->id, async);
+ }
+ if (c->cs) {
+ c->cs->state = CONN_STATE_WRITE_COMPLETION;
+ }
+
while (1) {
- have_read = have_written = 0;
+ trace = APLOGctrace3(c);
+ session->have_read = session->have_written = 0;
- if (session->aborted) {
- reason = "aborted";
- status = APR_ECONNABORTED;
- goto out;
+ if (!ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state)) {
+ if (mpm_state == AP_MPMQ_STOPPING) {
+ dispatch_event(session, H2_SESSION_EV_MPM_STOPPING, 0, NULL);
+ break;
+ }
}
+ session->status[0] = '\0';
+
switch (session->state) {
case H2_SESSION_ST_INIT:
+ ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_READ, c);
if (!h2_is_acceptable_connection(c, 1)) {
- nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0,
- NGHTTP2_INADEQUATE_SECURITY, NULL, 0);
- nghttp2_session_send(session->ngh2);
- session->server_goaway = 1;
+ update_child_status(session, SERVER_BUSY_READ, "inadequate security");
+ h2_session_shutdown(session, NGHTTP2_INADEQUATE_SECURITY, NULL, 1);
}
-
- status = h2_session_start(session, &rv);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
- "h2_session(%ld): started on %s:%d", session->id,
- session->s->server_hostname,
- c->local_addr->port);
- if (status != APR_SUCCESS) {
- reason = "start failed";
- goto out;
+ else {
+ update_child_status(session, SERVER_BUSY_READ, "init");
+ status = h2_session_start(session, &rv);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03079)
+ "h2_session(%ld): started on %s:%d", session->id,
+ session->s->server_hostname,
+ c->local_addr->port);
+ if (status != APR_SUCCESS) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ }
+ dispatch_event(session, H2_SESSION_EV_INIT, 0, NULL);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): INIT -> BUSY", session->id);
- session->state = H2_SESSION_ST_BUSY;
break;
- case H2_SESSION_ST_IDLE_READ:
- h2_filter_cin_timeout_set(session->cin, session->timeout_secs);
- ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL);
- status = h2_session_read(session, 1, 10);
- if (APR_STATUS_IS_TIMEUP(status)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): IDLE -> KEEPALIVE", session->id);
- session->state = H2_SESSION_ST_KEEPALIVE;
- }
- else if (status == APR_SUCCESS) {
- /* got something, go busy again */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): IDLE -> BUSY", session->id);
- session->state = H2_SESSION_ST_BUSY;
+ case H2_SESSION_ST_IDLE:
+ /* make certain, we send everything before we idle */
+ h2_conn_io_flush(&session->io);
+ if (!session->keep_sync_until && async && !session->open_streams
+ && !session->r && session->remote.emitted_count) {
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): async idle, nonblock read, "
+ "%d streams open", session->id,
+ session->open_streams);
+ }
+ /* We do not return to the async mpm immediately, since under
+ * load, mpms show the tendency to throw keep_alive connections
+ * away very rapidly.
+ * So, if we are still processing streams, we wait for the
+ * normal timeout first and, on timeout, close.
+ * If we have no streams, we still wait a short amount of
+ * time here for the next frame to arrive, before handing
+ * it to keep_alive processing of the mpm.
+ */
+ status = h2_session_read(session, 0);
+
+ if (status == APR_SUCCESS) {
+ session->have_read = 1;
+ dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
+ }
+ else if (APR_STATUS_IS_EAGAIN(status) || APR_STATUS_IS_TIMEUP(status)) {
+ if (apr_time_now() > session->idle_until) {
+ dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL);
+ }
+ else {
+ status = APR_EAGAIN;
+ goto out;
+ }
+ }
+ else {
+ ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, c,
+ APLOGNO(03403)
+ "h2_session(%ld): idle, no data, error",
+ session->id);
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "timeout");
+ }
}
else {
- reason = "keepalive error";
- goto out;
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): sync idle, stutter 1-sec, "
+ "%d streams open", session->id,
+ session->open_streams);
+ }
+ /* We wait in smaller increments, using a 1 second timeout.
+ * That gives us the chance to check for MPMQ_STOPPING often.
+ */
+ status = h2_mplx_idle(session->mplx);
+ if (status != APR_SUCCESS) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
+ H2_ERR_ENHANCE_YOUR_CALM, "less is more");
+ }
+ h2_filter_cin_timeout_set(session->cin, apr_time_from_sec(1));
+ status = h2_session_read(session, 1);
+ if (status == APR_SUCCESS) {
+ session->have_read = 1;
+ dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
+ }
+ else if (status == APR_EAGAIN) {
+ /* nothing to read */
+ }
+ else if (APR_STATUS_IS_TIMEUP(status)) {
+ apr_time_t now = apr_time_now();
+ if (now > session->keep_sync_until) {
+ /* if we are on an async mpm, now is the time that
+ * we may dare to pass control to it. */
+ session->keep_sync_until = 0;
+ }
+ if (now > session->idle_until) {
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): keepalive timeout",
+ session->id);
+ }
+ dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
+ }
+ else if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): keepalive, %f sec left",
+ session->id, (session->idle_until - now) / 1000000.0f);
+ }
+ /* continue reading handling */
+ }
+ else {
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): idle(1 sec timeout) "
+ "read failed", session->id);
+ }
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "error");
+ }
}
+
break;
case H2_SESSION_ST_BUSY:
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ case H2_SESSION_ST_REMOTE_SHUTDOWN:
if (nghttp2_session_want_read(session->ngh2)) {
- h2_filter_cin_timeout_set(session->cin, session->timeout_secs);
- status = h2_session_read(session, 0, 10);
+ ap_update_child_status(session->c->sbh, SERVER_BUSY_READ, NULL);
+ h2_filter_cin_timeout_set(session->cin, session->s->timeout);
+ status = h2_session_read(session, 0);
if (status == APR_SUCCESS) {
- /* got something, continue processing */
- have_read = 1;
+ session->have_read = 1;
+ dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
else if (status == APR_EAGAIN) {
/* nothing to read */
}
+ else if (APR_STATUS_IS_TIMEUP(status)) {
+ dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL);
+ break;
+ }
else {
- reason = "busy read error";
- goto out;
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
}
}
- if (!h2_stream_set_is_empty(session->streams)) {
- /* resume any streams for which data is available again */
- h2_session_resume_streams_with_data(session);
- /* Submit any responses/push_promises that are ready */
- status = h2_session_submit(session);
- if (status == APR_SUCCESS) {
- have_written = 1;
- }
- else if (status != APR_EAGAIN) {
- reason = "submit error";
- goto out;
- }
- /* send out window updates for our inputs */
- status = h2_mplx_in_update_windows(session->mplx);
- if (status != APR_SUCCESS && status != APR_EAGAIN) {
- reason = "window update error";
- goto out;
- }
+ /* trigger window updates, stream resumes and submits */
+ status = h2_mplx_dispatch_master_events(session->mplx,
+ on_stream_resume,
+ on_stream_response,
+ session);
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): dispatch error",
+ session->id);
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
+ H2_ERR_INTERNAL_ERROR,
+ "dispatch error");
+ break;
}
if (nghttp2_session_want_write(session->ngh2)) {
+ ap_update_child_status(session->c->sbh, SERVER_BUSY_WRITE, NULL);
status = h2_session_send(session);
if (status != APR_SUCCESS) {
- reason = "send error";
- goto out;
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
+ H2_ERR_INTERNAL_ERROR, "writing");
+ break;
}
- have_written = 1;
}
- if (have_read || have_written) {
- session->wait_us = 0;
- }
- else {
- /* nothing for input and output to do. If we remain
- * in this state, we go into a tight loop and suck up
- * CPU cycles.
- * Ideally, we'd like to do a blocking read, but that
- * is not possible if we have scheduled tasks and wait
- * for them to produce something. */
- if (h2_stream_set_is_empty(session->streams)) {
- /* When we have no streams, no task event are possible,
- * switch to blocking reads */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): BUSY -> IDLE", session->id);
- session->state = H2_SESSION_ST_IDLE_READ;
- }
- else if (!h2_stream_set_has_unsubmitted(session->streams)
- && !h2_stream_set_has_suspended(session->streams)) {
- /* none of our streams is waiting for a response or
- * new output data from task processing,
- * switch to blocking reads. */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): BUSY -> IDLE", session->id);
- session->state = H2_SESSION_ST_IDLE_READ;
- }
- else {
- /* Unable to do blocking reads, as we wait on events from
- * task processing in other threads. Do a busy wait with
- * backoff timer. */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): BUSY -> WAIT", session->id);
- session->state = H2_SESSION_ST_BUSY_WAIT;
+ if (session->have_read || session->have_written) {
+ if (session->wait_us) {
+ session->wait_us = 0;
}
}
+ else if (!nghttp2_session_want_write(session->ngh2)) {
+ dispatch_event(session, H2_SESSION_EV_NO_IO, 0, NULL);
+ }
break;
- case H2_SESSION_ST_BUSY_WAIT:
- session->wait_us = H2MAX(session->wait_us, 10);
- if (APLOGctrace1(c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ case H2_SESSION_ST_WAIT:
+ if (session->wait_us <= 0) {
+ session->wait_us = 10;
+ if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ break;
+ }
+ }
+ else {
+ /* repeating, increase timer for graceful backoff */
+ session->wait_us = H2MIN(session->wait_us*2, MAX_WAIT_MICROS);
+ }
+
+ if (trace) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, c,
"h2_session: wait for data, %ld micros",
(long)session->wait_us);
}
-
- h2_conn_io_flush(&session->io);
- ap_log_cerror( APLOG_MARK, APLOG_TRACE2, status, c,
- "h2_session(%ld): process -> trywait", session->id);
status = h2_mplx_out_trywait(session->mplx, session->wait_us,
session->iowait);
if (status == APR_SUCCESS) {
- /* got something, go busy again */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): WAIT -> BUSY", session->id);
- session->state = H2_SESSION_ST_BUSY;
- }
- else if (status == APR_TIMEUP) {
- if (nghttp2_session_want_read(session->ngh2)) {
- status = h2_session_read(session, 0, 1);
- if (status == APR_SUCCESS) {
- /* got something, go busy again */
- session->wait_us = 0;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): WAIT -> BUSY", session->id);
- session->state = H2_SESSION_ST_BUSY;
- }
- else if (status != APR_EAGAIN) {
- reason = "busy read error";
- goto out;
- }
- }
- /* nothing, increase timer for graceful backup */
- session->wait_us = H2MIN(session->wait_us*2, MAX_WAIT_MICROS);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): WAIT -> BUSY", session->id);
- session->state = H2_SESSION_ST_BUSY;
- }
- else {
- reason = "busy wait error";
- goto out;
- }
- break;
-
- case H2_SESSION_ST_KEEPALIVE:
- /* Our normal H2Timeout has passed and we are considering to
- * extend that with the H2KeepAliveTimeout. */
- remain_secs = session->keepalive_secs - session->timeout_secs;
- if (remain_secs <= 0) {
- /* keepalive is <= normal timeout, close the session */
- reason = "keepalive expired";
- h2_session_shutdown(session, 0);
- goto out;
+ session->wait_us = 0;
+ dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
- session->c->keepalive = AP_CONN_KEEPALIVE;
- ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_KEEPALIVE, c);
-
- if ((apr_time_sec(session->s->keep_alive_timeout) >= remain_secs)
- && async && session->c->cs
- && !session->r) {
- /* Async MPMs are able to handle keep-alive connections without
- * blocking a thread. For this to happen, we need to return from
- * processing, indicating the IO event we are waiting for, and
- * may be called again if the event happens.
- * TODO: this does not properly GOAWAY connections...
- * TODO: This currently does not work on upgraded requests...
- */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): async KEEPALIVE -> IDLE_READ", session->id);
- session->state = H2_SESSION_ST_IDLE_READ;
- session->c->cs->state = CONN_STATE_WRITE_COMPLETION;
- reason = "async keepalive";
- status = APR_SUCCESS;
- goto out;
+ else if (APR_STATUS_IS_TIMEUP(status)) {
+ /* go back to checking all inputs again */
+ transit(session, "wait cycle", session->local.accepting?
+ H2_SESSION_ST_BUSY : H2_SESSION_ST_LOCAL_SHUTDOWN);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): KEEPALIVE read", session->id);
- h2_filter_cin_timeout_set(session->cin, remain_secs);
- status = h2_session_read(session, 1, 1);
- if (APR_STATUS_IS_TIMEUP(status)) {
- reason = "keepalive expired";
- h2_session_shutdown(session, 0);
- goto out;
+ else if (APR_STATUS_IS_ECONNRESET(status)
+ || APR_STATUS_IS_ECONNABORTED(status)) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
}
- else if (status != APR_SUCCESS) {
- reason = "keepalive error";
- goto out;
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
+ APLOGNO(03404)
+ "h2_session(%ld): waiting on conditional",
+ session->id);
+ h2_session_shutdown(session, H2_ERR_INTERNAL_ERROR,
+ "cond wait error", 0);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): KEEPALIVE -> BUSY", session->id);
- session->state = H2_SESSION_ST_BUSY;
break;
- case H2_SESSION_ST_CLOSING:
- if (nghttp2_session_want_write(session->ngh2)) {
- status = h2_session_send(session);
- if (status != APR_SUCCESS) {
- reason = "send error";
- goto out;
- }
- have_written = 1;
- }
- reason = "closing";
+ case H2_SESSION_ST_DONE:
+ status = APR_EOF;
goto out;
- case H2_SESSION_ST_ABORTED:
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
- "h2_session(%ld): processing ABORTED", session->id);
- return APR_ECONNABORTED;
-
default:
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c,
- "h2_session(%ld): state %d", session->id, session->state);
- return APR_EGENERAL;
+ APLOGNO(03080)
+ "h2_session(%ld): unknown state %d", session->id, session->state);
+ dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, 0, NULL);
+ break;
}
- if (!nghttp2_session_want_read(session->ngh2)
- && !nghttp2_session_want_write(session->ngh2)) {
- session->state = H2_SESSION_ST_CLOSING;
- }
-
- if (have_written) {
- h2_conn_io_flush(&session->io);
+ if (!nghttp2_session_want_read(session->ngh2)
+ && !nghttp2_session_want_write(session->ngh2)) {
+ dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL);
+ }
+ if (session->reprioritize) {
+ h2_mplx_reprioritize(session->mplx, stream_pri_cmp, session);
+ session->reprioritize = 0;
}
}
+
out:
- if (have_written) {
- h2_conn_io_flush(&session->io);
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): [%s] process returns",
+ session->id, state_name(session->state));
+ }
+
+ if ((session->state != H2_SESSION_ST_DONE)
+ && (APR_STATUS_IS_EOF(status)
+ || APR_STATUS_IS_ECONNRESET(status)
+ || APR_STATUS_IS_ECONNABORTED(status))) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ }
+
+ status = (session->state == H2_SESSION_ST_DONE)? APR_EOF : APR_SUCCESS;
+ if (session->state == H2_SESSION_ST_DONE) {
+ if (!session->eoc_written) {
+ session->eoc_written = 1;
+ h2_conn_io_write_eoc(&session->io, session);
+ }
}
- ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): process return, state %d, reason '%s'",
- session->id, session->state, reason);
+
return status;
}
+
+apr_status_t h2_session_pre_close(h2_session *session, int async)
+{
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "h2_session(%ld): pre_close", session->id);
+ dispatch_event(session, H2_SESSION_EV_PRE_CLOSE, 0, "timeout");
+ return APR_SUCCESS;
+}