static void dispatch_event(h2_session *session, h2_session_event_t ev,
int err, const char *msg);
+typedef struct stream_sel_ctx {
+ h2_session *session;
+ h2_stream *candidate;
+} stream_sel_ctx;
+
+static int find_cleanup_stream(void *udata, void *sdata)
+{
+ stream_sel_ctx *ctx = udata;
+ h2_stream *stream = sdata;
+ if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
+ if (!ctx->session->local.accepting
+ && stream->id > ctx->session->local.accepted_max) {
+ ctx->candidate = stream;
+ return 0;
+ }
+ }
+ else {
+ if (!ctx->session->remote.accepting
+ && stream->id > ctx->session->remote.accepted_max) {
+ ctx->candidate = stream;
+ return 0;
+ }
+ }
+ return 1;
+}
+
+static void cleanup_streams(h2_session *session)
+{
+ stream_sel_ctx ctx;
+ ctx.session = session;
+ ctx.candidate = NULL;
+ while (1) {
+ h2_ihash_iter(session->streams, find_cleanup_stream, &ctx);
+ if (ctx.candidate) {
+ h2_session_stream_destroy(session, ctx.candidate);
+ ctx.candidate = NULL;
+ }
+ else {
+ break;
+ }
+ }
+}
+
h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
{
h2_stream * stream;
stream = h2_stream_open(stream_id, stream_pool, session);
h2_ihash_add(session->streams, stream);
- if (H2_STREAM_CLIENT_INITIATED(stream_id)
- && stream_id > session->max_stream_received) {
- ++session->requests_received;
- session->max_stream_received = stream->id;
+ if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
+ if (stream_id > session->remote.emitted_max) {
+ ++session->remote.emitted_count;
+ session->remote.emitted_max = stream->id;
+ session->local.accepted_max = stream->id;
+ }
+ }
+ else {
+ if (stream_id > session->local.emitted_max) {
+ ++session->local.emitted_count;
+ session->remote.emitted_max = stream->id;
+ }
}
return stream;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_stream(%ld-%d): handled, closing",
session->id, (int)stream->id);
- if (stream->id > session->max_stream_handled) {
- session->max_stream_handled = stream->id;
+ if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
+ if (stream->id > session->local.completed_max) {
+ session->local.completed_max = stream->id;
+ }
}
}
else {
else {
s = h2_session_open_stream((h2_session *)userp, frame->hd.stream_id);
}
- return s? 0 : NGHTTP2_ERR_CALLBACK_FAILURE;
+ return s? 0 : NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
}
static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
}
break;
case NGHTTP2_GOAWAY:
+ session->remote.accepted_max = frame->goaway.last_stream_id;
+ session->remote.error = frame->goaway.error_code;
dispatch_event(session, H2_SESSION_EV_REMOTE_GOAWAY, 0, NULL);
break;
default:
}
}
-static apr_status_t h2_session_shutdown(h2_session *session, int reason,
+static apr_status_t h2_session_shutdown(h2_session *session, int error,
const char *msg, int force_close)
{
apr_status_t status = APR_SUCCESS;
- const char *err = msg;
AP_DEBUG_ASSERT(session);
- if (!err && reason) {
- err = nghttp2_strerror(reason);
+ if (!msg && error) {
+ msg = nghttp2_strerror(error);
+ }
+
+ if (error || force_close) {
+ /* not a graceful shutdown, we want to leave...
+ * Do not start further streams that are waiting to be scheduled.
+ * Find out the max stream id that we habe been processed or
+ * are still actively working on.
+ * Remove all streams greater than this number without submitting
+ * a RST_STREAM frame, since that should be clear from the GOAWAY
+ * we send. */
+ session->local.accepted_max = h2_mplx_shutdown(session->mplx);
+ session->local.error = error;
+ }
+ else {
+ /* graceful shutdown. we will continue processing all streams
+ * we have, but no longer accept new ones. Report the max stream
+ * we have received and discard all new ones. */
}
nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE,
- h2_mplx_get_max_stream_started(session->mplx),
- reason, (uint8_t*)err, err? strlen(err):0);
+ session->local.accepted_max,
+ error, (uint8_t*)msg, msg? strlen(msg):0);
status = nghttp2_session_send(session->ngh2);
if (status == APR_SUCCESS) {
status = h2_conn_io_flush(&session->io);
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03069)
"session(%ld): sent GOAWAY, err=%d, msg=%s",
- session->id, reason, err? err : "");
- dispatch_event(session, H2_SESSION_EV_LOCAL_GOAWAY, reason, err);
+ session->id, error, msg? msg : "");
+ dispatch_event(session, H2_SESSION_EV_LOCAL_GOAWAY, error, msg);
if (force_close) {
h2_mplx_abort(session->mplx);
session->c = c;
session->r = r;
session->s = h2_ctx_server_get(ctx);
+ session->pool = pool;
session->config = h2_config_sget(session->s);
+ session->workers = workers;
session->state = H2_SESSION_ST_INIT;
+ session->local.accepting = 1;
+ session->remote.accepting = 1;
- session->pool = pool;
apr_pool_pre_cleanup_register(pool, session, session_pool_cleanup);
- session->max_stream_count = h2_config_geti(session->config, H2_CONF_MAX_STREAMS);
- session->max_stream_mem = h2_config_geti(session->config, H2_CONF_STREAM_MAX_MEM);
+ session->max_stream_count = h2_config_geti(session->config,
+ H2_CONF_MAX_STREAMS);
+ session->max_stream_mem = h2_config_geti(session->config,
+ H2_CONF_STREAM_MAX_MEM);
status = apr_thread_cond_create(&session->iowait, session->pool);
if (status != APR_SUCCESS) {
return NULL;
}
- session->streams = h2_ihash_create(session->pool,offsetof(h2_stream, id));
- session->workers = workers;
+ session->streams = h2_ihash_create(session->pool,
+ offsetof(h2_stream, id));
session->mplx = h2_mplx_create(c, session->pool, session->config,
session->s->timeout, workers);
h2_mplx_set_consumed_cb(session->mplx, update_window, session);
/* Install the connection input filter that feeds the session */
- session->cin = h2_filter_cin_create(session->pool, h2_session_receive, session);
+ session->cin = h2_filter_cin_create(session->pool,
+ h2_session_receive, session);
ap_add_input_filter("H2_IN", session->cin, r, c);
h2_conn_io_init(&session->io, c, session->config, session->pool);
h2_session_destroy(session);
return NULL;
}
- nghttp2_option_set_peer_max_concurrent_streams(options,
- (uint32_t)session->max_stream_count);
+ nghttp2_option_set_peer_max_concurrent_streams(
+ options, (uint32_t)session->max_stream_count);
/* We need to handle window updates ourself, otherwise we
* get flooded by nghttp2. */
nghttp2_option_set_no_auto_window_update(options, 1);
h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
{
- if (!session->last_stream || stream_id != session->last_stream->id) {
- session->last_stream = h2_ihash_get(session->streams, stream_id);
- }
- return session->last_stream;
+ return h2_ihash_get(session->streams, stream_id);
}
static ssize_t stream_data_cb(nghttp2_session *ng2s,
h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
}
- if (session->last_stream == stream) {
- session->last_stream = NULL;
- }
if (session->streams) {
h2_ihash_remove(session->streams, stream->id);
}
int h2_session_push_enabled(h2_session *session)
{
- /* iff we can and they can */
- return (h2_config_geti(session->config, H2_CONF_PUSH)
+ /* iff we can and they can and want */
+ return (session->remote.accepting /* remote GOAWAY received */
+ && h2_config_geti(session->config, H2_CONF_PUSH)
&& nghttp2_session_get_remote_settings(session->ngh2,
- NGHTTP2_SETTINGS_ENABLE_PUSH));
+ NGHTTP2_SETTINGS_ENABLE_PUSH));
}
static apr_status_t h2_session_send(h2_session *session)
static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg)
{
- session->local_shutdown = 1;
+ session->local.accepting = 0;
+ cleanup_streams(session);
switch (session->state) {
case H2_SESSION_ST_LOCAL_SHUTDOWN:
/* already did that? */
static void h2_session_ev_remote_goaway(h2_session *session, int arg, const char *msg)
{
+ session->remote.accepting = 0;
+ cleanup_streams(session);
switch (session->state) {
case H2_SESSION_ST_REMOTE_SHUTDOWN:
/* already received that? */
/* When we have no streams, no task event are possible,
* switch to blocking reads */
transit(session, "no io", H2_SESSION_ST_IDLE);
- session->idle_until = (session->requests_received?
+ session->idle_until = (session->remote.emitted_count?
session->s->keep_alive_timeout :
session->s->timeout) + now;
session->keep_sync_until = now + apr_time_from_sec(1);
"%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)",
msg? msg : "-",
(int)h2_ihash_count(session->streams),
- (int)session->requests_received,
+ (int)session->remote.emitted_count,
(int)session->responses_submitted,
(int)session->pushes_submitted,
(int)session->pushes_reset + session->streams_reset);
: SERVER_BUSY_READ), "idle");
/* make certain, the client receives everything before we idle */
if (!session->keep_sync_until
- && async && no_streams && !session->r && session->requests_received) {
+ && async && no_streams && !session->r && session->remote.emitted_count) {
ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
"h2_session(%ld): async idle, nonblock read", session->id);
/* We do not return to the async mpm immediately, since under
}
else if (status == APR_TIMEUP) {
/* go back to checking all inputs again */
- transit(session, "wait cycle", session->local_shutdown?
- H2_SESSION_ST_LOCAL_SHUTDOWN : H2_SESSION_ST_BUSY);
+ transit(session, "wait cycle", session->local.accepting?
+ H2_SESSION_ST_BUSY : H2_SESSION_ST_LOCAL_SHUTDOWN);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
&& !nghttp2_session_want_write(session->ngh2)) {
dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL);
}
+ if (session->reprioritize) {
+ h2_mplx_reprioritize(session->mplx, stream_pri_cmp, session);
+ session->reprioritize = 0;
+ }
}
out:
* of 'h2c', NULL otherwise */
server_rec *s; /* server/vhost we're starting on */
const struct h2_config *config; /* Relevant config for this session */
-
+ apr_pool_t *pool; /* pool to use in session */
+ struct h2_mplx *mplx; /* multiplexer for stream data */
+ struct h2_workers *workers; /* for executing stream tasks */
+ struct h2_filter_cin *cin; /* connection input filter context */
+ h2_conn_io io; /* io on httpd conn filters */
+ struct h2_ihash_t *streams; /* streams handled by this session */
+ struct nghttp2_session *ngh2; /* the nghttp2 session (internal use) */
+
h2_session_state state; /* state session is in */
+
+ h2_session_props local; /* properties of local session */
+ h2_session_props remote; /* properites of remote session */
+
unsigned int reprioritize : 1; /* scheduled streams priority changed */
unsigned int eoc_written : 1; /* h2 eoc bucket written */
unsigned int flush : 1; /* flushing output necessary */
- unsigned int local_shutdown: 1; /* GOAWAY has been sent by us */
apr_interval_time_t wait_us; /* timout during BUSY_WAIT state, micro secs */
+ struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
+
int unsent_submits; /* number of submitted, but not yet written responses. */
int unsent_promises; /* number of submitted, but not yet written push promised */
- int requests_received; /* number of http/2 requests received */
int responses_submitted; /* number of http/2 responses submitted */
int streams_reset; /* number of http/2 streams reset by client */
int pushes_promised; /* number of http/2 push promises submitted */
apr_size_t frames_received; /* number of http/2 frames received */
apr_size_t frames_sent; /* number of http/2 frames sent */
- int max_stream_received; /* highest stream id created */
- int max_stream_handled; /* highest stream id completed */
-
apr_size_t max_stream_count; /* max number of open streams */
apr_size_t max_stream_mem; /* max buffer memory for a single stream */
apr_time_t idle_until; /* Time we shut down due to sheer boredom */
apr_time_t keep_sync_until; /* Time we sync wait until passing to async mpm */
- apr_pool_t *pool; /* pool to use in session handling */
apr_bucket_brigade *bbtmp; /* brigade for keeping temporary data */
struct apr_thread_cond_t *iowait; /* our cond when trywaiting for data */
- struct h2_filter_cin *cin; /* connection input filter context */
- h2_conn_io io; /* io on httpd conn filters */
-
- struct h2_mplx *mplx; /* multiplexer for stream data */
-
- struct h2_stream *last_stream; /* last stream worked with */
- struct h2_ihash_t *streams; /* streams handled by this session */
-
apr_pool_t *spare; /* spare stream pool */
- struct nghttp2_session *ngh2; /* the nghttp2 session (internal use) */
- struct h2_workers *workers; /* for executing stream tasks */
-
- struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
-
char status[64]; /* status message for scoreboard */
int last_status_code; /* the one already reported */
const char *last_status_msg; /* the one already reported */