#include "h2_conn.h"
#include "h2_ctx.h"
#include "h2_h2.h"
-#include "h2_io.h"
#include "h2_response.h"
#include "h2_mplx.h"
#include "h2_ngn_shed.h"
}
}
+/* utility for iterating over ihash task sets */
+typedef struct {
+ h2_mplx *m;
+ h2_task *task;
+ apr_time_t now;
+} task_iter_ctx;
+
/* NULL or the mutex hold by this thread, used for recursive calls
*/
static apr_threadkey_t *thread_lock;
static void stream_output_consumed(void *ctx,
h2_bucket_beam *beam, apr_off_t length)
{
- h2_io *io = ctx;
- if (length > 0 && io->task && io->task->assigned) {
- h2_req_engine_out_consumed(io->task->assigned, io->task->c, length);
+ h2_task *task = ctx;
+ if (length > 0 && task && task->assigned) {
+ h2_req_engine_out_consumed(task->assigned, task->c, length);
}
}
{
if (m->tx_handles_reserved <= 0) {
m->tx_handles_reserved += h2_workers_tx_reserve(m->workers,
- H2MIN(m->tx_chunk_size, h2_ilist_count(m->stream_ios)));
+ H2MIN(m->tx_chunk_size, h2_ihash_count(m->tasks)));
}
}
m->tx_handles_reserved = m->tx_chunk_size;
h2_workers_tx_free(m->workers, count);
}
- else if (m->tx_handles_reserved
- && (!m->stream_ios || h2_ilist_empty(m->stream_ios))) {
+ else if (m->tx_handles_reserved && h2_ihash_empty(m->tasks)) {
h2_workers_tx_free(m->workers, m->tx_handles_reserved);
m->tx_handles_reserved = 0;
}
{
AP_DEBUG_ASSERT(m);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): destroy, ios=%d",
- m->id, (int)h2_ilist_count(m->stream_ios));
+ "h2_mplx(%ld): destroy, tasks=%d",
+ m->id, (int)h2_ihash_count(m->tasks));
check_tx_free(m);
if (m->pool) {
apr_pool_destroy(m->pool);
m->bucket_alloc = apr_bucket_alloc_create(m->pool);
m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
+
+ m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
- m->stream_ios = h2_ilist_create(m->pool);
- m->ready_ios = h2_ilist_create(m->pool);
+ m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
+ m->ready_tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
+
m->stream_timeout = stream_timeout;
m->workers = workers;
m->workers_max = workers->max_workers;
return max_stream_started;
}
-static void io_in_consumed_signal(h2_mplx *m, h2_io *io)
+static void input_consumed_signal(h2_mplx *m, h2_task *task)
{
- if (io->beam_in && io->worker_started) {
- h2_beam_send(io->beam_in, NULL, 0); /* trigger updates */
+ if (task->input.beam && task->worker_started) {
+ h2_beam_send(task->input.beam, NULL, 0); /* trigger updates */
}
}
-static int io_out_consumed_signal(h2_mplx *m, h2_io *io)
+static int output_consumed_signal(h2_mplx *m, h2_task *task)
{
- if (io->beam_out && io->worker_started && io->task && io->task->assigned) {
- h2_beam_send(io->beam_out, NULL, 0); /* trigger updates */
+ if (task->output.beam && task->worker_started && task->assigned) {
+ h2_beam_send(task->output.beam, NULL, 0); /* trigger updates */
}
return 0;
}
-static void io_destroy(h2_mplx *m, h2_io *io, int events)
+static void task_destroy(h2_mplx *m, h2_task *task, int events)
{
conn_rec *slave = NULL;
- int reuse_slave;
+ int reuse_slave = 0;
/* cleanup any buffered input */
- h2_io_shutdown(io);
+ h2_task_shutdown(task);
if (events) {
/* Process outstanding events before destruction */
- io_in_consumed_signal(m, io);
+ input_consumed_signal(m, task);
}
/* The pool is cleared/destroyed which also closes all
* allocated file handles. Give this count back to our
* file handle pool. */
- if (io->beam_in) {
- m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_in);
+ if (task->input.beam) {
+ m->tx_handles_reserved +=
+ h2_beam_get_files_beamed(task->input.beam);
}
- if (io->beam_out) {
- m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_out);
+ if (task->output.beam) {
+ m->tx_handles_reserved +=
+ h2_beam_get_files_beamed(task->output.beam);
}
-
- h2_ilist_remove(m->stream_ios, io->id);
- h2_ilist_remove(m->ready_ios, io->id);
- if (m->redo_ios) {
- h2_ilist_remove(m->redo_ios, io->id);
- }
-
+
+ slave = task->c;
reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
- && !io->rst_error);
- if (io->task) {
- slave = io->task->c;
- h2_task_destroy(io->task);
- io->task = NULL;
- }
-
- if (io->pool) {
- if (m->spare_io_pool) {
- apr_pool_destroy(m->spare_io_pool);
- }
- apr_pool_clear(io->pool);
- m->spare_io_pool = io->pool;
+ && !task->rst_error);
+
+ h2_ihash_remove(m->tasks, task->stream_id);
+ h2_ihash_remove(m->ready_tasks, task->stream_id);
+ if (m->redo_tasks) {
+ h2_ihash_remove(m->redo_tasks, task->stream_id);
}
+ h2_task_destroy(task);
if (slave) {
if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
check_tx_free(m);
}
-static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error)
+static int task_stream_done(h2_mplx *m, h2_task *task, int rst_error)
{
/* Remove io from ready set, we will never submit it */
- h2_ilist_remove(m->ready_ios, io->id);
- if (!io->worker_started || io->worker_done) {
+ h2_ihash_remove(m->ready_tasks, task->stream_id);
+ if (task->worker_done) {
/* already finished or not even started yet */
- h2_iq_remove(m->q, io->id);
- io_destroy(m, io, 1);
+ h2_iq_remove(m->q, task->stream_id);
+ task_destroy(m, task, 1);
return 0;
}
else {
/* cleanup once task is done */
- io->orphaned = 1;
+ task->orphaned = 1;
+ if (task->input.beam) {
+ /* TODO: this is currently allocated by the stream and will disappear */
+ h2_beam_shutdown(task->input.beam);
+ task->input.beam = NULL;
+ }
if (rst_error) {
- h2_io_rst(io, rst_error);
+ h2_task_rst(task, rst_error);
}
return 1;
}
static int stream_done_iter(void *ctx, void *val)
{
- return io_stream_done((h2_mplx*)ctx, val, 0);
+ return task_stream_done((h2_mplx*)ctx, val, 0);
}
-static int stream_print(void *ctx, void *val)
+static int task_print(void *ctx, void *val)
{
h2_mplx *m = ctx;
- h2_io *io = val;
- if (io && io->request) {
+ h2_task *task = val;
+ if (task->request) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
- "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d"
+ "->03198: h2_stream(%s): %s %s %s -> %s %d"
"[orph=%d/started=%d/done=%d]",
- m->id, io->id,
- io->request->method, io->request->authority, io->request->path,
- io->response? "http" : (io->rst_error? "reset" : "?"),
- io->response? io->response->http_status : io->rst_error,
- io->orphaned, io->worker_started, io->worker_done);
+ task->id, task->request->method,
+ task->request->authority, task->request->path,
+ task->response? "http" : (task->rst_error? "reset" : "?"),
+ task->response? task->response->http_status : task->rst_error,
+ task->orphaned, task->worker_started,
+ task->worker_done);
}
- else if (io) {
+ else if (task) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
- "->03198: h2_stream(%ld-%d): NULL -> %s %d"
- "[orph=%d/started=%d/done=%d]",
- m->id, io->id,
- io->response? "http" : (io->rst_error? "reset" : "?"),
- io->response? io->response->http_status : io->rst_error,
- io->orphaned, io->worker_started, io->worker_done);
+ "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
h2_iq_clear(m->q);
apr_thread_cond_broadcast(m->task_thawed);
- while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) {
+ while (!h2_ihash_iter(m->tasks, stream_done_iter, m)) {
/* iterate until all ios have been orphaned or destroyed */
}
for (i = 0; m->workers_busy > 0; ++i) {
m->join_wait = wait;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): release_join, waiting on %d worker to report back",
- m->id, (int)h2_ilist_count(m->stream_ios));
+ "h2_mplx(%ld): release_join, waiting on %d tasks to report back",
+ m->id, (int)h2_ihash_count(m->tasks));
status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
- while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) {
+ while (!h2_ihash_iter(m->tasks, stream_done_iter, m)) {
/* iterate until all ios have been orphaned or destroyed */
}
if (APR_STATUS_IS_TIMEUP(status)) {
*/
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
"h2_mplx(%ld): release, waiting for %d seconds now for "
- "%d h2_workers to return, have still %d requests outstanding",
+ "%d h2_workers to return, have still %d tasks outstanding",
m->id, i*wait_secs, m->workers_busy,
- (int)h2_ilist_count(m->stream_ios));
+ (int)h2_ihash_count(m->tasks));
if (i == 1) {
- h2_ilist_iter(m->stream_ios, stream_print, m);
+ h2_ihash_iter(m->tasks, task_print, m);
}
}
h2_mplx_abort(m);
}
}
- if (!h2_ilist_empty(m->stream_ios)) {
+ if (!h2_ihash_empty(m->tasks)) {
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c,
- "h2_mplx(%ld): release_join, %d streams still open",
- m->id, (int)h2_ilist_count(m->stream_ios));
+ "h2_mplx(%ld): release_join, %d tasks still open",
+ m->id, (int)h2_ihash_count(m->tasks));
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): release_join -> destroy", m->id);
*/
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
+ h2_task *task = h2_ihash_get(m->tasks, stream_id);
+ h2_ihash_remove(m->streams, stream_id);
/* there should be an h2_io, once the stream has been scheduled
* for processing, e.g. when we received all HEADERs. But when
* a stream is cancelled very early, it will not exist. */
- if (io) {
+ if (task) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld-%d): marking stream as done.",
+ "h2_mplx(%ld-%d): marking stream task as done.",
m->id, stream_id);
- io_stream_done(m, io, rst_error);
+ task_stream_done(m, task, rst_error);
}
leave_mutex(m, acquired);
}
static int update_window(void *ctx, void *val)
{
h2_mplx *m = ctx;
- io_in_consumed_signal(m, val);
+ input_consumed_signal(m, val);
return 1;
}
return APR_ECONNABORTED;
}
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_ilist_iter(m->stream_ios, update_window, m);
+ h2_ihash_iter(m->tasks, update_window, m);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_session(%ld): windows updated", m->id);
return status;
}
+static int task_iter_first(void *ctx, void *val)
+{
+ task_iter_ctx *tctx = ctx;
+ h2_task *task = val;
+ tctx->task = task;
+ return 0;
+}
+
h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_ilist_shift(m->ready_ios);
- if (io && !m->aborted) {
- stream = h2_ihash_get(streams, io->id);
- if (stream) {
- io->submitted = 1;
- if (io->rst_error) {
- h2_stream_rst(stream, io->rst_error);
+ task_iter_ctx ctx;
+ ctx.m = m;
+ ctx.task = NULL;
+ h2_ihash_iter(m->ready_tasks, task_iter_first, &ctx);
+
+ if (ctx.task && !m->aborted) {
+ h2_task *task = ctx.task;
+
+ h2_ihash_remove(m->ready_tasks, task->stream_id);
+ stream = h2_ihash_get(streams, task->stream_id);
+ if (stream && task) {
+ task->submitted = 1;
+ if (task->rst_error) {
+ h2_stream_rst(stream, task->rst_error);
}
else {
- AP_DEBUG_ASSERT(io->response);
- h2_stream_set_response(stream, io->response, io->beam_out);
+ AP_DEBUG_ASSERT(task->response);
+ h2_stream_set_response(stream, task->response,
+ task->output.beam);
}
}
- else {
+ else if (task) {
/* We have the io ready, but the stream has gone away, maybe
* reset by the client. Should no longer happen since such
* streams should clear io's from the ready queue.
*/
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03347)
- "h2_mplx(%ld): stream for response %d closed, "
+ "h2_mplx(%s): stream for response closed, "
"resetting io to close request processing",
- m->id, io->id);
- io->orphaned = 1;
- h2_io_rst(io, H2_ERR_STREAM_CLOSED);
- if (!io->worker_started || io->worker_done) {
- io_destroy(m, io, 1);
+ task->id);
+ task->orphaned = 1;
+ h2_task_rst(task, H2_ERR_STREAM_CLOSED);
+ if (!task->worker_started || task->worker_done) {
+ task_destroy(m, task, 1);
}
else {
/* hang around until the h2_task is done, but
* shutdown input/output and send out any events asap. */
- h2_io_shutdown(io);
- io_in_consumed_signal(m, io);
+ h2_task_shutdown(task);
+ input_consumed_signal(m, task);
}
}
}
return stream;
}
-static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
- h2_bucket_beam *output)
+static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
{
apr_status_t status = APR_SUCCESS;
+ h2_task *task = h2_ihash_get(m->tasks, stream_id);
- h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
- if (!io || io->orphaned) {
+ if (!task || task->orphaned) {
return APR_ECONNABORTED;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld-%d): open response: %d, rst=%d",
- m->id, stream_id, response->http_status,
- response->rst_error);
+ "h2_mplx(%s): open response: %d, rst=%d",
+ task->id, response->http_status, response->rst_error);
+
+ h2_task_set_response(task, response);
- if (output) {
- h2_beam_buffer_size_set(output, m->stream_max_mem);
- h2_beam_timeout_set(output, m->stream_timeout);
- h2_beam_on_consumed(output, stream_output_consumed, io);
- m->tx_handles_reserved -= h2_beam_get_files_beamed(output);
- h2_beam_on_file_beam(output, can_beam_file, m);
- h2_beam_mutex_set(output, io_mutex_enter, io_mutex_leave,
- io->task->cond, m);
+ if (task->output.beam) {
+ h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
+ h2_beam_timeout_set(task->output.beam, m->stream_timeout);
+ h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
+ m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
+ h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
+ h2_beam_mutex_set(task->output.beam, io_mutex_enter, io_mutex_leave,
+ task->cond, m);
}
- h2_io_set_response(io, response, output);
- h2_ilist_add(m->ready_ios, io);
+ h2_ihash_add(m->ready_tasks, task);
if (response && response->http_status < 300) {
/* we might see some file buckets in the output, see
* if we have enough handles reserved. */
return status;
}
-apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
- h2_bucket_beam *output)
+apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
{
apr_status_t status;
int acquired;
status = APR_ECONNABORTED;
}
else {
- status = out_open(m, stream_id, response, output);
+ status = out_open(m, stream_id, response);
}
leave_mutex(m, acquired);
}
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- if (!io->response && !io->rst_error) {
+ h2_task *task = h2_ihash_get(m->tasks, stream_id);
+ if (task && !task->orphaned) {
+ if (!task->response && !task->rst_error) {
/* In case a close comes before a response was created,
* insert an error one so that our streams can properly
* reset.
*/
h2_response *r = h2_response_die(stream_id, APR_EGENERAL,
- io->request, m->pool);
- status = out_open(m, stream_id, r, NULL);
+ task->request, m->pool);
+ status = out_open(m, stream_id, r);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
"h2_mplx(%ld-%d): close, no response, no rst",
- m->id, io->id);
+ m->id, stream_id);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_mplx(%ld-%d): close", m->id, io->id);
- if (io->beam_out) {
- status = h2_beam_close(io->beam_out);
- h2_beam_log(io->beam_out, stream_id, "out_close", m->c,
+ "h2_mplx(%ld-%d): close", m->id, stream_id);
+ if (task->output.beam) {
+ status = h2_beam_close(task->output.beam);
+ h2_beam_log(task->output.beam, stream_id, "out_close", m->c,
APLOG_TRACE2);
}
- io_out_consumed_signal(m, io);
+ output_consumed_signal(m, task);
have_out_data_for(m, stream_id);
}
else {
status = APR_ECONNABORTED;
}
else {
- apr_pool_t *io_pool;
- h2_io *io;
+ h2_beam_create(&stream->input, stream->pool, stream->id,
+ "input", 0);
+ h2_ihash_add(m->streams, stream);
if (!m->need_registration) {
m->need_registration = h2_iq_empty(m->q);
if (m->workers_busy < m->workers_max) {
do_registration = m->need_registration;
}
+ h2_iq_add(m->q, stream->id, cmp, ctx);
- io_pool = m->spare_io_pool;
- if (io_pool) {
- m->spare_io_pool = NULL;
- }
- else {
- apr_pool_create(&io_pool, m->pool);
- apr_pool_tag(io_pool, "h2_io");
- }
- io = h2_io_create(stream->id, io_pool, stream->request);
- h2_ilist_add(m->stream_ios, io);
- h2_iq_add(m->q, io->id, cmp, ctx);
-
- stream->input = io->beam_in;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
"h2_mplx(%ld-%d): process, body=%d",
- m->c->id, stream->id, io->request->body);
+ m->c->id, stream->id, stream->request->body);
}
leave_mutex(m, acquired);
}
static h2_task *pop_task(h2_mplx *m)
{
h2_task *task = NULL;
- h2_io *io;
+ h2_stream *stream;
int sid;
while (!m->aborted && !task && (m->workers_busy < m->workers_limit)
&& (sid = h2_iq_shift(m->q)) > 0) {
- io = h2_ilist_get(m->stream_ios, sid);
- if (io) {
+ stream = h2_ihash_get(m->streams, sid);
+ if (stream) {
conn_rec *slave, **pslave;
-
- if (io->orphaned) {
- /* TODO: add to purge list */
- io_destroy(m, io, 0);
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
- continue;
- }
-
+
pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
if (pslave) {
slave = *pslave;
}
slave->sbh = m->c->sbh;
- io->task = task = h2_task_create(slave, io->request,
- io->beam_in, m);
+ task = h2_task_create(slave, stream->request, stream->input, m);
+ h2_ihash_add(m->tasks, task);
+
m->c->keepalives++;
apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
- io->worker_started = 1;
- io->started_at = apr_time_now();
+ task->worker_started = 1;
+ task->started_at = apr_time_now();
- if (io->beam_in) {
- h2_beam_timeout_set(io->beam_in, m->stream_timeout);
- h2_beam_on_consumed(io->beam_in, stream_input_consumed, m);
- h2_beam_on_file_beam(io->beam_in, can_beam_file, m);
- h2_beam_mutex_set(io->beam_in, io_mutex_enter,
+ if (task->input.beam) {
+ h2_beam_timeout_set(task->input.beam, m->stream_timeout);
+ h2_beam_on_consumed(task->input.beam, stream_input_consumed, m);
+ h2_beam_on_file_beam(task->input.beam, can_beam_file, m);
+ h2_beam_mutex_set(task->input.beam, io_mutex_enter,
io_mutex_leave, task->cond, m);
}
if (sid > m->max_stream_started) {
static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
{
if (task) {
- h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id);
-
if (task->frozen) {
/* this task was handed over to an engine for processing
* and the original worker has finished. That means the
apr_thread_cond_broadcast(m->task_thawed);
}
else {
+ apr_time_t now = apr_time_now();
+
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): task(%s) done", m->id, task->id);
/* clean our references and report request as done. Signal
* other mplx's. Perhaps leave after n requests? */
h2_mplx_out_close(m, task->stream_id);
- if (ngn && io) {
+ if (ngn) {
apr_off_t bytes = 0;
- if (io->beam_out) {
- h2_beam_send(io->beam_out, NULL, APR_NONBLOCK_READ);
- bytes += h2_beam_get_buffered(io->beam_out);
+ if (task->output.beam) {
+ h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+ bytes += h2_beam_get_buffered(task->output.beam);
}
if (bytes > 0) {
/* we need to report consumed and current buffered output
h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
}
- if (io) {
- apr_time_t now = apr_time_now();
- if (!io->orphaned && m->redo_ios
- && h2_ilist_get(m->redo_ios, io->id)) {
- /* reset and schedule again */
- h2_io_redo(io);
- h2_ilist_remove(m->redo_ios, io->id);
- h2_iq_add(m->q, io->id, NULL, NULL);
- }
- else {
- io->worker_done = 1;
- io->done_at = now;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): request(%d) done, %f ms"
- " elapsed", m->id, io->id,
- (io->done_at - io->started_at) / 1000.0);
- if (io->started_at > m->last_idle_block) {
- /* this task finished without causing an 'idle block', e.g.
- * a block by flow control.
- */
- if (now - m->last_limit_change >= m->limit_change_interval
- && m->workers_limit < m->workers_max) {
- /* Well behaving stream, allow it more workers */
- m->workers_limit = H2MIN(m->workers_limit * 2,
- m->workers_max);
- m->last_limit_change = now;
- m->need_registration = 1;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): increase worker limit to %d",
- m->id, m->workers_limit);
- }
- }
- }
-
- if (io->orphaned) {
- /* TODO: add to purge list */
- io_destroy(m, io, 0);
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
+ if (!task->orphaned && m->redo_tasks
+ && h2_ihash_get(m->redo_tasks, task->stream_id)) {
+ /* reset and schedule again */
+ h2_task_redo(task);
+ h2_ihash_remove(m->redo_tasks, task->stream_id);
+ h2_iq_add(m->q, task->stream_id, NULL, NULL);
+ }
+ else {
+ task->worker_done = 1;
+ task->done_at = now;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%s): request done, %f ms"
+ " elapsed", task->id,
+ (task->done_at - task->started_at) / 1000.0);
+ if (task->started_at > m->last_idle_block) {
+ /* this task finished without causing an 'idle block', e.g.
+ * a block by flow control.
+ */
+ if (now - m->last_limit_change >= m->limit_change_interval
+ && m->workers_limit < m->workers_max) {
+ /* Well behaving stream, allow it more workers */
+ m->workers_limit = H2MIN(m->workers_limit * 2,
+ m->workers_max);
+ m->last_limit_change = now;
+ m->need_registration = 1;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): increase worker limit to %d",
+ m->id, m->workers_limit);
}
}
- else {
- /* hang around until the stream deregisters */
+ }
+
+ if (task->orphaned) {
+ /* TODO: add to purge list */
+ task_destroy(m, task, 0);
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
}
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
- "h2_mplx(%ld): task %s without corresp. h2_io",
- m->id, task->id);
+ /* hang around until the stream deregisters */
}
}
}
* h2_mplx DoS protection
******************************************************************************/
-typedef struct {
- h2_mplx *m;
- h2_io *io;
- apr_time_t now;
-} io_iter_ctx;
-
-static int latest_repeatable_busy_unsubmitted_iter(void *data, void *val)
+static int latest_repeatable_unsubmitted_iter(void *data, void *val)
{
- io_iter_ctx *ctx = data;
- h2_io *io = val;
- if (io->worker_started && !io->worker_done
- && h2_io_can_redo(io) && !h2_ilist_get(ctx->m->redo_ios, io->id)) {
- /* this io occupies a worker, the response has not been submitted yet,
+ task_iter_ctx *ctx = data;
+ h2_task *task = val;
+ if (!task->worker_done && h2_task_can_redo(task)
+ && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) {
+ /* this task occupies a worker, the response has not been submitted yet,
* not been cancelled and it is a repeatable request
* -> it can be re-scheduled later */
- if (!ctx->io || ctx->io->started_at < io->started_at) {
+ if (!ctx->task || ctx->task->started_at < task->started_at) {
/* we did not have one or this one was started later */
- ctx->io = io;
+ ctx->task = task;
}
}
return 1;
}
-static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m)
+static h2_task *get_latest_repeatable_unsubmitted_task(h2_mplx *m)
{
- io_iter_ctx ctx;
+ task_iter_ctx ctx;
ctx.m = m;
- ctx.io = NULL;
- h2_ilist_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx);
- return ctx.io;
+ ctx.task = NULL;
+ h2_ihash_iter(m->tasks, latest_repeatable_unsubmitted_iter, &ctx);
+ return ctx.task;
}
static int timed_out_busy_iter(void *data, void *val)
{
- io_iter_ctx *ctx = data;
- h2_io *io = val;
- if (io->worker_started && !io->worker_done
- && (ctx->now - io->started_at) > ctx->m->stream_timeout) {
+ task_iter_ctx *ctx = data;
+ h2_task *task = val;
+ if (!task->worker_done
+ && (ctx->now - task->started_at) > ctx->m->stream_timeout) {
/* timed out stream occupying a worker, found */
- ctx->io = io;
+ ctx->task = task;
return 0;
}
return 1;
}
-static h2_io *get_timed_out_busy_stream(h2_mplx *m)
+
+static h2_task *get_timed_out_busy_task(h2_mplx *m)
{
- io_iter_ctx ctx;
+ task_iter_ctx ctx;
ctx.m = m;
- ctx.io = NULL;
+ ctx.task = NULL;
ctx.now = apr_time_now();
- h2_ilist_iter(m->stream_ios, timed_out_busy_iter, &ctx);
- return ctx.io;
+ h2_ihash_iter(m->tasks, timed_out_busy_iter, &ctx);
+ return ctx.task;
}
-static apr_status_t unschedule_slow_ios(h2_mplx *m)
+static apr_status_t unschedule_slow_tasks(h2_mplx *m)
{
- h2_io *io;
+ h2_task *task;
int n;
- if (!m->redo_ios) {
- m->redo_ios = h2_ilist_create(m->pool);
+ if (!m->redo_tasks) {
+ m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
}
/* Try to get rid of streams that occupy workers. Look for safe requests
* that are repeatable. If none found, fail the connection.
*/
- n = (m->workers_busy - m->workers_limit - h2_ilist_count(m->redo_ios));
- while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) {
- h2_ilist_add(m->redo_ios, io);
- h2_io_rst(io, H2_ERR_CANCEL);
+ n = (m->workers_busy - m->workers_limit - h2_ihash_count(m->redo_tasks));
+ while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) {
+ h2_task_rst(task, H2_ERR_CANCEL);
+ h2_ihash_add(m->redo_tasks, task);
--n;
}
- if ((m->workers_busy - h2_ilist_count(m->redo_ios)) > m->workers_limit) {
- io = get_timed_out_busy_stream(m);
- if (io) {
+ if ((m->workers_busy - h2_ihash_count(m->redo_tasks)) > m->workers_limit) {
+ task = get_timed_out_busy_task(m);
+ if (task) {
/* Too many busy workers, unable to cancel enough streams
* and with a busy, timed out stream, we tell the client
* to go away... */
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- apr_size_t scount = h2_ilist_count(m->stream_ios);
+ apr_size_t scount = h2_ihash_count(m->streams);
if (scount > 0 && m->workers_busy) {
/* If we have streams in connection state 'IDLE', meaning
* all streams are ready to sent data out, but lack
}
if (m->workers_busy > m->workers_limit) {
- status = unschedule_slow_ios(m);
+ status = unschedule_slow_tasks(m);
}
}
leave_mutex(m, acquired);
static int ngn_update_window(void *ctx, void *val)
{
ngn_update_ctx *uctx = ctx;
- h2_io *io = val;
- if (io && io->task && io->task->assigned == uctx->ngn
- && io_out_consumed_signal(uctx->m, io)) {
+ h2_task *task = val;
+ if (task && task->assigned == uctx->ngn
+ && output_consumed_signal(uctx->m, task)) {
++uctx->streams_updated;
}
return 1;
ctx.m = m;
ctx.ngn = ngn;
ctx.streams_updated = 0;
- h2_ilist_iter(m->stream_ios, ngn_update_window, &ctx);
+ h2_ihash_iter(m->tasks, ngn_update_window, &ctx);
return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
}
task->r = r;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id);
- if (!io || io->orphaned) {
+ if (task->orphaned) {
status = APR_ECONNABORTED;
}
else {