}
}
+static int purge_stream(void *ctx, void *val)
+{
+ h2_mplx *m = ctx;
+ h2_stream *stream = val;
+ h2_ihash_remove(m->spurge, stream->id);
+ h2_stream_destroy(stream);
+ return 0;
+}
+
+static void purge_streams(h2_mplx *m)
+{
+ if (!h2_ihash_empty(m->spurge)) {
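+ /* Destroying a stream while iterating over the hash is not safe,
+ * so purge_stream() handles a single stream and ends the pass by
+ * returning 0; the loop restarts until a pass completes, which
+ * (assuming h2_ihash_iter() returns non-zero only for a completed
+ * pass) happens once the hash is empty. */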
+ while (!h2_ihash_iter(m->spurge, purge_stream, m)) {
+ /* repeat until empty */
+ }
+ h2_ihash_clear(m->spurge);
+ }
+}
+
static void h2_mplx_destroy(h2_mplx *m)
{
AP_DEBUG_ASSERT(m);
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
m->ready_tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
return max_stream_started;
}
-static void input_consumed_signal(h2_mplx *m, h2_task *task)
+static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
{
- if (task->input.beam && task->worker_started) {
- h2_beam_send(task->input.beam, NULL, 0); /* trigger updates */
+ if (stream->input) {
+ h2_beam_send(stream->input, NULL, 0); /* trigger updates */
}
}
}
-static void task_destroy(h2_mplx *m, h2_task *task, int events)
+static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
{
conn_rec *slave = NULL;
int reuse_slave = 0;
"h2_task(%s): shutdown", task->id);
}
- if (events) {
+ if (called_from_master) {
/* Process outstanding events before destruction */
- input_consumed_signal(m, task);
+ h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+ if (stream) {
+ input_consumed_signal(m, stream);
+ }
}
/* The pool is cleared/destroyed which also closes all
* allocated file handles. Give this count back to our
* file handle pool. */
- if (task->input.beam) {
- m->tx_handles_reserved +=
- h2_beam_get_files_beamed(task->input.beam);
- }
if (task->output.beam) {
m->tx_handles_reserved +=
h2_beam_get_files_beamed(task->output.beam);
static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
{
- h2_task *task;
+ h2_task *task = h2_ihash_get(m->tasks, stream->id);
+ /* Situation: we are, on the master connection, done with processing
+ * the stream. Either we have handled it successfully, or the stream
+ * was reset by the client or the connection is gone and we are
+ * shutting down the whole session.
+ *
+ * We possibly have created a task for this stream to be processed
+ * on a slave connection. The processing might actually be ongoing
+ * right now or has already finished. A finished task waits for its
+ * stream to be done. This is the common case.
+ *
+ * If the stream had input (e.g. the request had a body), a task
+ * may have read, or is still reading, buckets from the input beam.
+ * This means that the task is referencing memory from the stream's
+ * pool (or the master connection bucket alloc). Before we can free
+ * the stream pool, we need to make sure that those references are
+ * gone. This is what h2_beam_shutdown() on the input waits for.
+ *
+ * With the input handled, we can tear down that beam and turn to
+ * the output beam. The stream might still have buffered some
+ * buckets read from the output, so we need to get rid of those. That
+ * is done by h2_stream_cleanup().
+ *
+ * Now it is safe to destroy the task (if it exists and is finished).
+ *
+ * FIXME: we currently destroy the stream, even if the task is still
+ * ongoing. This is not ok, since task->request is coming from stream
+ * memory. We should either copy it on task creation or delay the
+ * stream destruction until the task is done.
+ */
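+ /* Sketch of the copy option above, not part of this change and
+ * assuming a deep-copy helper exists:
+ *
+ * task->request = h2_request_clone(task->pool, stream->request);
+ *
+ * which would keep task->request valid after the stream pool is
+ * destroyed. */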
h2_ihash_remove(m->streams, stream->id);
if (stream->input) {
- apr_status_t status;
- status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ);
- if (status == APR_EAGAIN) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_stream(%ld-%d): wait on input shutdown",
- m->id, stream->id);
- status = h2_beam_shutdown(stream->input, APR_BLOCK_READ);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_stream(%ld-%d): input shutdown returned",
- m->id, stream->id);
- }
+ m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
}
+ h2_stream_cleanup(stream);
- task = h2_ihash_get(m->tasks, stream->id);
if (task) {
/* Remove task from ready set, we will never submit it */
h2_ihash_remove(m->ready_tasks, stream->id);
+ task->input.beam = NULL;
- if (task->worker_done) {
- /* already finished or not even started yet */
- h2_iq_remove(m->q, task->stream_id);
- task_destroy(m, task, 0);
- }
- else {
+ if (!task->worker_done) {
/* task still running, cleanup once it is done */
- task->orphaned = 1;
- task->input.beam = NULL;
if (rst_error) {
h2_task_rst(task, rst_error);
}
+ /* FIXME: this should work, but does not
+ h2_ihash_add(m->shold, stream);
+ return;*/
+ }
+ else {
+ /* already finished */
+ h2_iq_remove(m->q, task->stream_id);
+ task_destroy(m, task, 0);
}
}
+ h2_stream_destroy(stream);
}
static int stream_done_iter(void *ctx, void *val)
{
- h2_stream *stream = val;
stream_done((h2_mplx*)ctx, val, 0);
- h2_stream_destroy(stream);
return 0;
}
{
h2_mplx *m = ctx;
h2_task *task = val;
+ h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
if (task->request) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
"->03198: h2_stream(%s): %s %s %s -> %s %d"
task->request->authority, task->request->path,
task->response? "http" : (task->rst_error? "reset" : "?"),
task->response? task->response->http_status : task->rst_error,
- task->orphaned, task->worker_started,
+ (stream? 0 : 1), task->worker_started,
task->worker_done);
}
else if (task) {
apr_thread_cond_broadcast(m->task_thawed);
}
}
+ AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
+ AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
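+ /* all workers are done and no stream is parked in shold anymore;
+ * the master thread may now free any streams left in spurge */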
+ purge_streams(m);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): release_join (%d tasks left) -> destroy",
}
}
-apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
+apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream)
{
apr_status_t status = APR_SUCCESS;
int acquired;
- /* This maybe called from inside callbacks that already hold the lock.
- * E.g. when we are streaming out DATA and the EOF triggers the stream
- * release.
- */
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_stream *stream = h2_ihash_get(m->streams, stream_id);
- if (stream) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld-%d): marking stream as done.",
- m->id, stream_id);
- stream_done(m, stream, rst_error);
- }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld-%d): marking stream as done.",
+ m->id, stream->id);
+ stream_done(m, stream, stream->rst_error);
leave_mutex(m, acquired);
}
return status;
static int update_window(void *ctx, void *val)
{
- h2_mplx *m = ctx;
- input_consumed_signal(m, val);
+ input_consumed_signal(ctx, val);
return 1;
}
return APR_ECONNABORTED;
}
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_ihash_iter(m->tasks, update_window, m);
+ h2_ihash_iter(m->streams, update_window, m);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_session(%ld): windows updated", m->id);
return 0;
}
-h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
+h2_stream *h2_mplx_next_submit(h2_mplx *m)
{
apr_status_t status;
h2_stream *stream = NULL;
h2_task *task = ctx.task;
h2_ihash_remove(m->ready_tasks, task->stream_id);
- stream = h2_ihash_get(streams, task->stream_id);
+ stream = h2_ihash_get(m->streams, task->stream_id);
if (stream && task) {
task->submitted = 1;
if (task->rst_error) {
"h2_mplx(%s): stream for response closed, "
"resetting io to close request processing",
task->id);
- task->orphaned = 1;
h2_task_rst(task, H2_ERR_STREAM_CLOSED);
if (!task->worker_started || task->worker_done) {
task_destroy(m, task, 1);
}
else {
/* hang around until the h2_task is done, but
- * shutdown input/output and send out any events asap. */
+ * shut down the output */
h2_task_shutdown(task, 0);
- input_consumed_signal(m, task);
}
}
}
{
apr_status_t status = APR_SUCCESS;
h2_task *task = h2_ihash_get(m->tasks, stream_id);
+ h2_stream *stream = h2_ihash_get(m->streams, stream_id);
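+ /* a stream no longer in m->streams has already been marked done
+ * on the master connection; checking for it replaces the old
+ * task->orphaned flag */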
- if (!task || task->orphaned) {
+ if (!task || !stream) {
return APR_ECONNABORTED;
}
static apr_status_t out_close(h2_mplx *m, h2_task *task)
{
apr_status_t status = APR_SUCCESS;
+ h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
- if (!task || task->orphaned) {
+ if (!task || !stream) {
return APR_ECONNABORTED;
}
static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
{
- if (task) {
- if (task->frozen) {
- /* this task was handed over to an engine for processing
- * and the original worker has finished. That means the
- * engine may start processing now. */
- h2_task_thaw(task);
- /* we do not want the task to block on writing response
- * bodies into the mplx. */
- /* FIXME: this implementation is incomplete. */
- h2_task_set_io_blocking(task, 0);
- apr_thread_cond_broadcast(m->task_thawed);
- return;
- }
- else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): task(%s) done", m->id, task->id);
- out_close(m, task);
-
- if (ngn) {
- apr_off_t bytes = 0;
- if (task->output.beam) {
- h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
- bytes += h2_beam_get_buffered(task->output.beam);
- }
- if (bytes > 0) {
- /* we need to report consumed and current buffered output
- * to the engine. The request will be streamed out or cancelled,
- * no more data is coming from it and the engine should update
- * its calculations before we destroy this information. */
- h2_req_engine_out_consumed(ngn, task->c, bytes);
- }
+ if (task->frozen) {
+ /* this task was handed over to an engine for processing
+ * and the original worker has finished. That means the
+ * engine may start processing now. */
+ h2_task_thaw(task);
+ /* we do not want the task to block on writing response
+ * bodies into the mplx. */
+ /* FIXME: this implementation is incomplete. */
+ h2_task_set_io_blocking(task, 0);
+ apr_thread_cond_broadcast(m->task_thawed);
+ return;
+ }
+ else {
+ h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): task(%s) done", m->id, task->id);
+ out_close(m, task);
+
+ if (ngn) {
+ apr_off_t bytes = 0;
+ if (task->output.beam) {
+ h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+ bytes += h2_beam_get_buffered(task->output.beam);
}
-
- if (task->engine) {
- if (!h2_req_engine_is_shutdown(task->engine)) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
- "h2_mplx(%ld): task(%s) has not-shutdown "
- "engine(%s)", m->id, task->id,
- h2_req_engine_get_id(task->engine));
- }
- h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+ if (bytes > 0) {
+ /* we need to report consumed and current buffered output
+ * to the engine. The request will be streamed out or cancelled,
+ * no more data is coming from it and the engine should update
+ * its calculations before we destroy this information. */
+ h2_req_engine_out_consumed(ngn, task->c, bytes);
}
-
- if (!m->aborted && !task->orphaned && m->redo_tasks
- && h2_ihash_get(m->redo_tasks, task->stream_id)) {
- /* reset and schedule again */
- h2_task_redo(task);
- h2_ihash_remove(m->redo_tasks, task->stream_id);
- h2_iq_add(m->q, task->stream_id, NULL, NULL);
- return;
+ }
+
+ if (task->engine) {
+ if (!h2_req_engine_is_shutdown(task->engine)) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): task(%s) has not-shutdown "
+ "engine(%s)", m->id, task->id,
+ h2_req_engine_get_id(task->engine));
}
-
- task->worker_done = 1;
- task->done_at = apr_time_now();
- if (task->output.beam) {
- h2_beam_on_consumed(task->output.beam, NULL, NULL);
- h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
+ h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+ }
+
+ if (!m->aborted && stream && m->redo_tasks
+ && h2_ihash_get(m->redo_tasks, task->stream_id)) {
+ /* reset and schedule again */
+ h2_task_redo(task);
+ h2_ihash_remove(m->redo_tasks, task->stream_id);
+ h2_iq_add(m->q, task->stream_id, NULL, NULL);
+ return;
+ }
+
+ task->worker_done = 1;
+ task->done_at = apr_time_now();
+ if (task->output.beam) {
+ h2_beam_on_consumed(task->output.beam, NULL, NULL);
+ h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%s): request done, %f ms"
+ " elapsed", task->id,
+ (task->done_at - task->started_at) / 1000.0);
+ if (task->started_at > m->last_idle_block) {
+ /* this task finished without causing an 'idle block', e.g.
+ * a block by flow control.
+ */
+ if (task->done_at - m->last_limit_change >= m->limit_change_interval
+ && m->workers_limit < m->workers_max) {
+ /* Well behaving stream, allow it more workers */
+ m->workers_limit = H2MIN(m->workers_limit * 2,
+ m->workers_max);
+ m->last_limit_change = task->done_at;
+ m->need_registration = 1;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): increase worker limit to %d",
+ m->id, m->workers_limit);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%s): request done, %f ms"
- " elapsed", task->id,
- (task->done_at - task->started_at) / 1000.0);
- if (task->started_at > m->last_idle_block) {
- /* this task finished without causing an 'idle block', e.g.
- * a block by flow control.
- */
- if (task->done_at- m->last_limit_change >= m->limit_change_interval
- && m->workers_limit < m->workers_max) {
- /* Well behaving stream, allow it more workers */
- m->workers_limit = H2MIN(m->workers_limit * 2,
- m->workers_max);
- m->last_limit_change = task->done_at;
- m->need_registration = 1;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): increase worker limit to %d",
- m->id, m->workers_limit);
- }
+ }
+
+ if (stream) {
+ /* hang around until the stream deregisters */
+ }
+ else {
+ stream = h2_ihash_get(m->shold, task->stream_id);
+ task_destroy(m, task, 0);
+ if (stream) {
+ stream->response = NULL; /* ref from task memory */
+ /* We cannot destroy the stream here since this is
+ * called from a worker thread; freeing a memory pool
+ * is only safe from the single thread using it (and
+ * its parent pool / allocator). */
+ h2_ihash_remove(m->shold, stream->id);
+ h2_ihash_add(m->spurge, stream);
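+ /* the master connection thread destroys everything parked
+ * in m->spurge later, see purge_streams() */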
}
- if (task->orphaned) {
- task_destroy(m, task, 0);
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
- }
- else {
- /* hang around until the stream deregisters */
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
}
}
}
task->r = r;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (task->orphaned) {
- status = APR_ECONNABORTED;
+ h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+
+ if (stream) {
+ status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
}
else {
- status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
+ status = APR_ECONNABORTED;
}
leave_mutex(m, acquired);
}
h2_stream * stream;
apr_pool_t *stream_pool;
- if (session->spare) {
- stream_pool = session->spare;
- session->spare = NULL;
- }
- else {
- apr_pool_create(&stream_pool, session->pool);
- apr_pool_tag(stream_pool, "h2_stream");
- }
+ apr_pool_create(&stream_pool, session->pool);
+ apr_pool_tag(stream_pool, "h2_stream");
stream = h2_stream_open(stream_id, stream_pool, session,
initiated_on, req);
-
+ ++session->open_streams;
+ ++session->unanswered_streams;
+ nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
h2_ihash_add(session->streams, stream);
+
if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
if (stream_id > session->remote.emitted_max) {
++session->remote.emitted_count;
return 0;
}
+static h2_stream *get_stream(h2_session *session, int stream_id)
+{
+ return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+}
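+/* The stream pointer is registered via
+ * nghttp2_session_set_stream_user_data() in h2_session_open_stream(),
+ * so callbacks can retrieve it without a hash lookup. */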
+
static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
int32_t stream_id,
const uint8_t *data, size_t len, void *userp)
return 0;
}
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064)
"h2_stream(%ld-%d): on_data_chunk for unknown stream",
h2_stream *stream;
(void)ngh2;
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (stream) {
stream_release(session, stream, error_code);
}
/* We may see HEADERs at the start of a stream or after all DATA
* streams to carry trailers. */
(void)ngh2;
- s = h2_session_get_stream(session, frame->hd.stream_id);
+ s = get_stream(session, frame->hd.stream_id);
if (s) {
/* nop */
}
return 0;
}
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
APLOGNO(02920)
/* This can be HEADERS for a new stream, defining the request,
* or HEADER may come after DATA at the end of a stream as in
* trailers */
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
}
break;
case NGHTTP2_DATA:
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%ld-%d): RST_STREAM by client, errror=%d",
session->id, (int)frame->hd.stream_id,
(int)frame->rst_stream.error_code);
- stream = h2_session_get_stream(session, frame->hd.stream_id);
+ stream = get_stream(session, frame->hd.stream_id);
if (stream && stream->request && stream->request->initiated_on) {
++session->pushes_reset;
}
}
padlen = (unsigned char)frame->data.padlen;
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
APLOGNO(02924)
nghttp2_session_del(session->ngh2);
session->ngh2 = NULL;
}
- if (session->spare) {
- apr_pool_destroy(session->spare);
- session->spare = NULL;
- }
}
static void h2_session_destroy(h2_session *session)
AP_DEBUG_ASSERT(session);
h2_session_cleanup(session);
+ AP_DEBUG_ASSERT(session->open_streams == h2_ihash_count(session->streams));
h2_ihash_clear(session->streams);
+ session->open_streams = 0;
+ ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
+ session->c->input_filters), "H2_IN");
if (APLOGctrace1(session->c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%ld): destroy", session->id);
static int h2_session_resume_streams_with_data(h2_session *session)
{
AP_DEBUG_ASSERT(session);
- if (!h2_ihash_empty(session->streams)
- && session->mplx && !session->mplx->aborted) {
+ if (session->open_streams && !session->mplx->aborted) {
resume_ctx ctx;
-
ctx.session = session;
ctx.resume_count = 0;
return 0;
}
-h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
-{
- return h2_ihash_get(session->streams, stream_id);
-}
-
static ssize_t stream_data_cb(nghttp2_session *ng2s,
int32_t stream_id,
uint8_t *buf,
(void)ng2s;
(void)buf;
(void)source;
- stream = h2_session_get_stream(session, stream_id);
+ stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
APLOGNO(02937)
stream->id, err);
}
- stream->submitted = 1;
+ --session->unanswered_streams;
if (stream->request && stream->request->initiated_on) {
++session->pushes_submitted;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): scheduling push stream",
session->id, stream->id);
- h2_stream_cleanup(stream);
stream = NULL;
}
++session->unsent_promises;
apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
{
- apr_pool_t *pool = h2_stream_detach_pool(stream);
- int stream_id = stream->id;
- int rst_error = stream->rst_error;
-
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_stream(%ld-%d): cleanup by EOS bucket destroy",
- session->id, stream_id);
- if (session->streams) {
- h2_ihash_remove(session->streams, stream_id);
- }
+ session->id, stream->id);
+ h2_ihash_remove(session->streams, stream->id);
+ --session->open_streams;
+ --session->unanswered_streams;
+ h2_mplx_stream_done(session->mplx, stream);
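+ /* ownership passes to the mplx, which tears down input/output
+ * and destroys the stream (see stream_done() in h2_mplx.c) */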
- h2_stream_cleanup(stream);
- h2_mplx_stream_done(session->mplx, stream_id, rst_error);
- h2_stream_destroy(stream);
-
- if (pool) {
- apr_pool_clear(pool);
- if (session->spare) {
- apr_pool_destroy(session->spare);
- }
- session->spare = pool;
- }
-
return APR_SUCCESS;
}
if (has_unsubmitted_streams(session)) {
/* If we have responses ready, submit them now. */
- while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) {
+ while ((stream = h2_mplx_next_submit(session->mplx))) {
status = submit_response(session, stream);
++session->unsent_submits;
apr_snprintf(session->status, sizeof(session->status),
"%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)",
msg? msg : "-",
- (int)h2_ihash_count(session->streams),
+ (int)session->open_streams,
(int)session->remote.emitted_count,
(int)session->responses_submitted,
(int)session->pushes_submitted,
session->state = nstate;
switch (session->state) {
case H2_SESSION_ST_IDLE:
- update_child_status(session, (h2_ihash_empty(session->streams)?
+ update_child_status(session, (session->open_streams == 0?
SERVER_BUSY_KEEPALIVE
: SERVER_BUSY_READ), "idle");
break;
if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
}
- if (h2_ihash_empty(session->streams)) {
+ if (!session->open_streams) {
if (!is_accepting_streams(session)) {
/* We are no longer accepting new streams and have
* finished processing existing ones. Time to leave. */
break;
case H2_SESSION_ST_IDLE:
- /* make certain, the client receives everything before we idle */
- if (!session->keep_sync_until
- && async && h2_ihash_empty(session->streams)
+ /* make certain we send everything before we idle */
+ if (!session->keep_sync_until && async && !session->open_streams
&& !session->r && session->remote.emitted_count) {
ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
"h2_session(%ld): async idle, nonblock read", session->id);
}
}
- if (!h2_ihash_empty(session->streams)) {
- /* resume any streams for which data is available again */
+ if (session->open_streams) {
+ /* resume any streams with output data */
h2_session_resume_streams_with_data(session);
/* Submit any responses/push_promises that are ready */
status = h2_session_submit(session);