return status;
}
-conn_rec *h2_slave_create(conn_rec *master, int slave_id,
- apr_pool_t *parent, apr_allocator_t *allocator)
+conn_rec *h2_slave_create(conn_rec *master, int slave_id, apr_pool_t *parent)
{
+ apr_allocator_t *allocator;
apr_pool_t *pool;
conn_rec *c;
void *cfg;
* independant of its parent pool in the sense that it can work in
* another thread.
*/
- if (!allocator) {
- apr_allocator_create(&allocator);
- }
+ apr_allocator_create(&allocator);
apr_pool_create_ex(&pool, parent, NULL, allocator);
apr_pool_tag(pool, "h2_slave_conn");
apr_allocator_owner_set(allocator, pool);
return c;
}
-void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator)
+void h2_slave_destroy(conn_rec *slave)
{
- apr_pool_t *parent;
- apr_allocator_t *allocator = apr_pool_allocator_get(slave->pool);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave,
"h2_slave_conn(%ld): destroy (task=%s)", slave->id,
apr_table_get(slave->notes, H2_TASK_ID_NOTE));
- /* Attache the allocator to the parent pool and return it for
- * reuse, otherwise the own is still the slave pool and it will
- * get destroyed with it. */
- parent = apr_pool_parent_get(slave->pool);
- if (pallocator && parent) {
- apr_allocator_owner_set(allocator, parent);
- *pallocator = allocator;
- }
apr_pool_destroy(slave->pool);
}
static void h2_mplx_destroy(h2_mplx *m)
{
+ conn_rec **pslave;
ap_assert(m);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): destroy, tasks=%d",
m->id, (int)h2_ihash_count(m->tasks));
check_tx_free(m);
+
+ while (m->spare_slaves->nelts > 0) {
+ pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
+ h2_slave_destroy(*pslave);
+ }
if (m->pool) {
apr_pool_destroy(m->pool);
}
m->q = h2_iq_create(m->pool, m->max_streams);
m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
+ m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
m->stream_timeout = stream_timeout;
m->workers = workers;
}
}
- if (task->output.beam) {
- h2_beam_on_produced(task->output.beam, NULL, NULL);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
- APLOGNO(03385) "h2_task(%s): destroy "
- "output beam empty=%d, holds proxies=%d",
- task->id,
- h2_beam_empty(task->output.beam),
- h2_beam_holds_proxies(task->output.beam));
- }
+ h2_beam_on_produced(task->output.beam, NULL, NULL);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+ APLOGNO(03385) "h2_task(%s): destroy "
+ "output beam empty=%d, holds proxies=%d",
+ task->id,
+ h2_beam_empty(task->output.beam),
+ h2_beam_holds_proxies(task->output.beam));
slave = task->c;
reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
&& !task->rst_error);
h2_ihash_remove(m->tasks, task->stream_id);
- if (m->redo_tasks) {
- h2_ihash_remove(m->redo_tasks, task->stream_id);
- }
+ h2_ihash_remove(m->redo_tasks, task->stream_id);
h2_task_destroy(task);
if (slave) {
}
else {
slave->sbh = NULL;
- h2_slave_destroy(slave, NULL);
+ h2_slave_destroy(slave);
}
}
h2_iq_remove(m->q, stream->id);
h2_ihash_remove(m->sready, stream->id);
h2_ihash_remove(m->streams, stream->id);
- if (stream->input) {
- m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
- h2_beam_on_consumed(stream->input, NULL, NULL);
- /* Let anyone blocked reading know that there is no more to come */
- h2_beam_abort(stream->input);
- /* Remove mutex after, so that abort still finds cond to signal */
- h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
- }
- if (stream->output) {
- m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
- }
+
h2_stream_cleanup(stream);
+ m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
+ h2_beam_on_consumed(stream->input, NULL, NULL);
+ /* Let anyone blocked reading know that there is no more to come */
+ h2_beam_abort(stream->input);
+ /* Remove mutex after, so that abort still finds cond to signal */
+ h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
+ m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
task = h2_ihash_get(m->tasks, stream->id);
if (task) {
}
else {
/* already finished */
- task_destroy(m, task, 0);
+ task_destroy(m, task, 1);
}
}
h2_stream_destroy(stream);
if (task->c) {
task->c->aborted = 1;
}
- if (task->input.beam) {
- h2_beam_abort(task->input.beam);
- }
- if (task->output.beam) {
- h2_beam_abort(task->output.beam);
- }
+ h2_beam_abort(task->input.beam);
+ h2_beam_abort(task->output.beam);
}
return 1;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
"h2_mplx(%s): close", task->id);
- if (task->output.beam) {
- status = h2_beam_close(task->output.beam);
- h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c,
- APLOG_TRACE2);
- }
+ status = h2_beam_close(task->output.beam);
+ h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c,
+ APLOG_TRACE2);
output_consumed_signal(m, task);
have_out_data_for(m, stream, 0);
return status;
slave = *pslave;
}
else {
- slave = h2_slave_create(m->c, stream->id, m->pool, NULL);
+ slave = h2_slave_create(m->c, stream->id, m->pool);
new_conn = 1;
}
m->max_stream_started = sid;
}
- if (stream->input) {
- h2_beam_timeout_set(stream->input, m->stream_timeout);
- h2_beam_on_consumed(stream->input, stream_input_consumed, m);
- h2_beam_on_file_beam(stream->input, can_beam_file, m);
- h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
- }
- if (stream->output) {
- h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
- h2_beam_timeout_set(stream->output, m->stream_timeout);
- }
+ h2_beam_timeout_set(stream->input, m->stream_timeout);
+ h2_beam_on_consumed(stream->input, stream_input_consumed, m);
+ h2_beam_on_file_beam(stream->input, can_beam_file, m);
+ h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
+
+ h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
+ h2_beam_timeout_set(stream->output, m->stream_timeout);
++m->workers_busy;
}
}
if (ngn) {
apr_off_t bytes = 0;
- if (task->output.beam) {
- h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
- bytes += h2_beam_get_buffered(task->output.beam);
- }
+ h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+ bytes += h2_beam_get_buffered(task->output.beam);
if (bytes > 0) {
/* we need to report consumed and current buffered output
* to the engine. The request will be streamed out or cancelled,
}
stream = h2_ihash_get(m->streams, task->stream_id);
- if (!m->aborted && stream && m->redo_tasks
+ if (!m->aborted && stream
&& h2_ihash_get(m->redo_tasks, task->stream_id)) {
/* reset and schedule again */
h2_task_redo(task);
h2_task *task;
int n;
- if (!m->redo_tasks) {
- m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
- }
/* Try to get rid of streams that occupy workers. Look for safe requests
* that are repeatable. If none found, fail the connection.
*/
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_stream(%ld-%d): EOS bucket cleanup -> done",
session->id, stream->id);
- h2_ihash_remove(session->streams, stream->id);
h2_mplx_stream_done(session->mplx, stream);
dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
h2_stream *candidate;
} stream_sel_ctx;
-static int find_cleanup_stream(void *udata, void *sdata)
+static int find_cleanup_stream(h2_stream *stream, void *ictx)
{
- stream_sel_ctx *ctx = udata;
- h2_stream *stream = sdata;
+ stream_sel_ctx *ctx = ictx;
if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
if (!ctx->session->local.accepting
&& stream->id > ctx->session->local.accepted_max) {
ctx.session = session;
ctx.candidate = NULL;
while (1) {
- h2_ihash_iter(session->streams, find_cleanup_stream, &ctx);
+ h2_mplx_stream_do(session->mplx, find_cleanup_stream, &ctx);
if (ctx.candidate) {
h2_session_stream_done(session, ctx.candidate);
ctx.candidate = NULL;
stream = h2_stream_open(stream_id, stream_pool, session,
initiated_on);
nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
- h2_ihash_add(session->streams, stream);
if (req) {
h2_stream_set_request(stream, req);
{
ap_assert(session);
- h2_ihash_clear(session->streams);
if (session->mplx) {
h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
h2_mplx_release_and_join(session->mplx, session->iowait);
return NULL;
}
- session->streams = h2_ihash_create(session->pool,
- offsetof(h2_stream, id));
session->mplx = h2_mplx_create(c, session->pool, session->config,
session->s->timeout, workers);