c->clogging_input_filters = 1;
c->log = NULL;
c->log_id = NULL;
- c->keepalives = 0;
+ /* Simulate that we had already a request on this connection. */
+ c->keepalives = 1;
/* We cannot install the master connection socket on the slaves, as
* modules mess with timeouts/blocking of the socket, with
* unwanted side effects to the master connection processing.
apr_status_t h2_slave_run_pre_connection(conn_rec *slave, apr_socket_t *csd)
{
- /* We always start slaves with 1 */
- slave->keepalives = 1;
return ap_run_pre_connection(slave, csd);
}
-apr_status_t h2_slave_needs_pre_run(conn_rec *slave)
-{
- return slave->keepalives == 0;
-}
-
apr_allocator_t *allocator);
void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator);
-apr_status_t h2_slave_needs_pre_run(conn_rec *slave);
apr_status_t h2_slave_run_pre_connection(conn_rec *slave, apr_socket_t *csd);
void h2_slave_run_connection(conn_rec *slave);
apr_pool_t *pool; /* stream pool */
apr_bucket_alloc_t *bucket_alloc;
- struct h2_request *request; /* request on this io */
+ const struct h2_request *request;/* request on this io */
struct h2_response *response; /* response to request */
int rst_error; /* h2 related stream abort error */
}
if (io->task) {
- conn_rec *slave = h2_task_detach(io->task);
+ conn_rec *slave = io->task->c;
h2_task_destroy(io->task);
io->task = NULL;
- if (slave) {
- if (m->spare_slaves->nelts < m->spare_slaves->nalloc) {
- APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
- }
- else {
- h2_slave_destroy(slave, NULL);
- }
+
+ if (m->spare_slaves->nelts < m->spare_slaves->nalloc) {
+ APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
+ }
+ else {
+ h2_slave_destroy(slave, NULL);
}
}
return status;
}
-static h2_request *pop_request(h2_mplx *m)
+static h2_task *pop_task(h2_mplx *m)
{
- h2_request *req = NULL;
- int stream_id;
-
- while (!m->aborted && !req
+ h2_task *task = NULL;
+ int sid;
+ while (!m->aborted && !task
&& (m->workers_busy < m->workers_limit)
- && (stream_id = h2_iq_shift(m->q)) > 0) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ && (sid = h2_iq_shift(m->q)) > 0) {
+ h2_io *io = h2_io_set_get(m->stream_ios, sid);
if (io && io->orphaned) {
io_destroy(m, io, 0);
if (m->join_wait) {
}
}
else if (io) {
- req = io->request;
- io->started_at = apr_time_now();
- if (stream_id > m->max_stream_started) {
- m->max_stream_started = stream_id;
+ conn_rec *slave, **pslave;
+
+ pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
+ if (pslave) {
+ slave = *pslave;
+ }
+ else {
+ slave = h2_slave_create(m->c, m->pool, NULL);
+ h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
}
+
+
+ io->task = task = h2_task_create(m->id, io->request, slave, m);
+ apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
+
io->worker_started = 1;
+ io->started_at = apr_time_now();
+ if (sid > m->max_stream_started) {
+ m->max_stream_started = sid;
+ }
++m->workers_busy;
}
}
- return req;
+ return task;
}
-static conn_rec *get_slave(h2_mplx *m)
+h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
{
- conn_rec **pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
- if (pslave) {
- return *pslave;
- }
- else {
- return h2_slave_create(m->c, m->pool, NULL);
- }
-}
-
-conn_rec *h2_mplx_get_slave(h2_mplx *m)
-{
- conn_rec *slave = NULL;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- slave = get_slave(m);
- leave_mutex(m, acquired);
- }
- return slave;
-}
-
-h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more)
-{
- h2_request *req = NULL;
+ h2_task *task = NULL;
apr_status_t status;
int acquired;
*has_more = 0;
}
else {
- req = pop_request(m);
+ task = pop_task(m);
*has_more = !h2_iq_empty(m->q);
}
- if (!req && has_more) {
+ if (has_more && !task) {
m->need_registration = 1;
}
leave_mutex(m, acquired);
}
- return req;
+ return task;
}
static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
apr_thread_cond_broadcast(m->task_thawed);
}
else {
- h2_io *io = h2_io_set_get(m->stream_ios, task->request->id);
+ h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): task(%s) done", m->id, task->id);
/* TODO: this will keep a worker attached to this h2_mplx as
* long as it has requests to handle. Might no be fair to
* other mplx's. Perhaps leave after n requests? */
- h2_mplx_out_close(m, task->request->id, NULL);
+ h2_mplx_out_close(m, task->stream_id, NULL);
if (ngn && io) {
apr_off_t bytes = io->output_consumed + h2_io_out_length(io);
}
}
-void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_request **preq)
+void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
{
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
task_done(m, task, NULL);
--m->workers_busy;
- if (preq) {
+ if (ptask) {
/* caller wants another task */
- *preq = pop_request(m);
+ *ptask = pop_task(m);
}
leave_mutex(m, acquired);
}
task->r = r;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, task->request->id);
+ h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
if (!io || io->orphaned) {
status = APR_ECONNABORTED;
}
else {
- io->task = task;
status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
}
leave_mutex(m, acquired);
*/
void h2_mplx_abort(h2_mplx *mplx);
-struct h2_request *h2_mplx_pop_request(h2_mplx *mplx, int *has_more);
+struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more);
-void h2_mplx_task_done(h2_mplx *m, struct h2_task *task,
- struct h2_request **prequest);
+void h2_mplx_task_done(h2_mplx *m, struct h2_task *task, struct h2_task **ptask);
/**
* Get the highest stream identifier that has been passed on to processing.
*/
int h2_mplx_get_max_stream_started(h2_mplx *m);
-conn_rec *h2_mplx_get_slave(h2_mplx *m);
-
/*******************************************************************************
* IO lifetime of streams.
******************************************************************************/
return APR_SUCCESS;
}
-h2_task *h2_task_create(apr_pool_t *pool, const h2_request *req, h2_mplx *mplx)
+h2_task *h2_task_create(long session_id, const h2_request *req,
+ conn_rec *c, h2_mplx *mplx)
{
- h2_task *task = apr_pcalloc(pool, sizeof(h2_task));
+ apr_pool_t *pool;
+ h2_task *task;
+
+ apr_pool_create(&pool, c->pool);
+ task = apr_pcalloc(pool, sizeof(h2_task));
if (task == NULL) {
- ap_log_perror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, pool,
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, c,
APLOGNO(02941) "h2_task(%ld-%d): create stream task",
- mplx->id, req->id);
+ session_id, req->id);
h2_mplx_out_close(mplx, req->id, NULL);
return NULL;
}
- task->id = apr_psprintf(pool, "%ld-%d", mplx->id, req->id);
- task->pool = pool;
+
+ task->id = apr_psprintf(pool, "%ld-%d", session_id, req->id);
+ task->stream_id = req->id;
+ task->c = c;
task->mplx = mplx;
+ task->pool = pool;
task->request = req;
task->input_eos = !req->body;
task->ser_headers = req->serialize;
task->blocking = 1;
- return task;
-}
-conn_rec *h2_task_detach(h2_task *task)
-{
- conn_rec *c = task->c;
- if (c) {
- task->c = NULL;
- ap_remove_input_filter_byhandle(c->output_filters, "H2_TO_H1");
- ap_remove_output_filter_byhandle(c->output_filters, "H1_TO_H2");
- apr_table_setn(c->notes, H2_TASK_ID_NOTE, NULL);
- }
- return c;
+ h2_ctx_create_for(c, task);
+ /* Add our own, network level in- and output filters. */
+ ap_add_input_filter("H2_TO_H1", task, NULL, c);
+ ap_add_output_filter("H1_TO_H2", task, NULL, c);
+
+ return task;
}
void h2_task_destroy(h2_task *task)
{
- h2_task_detach(task);
+ ap_remove_input_filter_byhandle(task->c->input_filters, "H2_TO_H1");
+ ap_remove_output_filter_byhandle(task->c->output_filters, "H1_TO_H2");
if (task->pool) {
apr_pool_destroy(task->pool);
- /* memory gone */
- }
-}
-
-void h2_task_attach(h2_task *task, conn_rec *c)
-{
- if (task->c) {
- h2_task_detach(task);
}
- task->c = c;
- h2_ctx_create_for(c, task);
- apr_table_setn(c->notes, H2_TASK_ID_NOTE, task->id);
- ap_add_input_filter("H2_TO_H1", task, NULL, c);
- ap_add_output_filter("H1_TO_H2", task, NULL, c);
}
void h2_task_set_io_blocking(h2_task *task, int blocking)
struct h2_task {
const char *id;
- apr_pool_t *pool;
+ int stream_id;
+ conn_rec *c;
struct h2_mplx *mplx;
+ apr_pool_t *pool;
const struct h2_request *request;
- conn_rec *c;
unsigned int filters_set : 1;
unsigned int input_eos : 1;
request_rec *r; /* request being processed in this task */
};
-h2_task *h2_task_create(apr_pool_t *pool, const struct h2_request *req,
- struct h2_mplx *mplx);
+h2_task *h2_task_create(long session_id, const struct h2_request *req,
+ conn_rec *c, struct h2_mplx *mplx);
void h2_task_destroy(h2_task *task);
-/**
- * Attach the task to the given connection, install filter etc.
- */
-void h2_task_attach(h2_task *task, conn_rec *c);
-/**
- * Remove any attachments to the connection again, if still attached.
- * Return the connection or NULL if none was attached.
- */
-conn_rec *h2_task_detach(h2_task *task);
-
apr_status_t h2_task_do(h2_task *task, struct apr_thread_cond_t *cond);
void h2_task_register_hooks(void);
apr_status_t status = APR_SUCCESS;
apr_off_t bblen = 0;
- AP_DEBUG_ASSERT(input);
- AP_DEBUG_ASSERT(input->task);
- AP_DEBUG_ASSERT(f->c);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
"h2_task_input(%s): read, block=%d, mode=%d, readbytes=%ld",
input->task->id, block, mode, (long)readbytes);
* setting.
*/
status = h2_mplx_in_read(input->task->mplx, block,
- input->task->request->id, input->bb,
+ input->task->stream_id, input->bb,
f->r? f->r->trailers_in : NULL,
input->task->io);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
h2_task_output *output = apr_pcalloc(task->pool, sizeof(h2_task_output));
if (output) {
output->task = task;
- output->from_h1 = h2_from_h1_create(task->request->id, task->pool);
+ output->from_h1 = h2_from_h1_create(task->stream_id, task->pool);
}
return output;
}
output->task->id, output->task->request->method,
output->task->request->authority,
output->task->request->path);
- return h2_mplx_out_open(output->task->mplx, output->task->request->id,
+ return h2_mplx_out_open(output->task->mplx, output->task->stream_id,
response, f, bb, output->task->io);
}
"h2_task(%s): write response body (%ld bytes)",
output->task->id, (long)written);
- status = h2_mplx_out_write(output->task->mplx, output->task->request->id,
+ status = h2_mplx_out_write(output->task->mplx, output->task->stream_id,
f, output->task->blocking, bb,
get_trailers(output), output->task->io);
if (status == APR_INCOMPLETE) {
}
if (output->task->frozen) {
- h2_util_bb_log(output->task->c, output->task->request->id, APLOG_TRACE2,
+ h2_util_bb_log(output->task->c, output->task->stream_id, APLOG_TRACE2,
"frozen task output write, ignored", bb);
return APR_SUCCESS;
}
static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
{
h2_worker *worker = (h2_worker *)wctx;
- h2_mplx *mplx;
- h2_request *req;
int sticky;
while (!worker->aborted) {
+ h2_task *task;
/* Get a h2_task from the main workers queue. */
- worker->get_next(worker, worker->ctx, &mplx, &req, &sticky);
- while (req) {
- h2_task *task;
- apr_pool_t *pool;
- conn_rec *slave;
-
- slave = h2_mplx_get_slave(mplx);
- if (h2_slave_needs_pre_run(slave)) {
- h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
- }
-
- apr_pool_create(&pool, slave->pool);
- task = h2_task_create(pool, req, mplx);
- h2_task_attach(task, slave);
+ worker->get_next(worker, worker->ctx, &task, &sticky);
+ while (task) {
h2_task_do(task, worker->io);
/* if someone was waiting on this task, time to wake up */
/* report the task done and maybe get another one from the same
* mplx (= master connection), if we can be sticky.
*/
- req = NULL;
if (sticky && !worker->aborted) {
- h2_mplx_task_done(mplx, task, &req);
+ h2_mplx_task_done(task->mplx, task, &task);
}
else {
- h2_mplx_task_done(mplx, task, NULL);
+ h2_mplx_task_done(task->mplx, task, NULL);
+ task = NULL;
}
- task = NULL;
}
}
* gets aborted (idle timeout, for example). */
typedef apr_status_t h2_worker_mplx_next_fn(h2_worker *worker,
void *ctx,
- struct h2_mplx **pmplx,
- struct h2_request **prequest,
+ struct h2_task **ptask,
int *psticky);
/* Invoked just before the worker thread exits. */
}
}
-static h2_request *next_request(h2_workers *workers, h2_mplx **pmplx)
+static h2_task *next_task(h2_workers *workers)
{
- h2_request *req = NULL;
+ h2_task *task = NULL;
h2_mplx *last = NULL;
int has_more;
* new mplx to arrive. Depending on how many workers do exist,
* we do a timed wait or block indefinitely.
*/
- while (!req && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
+ while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs);
if (last == m) {
H2_MPLX_REMOVE(m);
--workers->mplx_count;
- req = h2_mplx_pop_request(m, &has_more);
+ task = h2_mplx_pop_task(m, &has_more);
if (has_more) {
H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
++workers->mplx_count;
last = m;
}
}
-
- if (req) {
- *pmplx = m;
- return req;
- }
}
- return req;
+ return task;
}
/**
* or the max_wait timer expires and more than min workers exist.
*/
static apr_status_t get_mplx_next(h2_worker *worker, void *ctx,
- h2_mplx **pmplx, h2_request **preq,
- int *psticky)
+ h2_task **ptask, int *psticky)
{
apr_status_t status;
apr_time_t wait_until = 0, now;
h2_workers *workers = ctx;
- h2_request *req = NULL;
- h2_mplx *mplx = NULL;
+ h2_task *task = NULL;
- *preq = NULL;
- *pmplx = NULL;
+ *ptask = NULL;
*psticky = 0;
status = apr_thread_mutex_lock(workers->lock);
"h2_worker(%d): looking for work", h2_worker_get_id(worker));
while (!h2_worker_is_aborted(worker) && !workers->aborted
- && !(req = next_request(workers, &mplx))) {
+ && !(task = next_task(workers))) {
/* Need to wait for a new tasks to arrive. If we are above
* minimum workers, we do a timed wait. When timeout occurs
}
}
- /* Here, we either have gotten a request or decided to shut down
+ /* Here, we either have gotten task or decided to shut down
* the calling worker.
*/
- if (req) {
+ if (task) {
/* Ok, we got something to give back to the worker for execution.
* If we have more idle workers than h2_mplx in our queue, then
* we let the worker be sticky, e.g. making it poll the task's
* has no new tasks to process, so the worker will get back here
* eventually.
*/
- *preq = req;
- *pmplx = mplx;
+ *ptask = task;
*psticky = (workers->max_workers >= workers->mplx_count);
if (workers->mplx_count && workers->idle_workers > 1) {
apr_thread_mutex_unlock(workers->lock);
}
- return *preq? APR_SUCCESS : APR_EOF;
+ return *ptask? APR_SUCCESS : APR_EOF;
}
static void worker_done(h2_worker *worker, void *ctx)