static void fix_event_conn(conn_rec *c, conn_rec *master);
-conn_rec *h2_conn_create(conn_rec *master, apr_pool_t *pool)
+conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p,
+ apr_thread_t *thread, apr_socket_t *socket)
{
conn_rec *c;
AP_DEBUG_ASSERT(master);
-
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, master,
+ "h2_conn(%ld): created from master", master->id);
+
/* This is like the slave connection creation from 2.5-DEV. A
* very efficient way - not sure how compatible this is, since
* the core hooks are no longer run.
* But maybe it's is better this way, not sure yet.
*/
- c = (conn_rec *) apr_palloc(pool, sizeof(conn_rec));
+ c = (conn_rec *) apr_palloc(p, sizeof(conn_rec));
if (c == NULL) {
- ap_log_perror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, pool,
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, master,
APLOGNO(02913) "h2_task: creating conn");
return NULL;
}
memcpy(c, master, sizeof(conn_rec));
- c->id = (master->id & (long)pool);
- c->master = master;
- c->input_filters = NULL;
- c->output_filters = NULL;
- c->pool = pool;
- return c;
-}
-
-apr_status_t h2_slave_setup(h2_task *task, apr_bucket_alloc_t *bucket_alloc,
- apr_thread_t *thread, apr_socket_t *socket)
-{
- conn_rec *master = task->mplx->c;
-
- ap_log_perror(APLOG_MARK, APLOG_TRACE3, 0, task->pool,
- "h2_conn(%ld): created from master", master->id);
-
- /* Ok, we are just about to start processing the connection and
- * the worker is calling us to setup all necessary resources.
- * We can borrow some from the worker itself and some we do as
- * sub-resources from it, so that we get a nice reuse of
- * pools.
- */
- task->c->pool = task->pool;
- task->c->current_thread = thread;
- task->c->bucket_alloc = bucket_alloc;
+
+ /* Replace these */
+ c->id = (master->id & (long)p);
+ c->master = master;
+ c->pool = p;
+ c->current_thread = thread;
+ c->conn_config = ap_create_conn_config(p);
+ c->notes = apr_table_make(p, 5);
+ c->input_filters = NULL;
+ c->output_filters = NULL;
+ c->bucket_alloc = apr_bucket_alloc_create(p);
+ c->cs = NULL;
+ c->data_in_input_filters = 0;
+ c->data_in_output_filters = 0;
+ c->clogging_input_filters = 1;
+ c->log = NULL;
+ c->log_id = NULL;
- task->c->conn_config = ap_create_conn_config(task->pool);
- task->c->notes = apr_table_make(task->pool, 5);
+ /* TODO: these should be unique to this thread */
+ c->sbh = master->sbh;
- /* In order to do this in 2.4.x, we need to add a member to conn_rec */
- task->c->master = master;
+ /* Simulate that we had already a request on this connection. */
+ c->keepalives = 1;
- ap_set_module_config(task->c->conn_config, &core_module, socket);
+ ap_set_module_config(c->conn_config, &core_module, socket);
/* This works for mpm_worker so far. Other mpm modules have
* different needs, unfortunately. The most interesting one
/* all fine */
break;
case H2_MPM_EVENT:
- fix_event_conn(task->c, master);
+ fix_event_conn(c, master);
break;
default:
/* fingers crossed */
break;
}
- /* Simulate that we had already a request on this connection. */
- task->c->keepalives = 1;
-
- return APR_SUCCESS;
+ return c;
}
/* This is an internal mpm event.c struct which is disguised
h2_mpm_type_t h2_conn_mpm_type(void);
-conn_rec *h2_conn_create(conn_rec *master, apr_pool_t *stream_pool);
-
-apr_status_t h2_slave_setup(struct h2_task *task, apr_bucket_alloc_t *bucket_alloc,
- apr_thread_t *thread, apr_socket_t *socket);
+conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p,
+ apr_thread_t *thread, apr_socket_t *socket);
#endif /* defined(__mod_h2__h2_conn__) */
/* setup the correct output filters to process the response
* on the proper mod_http2 way. */
ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r, "adding task output filter");
- if (task->serialize_headers) {
+ if (task->ser_headers) {
ap_add_output_filter("H1_TO_H2_RESP", task, r, r->connection);
}
else {
#include "h2_task.h"
#include "h2_util.h"
-h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc)
+h2_io *h2_io_create(int id, apr_pool_t *pool)
{
h2_io *io = apr_pcalloc(pool, sizeof(*io));
if (io) {
io->id = id;
io->pool = pool;
- io->bucket_alloc = bucket_alloc;
+ io->bucket_alloc = apr_bucket_alloc_create(pool);
}
return io;
}
/**
* Creates a new h2_io for the given stream id.
*/
-h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc);
+h2_io *h2_io_create(int id, apr_pool_t *pool);
/**
* Frees any resources hold by the h2_io instance.
return NULL;
}
- m->bucket_alloc = apr_bucket_alloc_create(m->pool);
-
m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
m->stream_ios = h2_io_set_create(m->pool);
m->ready_ios = h2_io_set_create(m->pool);
workers_unregister(m);
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
+ /* disable WINDOW_UPDATE callbacks */
+ h2_mplx_set_consumed_cb(m, NULL, NULL);
while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
/* iterator until all h2_io have been orphaned or destroyed */
}
m->spare_pool = NULL;
}
- io = h2_io_create(stream_id, io_pool, m->bucket_alloc);
+ io = h2_io_create(stream_id, io_pool);
h2_io_set_add(m->stream_ios, io);
return io;
}
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
- const h2_request *req, int eos,
+apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const h2_request *req,
h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
if (APR_SUCCESS == status) {
h2_io *io = open_io(m, stream_id);
io->request = req;
- io->request_body = !eos;
- if (eos) {
+ if (!io->request->body) {
status = h2_io_in_close(io);
}
return status;
}
-h2_task *h2_mplx_pop_task(h2_mplx *m, h2_worker *w, int *has_more)
+const h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more)
{
- h2_task *task = NULL;
+ const h2_request *req = NULL;
apr_status_t status;
AP_DEBUG_ASSERT(m);
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
int sid;
- while (!task && (sid = h2_tq_shift(m->q)) > 0) {
- /* Anything not already setup correctly in the task
- * needs to be so now, as task will be executed right about
- * when this method returns. */
+ while (!req && (sid = h2_tq_shift(m->q)) > 0) {
h2_io *io = h2_io_set_get(m->stream_ios, sid);
if (io) {
- task = h2_worker_create_task(w, m, io->request, !io->request_body);
+ req = io->request;
}
}
*has_more = !h2_tq_empty(m->q);
apr_thread_mutex_unlock(m->lock);
}
- return task;
+ return req;
}
struct h2_request;
struct h2_io_set;
struct apr_thread_cond_t;
-struct h2_worker;
struct h2_workers;
struct h2_stream_set;
struct h2_task_queue;
volatile int refs;
conn_rec *c;
apr_pool_t *pool;
- apr_bucket_alloc_t *bucket_alloc;
unsigned int aborted : 1;
* @param m the multiplexer
* @param stream_id the identifier of the stream
* @param r the request to be processed
- * @param eos if input is complete
* @param cmp the stream priority compare function
* @param ctx context data for the compare function
*/
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
- const struct h2_request *r, int eos,
+apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const struct h2_request *r,
h2_stream_pri_cmp *cmp, void *ctx);
/**
*/
apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx);
-struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, struct h2_worker *w, int *has_more);
+const struct h2_request *h2_mplx_pop_request(h2_mplx *mplx, int *has_more);
/**
* Register a callback for the amount of input data consumed per stream. The
unsigned int chunked : 1; /* iff requst body needs to be forwarded as chunked */
unsigned int eoh : 1; /* iff end-of-headers has been seen and request is complete */
+ unsigned int body : 1; /* iff this request has a body */
unsigned int push : 1; /* iff server push is possible for this request */
const struct h2_config *config;
session->id, (int)h2_stream_set_size(session->streams));
}
if (session->mplx) {
+ h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
h2_mplx_release_and_join(session->mplx, session->iowait);
session->mplx = NULL;
}
eos, push_enabled);
if (status == APR_SUCCESS) {
if (!eos) {
+ stream->request->body = 1;
stream->bbin = apr_brigade_create(stream->pool,
stream->session->c->bucket_alloc);
}
stream->input_remaining = stream->request->content_length;
status = h2_mplx_process(stream->session->mplx, stream->id,
- stream->request, eos, cmp, ctx);
+ stream->request, cmp, ctx);
stream->scheduled = 1;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
}
h2_task *h2_task_create(long session_id, const h2_request *req,
- apr_pool_t *pool, h2_mplx *mplx, int eos)
+ apr_pool_t *pool, h2_mplx *mplx)
{
- h2_task *task = apr_pcalloc(pool, sizeof(h2_task));
+ h2_task *task = apr_pcalloc(pool, sizeof(h2_task));
if (task == NULL) {
ap_log_perror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, pool,
APLOGNO(02941) "h2_task(%ld-%d): create stream task",
return NULL;
}
- task->id = apr_psprintf(pool, "%ld-%d", session_id, req->id);
- task->stream_id = req->id;
- task->pool = pool;
- task->mplx = mplx;
- task->c = h2_conn_create(mplx->c, task->pool);
+ task->id = apr_psprintf(pool, "%ld-%d", session_id, req->id);
+ task->stream_id = req->id;
+ task->mplx = mplx;
+ task->request = req;
+ task->input_eos = !req->body;
+ task->ser_headers = h2_config_geti(req->config, H2_CONF_SER_HEADERS);
- task->request = req;
- task->input_eos = eos;
-
return task;
}
-apr_status_t h2_task_destroy(h2_task *task)
-{
- (void)task;
- return APR_SUCCESS;
-}
-
-apr_status_t h2_task_do(h2_task *task, h2_worker *worker)
+apr_status_t h2_task_do(h2_task *task, conn_rec *c, apr_thread_cond_t *cond,
+ apr_socket_t *socket)
{
- apr_status_t status = APR_SUCCESS;
-
AP_DEBUG_ASSERT(task);
+ task->io = cond;
+ task->input = h2_task_input_create(task, c->pool, c->bucket_alloc);
+ task->output = h2_task_output_create(task, c->pool);
- task->serialize_headers = h2_config_geti(task->request->config, H2_CONF_SER_HEADERS);
-
- status = h2_worker_setup_task(worker, task);
+ ap_process_connection(c, socket);
- /* save in connection that this one is a pseudo connection */
- h2_ctx_create_for(task->c, task);
-
- if (status == APR_SUCCESS) {
- task->input = h2_task_input_create(task, task->pool,
- task->c->bucket_alloc);
- task->output = h2_task_output_create(task, task->pool);
-
- ap_process_connection(task->c, h2_worker_get_socket(worker));
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
- "h2_task(%s): processing done", task->id);
- }
- else {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, task->c,
- APLOGNO(02957) "h2_task(%s): error setting up h2_task",
- task->id);
- }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "h2_task(%s): processing done", task->id);
- if (task->input) {
- h2_task_input_destroy(task->input);
- task->input = NULL;
- }
+ h2_task_input_destroy(task->input);
+ h2_task_output_close(task->output);
+ h2_task_output_destroy(task->output);
+ task->io = NULL;
- if (task->output) {
- h2_task_output_close(task->output);
- h2_task_output_destroy(task->output);
- task->output = NULL;
- }
-
- if (task->io) {
- apr_thread_cond_signal(task->io);
- }
-
- h2_worker_release_task(worker, task);
- h2_mplx_task_done(task->mplx, task->stream_id);
-
- return status;
+ return APR_SUCCESS;
}
static apr_status_t h2_task_process_request(const h2_request *req, conn_rec *c)
ctx = h2_ctx_get(c, 0);
if (h2_ctx_is_task(ctx)) {
- if (!ctx->task->serialize_headers) {
+ if (!ctx->task->ser_headers) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
"h2_h2, processing request directly");
h2_task_process_request(ctx->task->request, c);
struct h2_task {
const char *id;
int stream_id;
- apr_pool_t *pool;
- apr_bucket_alloc_t *bucket_alloc;
-
struct h2_mplx *mplx;
- struct conn_rec *c;
const struct h2_request *request;
- unsigned int filters_set : 1;
- unsigned int input_eos : 1;
- unsigned int serialize_headers : 1;
+ unsigned int filters_set : 1;
+ unsigned int input_eos : 1;
+ unsigned int ser_headers : 1;
struct h2_task_input *input;
struct h2_task_output *output;
-
struct apr_thread_cond_t *io; /* used to wait for events on */
};
h2_task *h2_task_create(long session_id, const struct h2_request *req,
- apr_pool_t *pool, struct h2_mplx *mplx,
- int eos);
-
-apr_status_t h2_task_destroy(h2_task *task);
+ apr_pool_t *pool, struct h2_mplx *mplx);
-apr_status_t h2_task_do(h2_task *task, struct h2_worker *worker);
+apr_status_t h2_task_do(h2_task *task, conn_rec *c,
+ struct apr_thread_cond_t *cond, apr_socket_t *socket);
void h2_task_register_hooks(void);
input->task = task;
input->bb = NULL;
- if (task->serialize_headers) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ if (task->ser_headers) {
+ ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool,
"h2_task_input(%s): serialize request %s %s",
task->id, task->request->method, task->request->path);
input->bb = apr_brigade_create(pool, bucket_alloc);
* @macro
* Version number of the h2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.0.13-DEVa"
+#define MOD_HTTP2_VERSION "1.0.14-DEVa"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x01000d
+#define MOD_HTTP2_VERSION_NUM 0x01000e
#endif /* mod_h2_h2_version_h */
#include "h2_private.h"
#include "h2_conn.h"
+#include "h2_ctx.h"
+#include "h2_h2.h"
#include "h2_mplx.h"
#include "h2_request.h"
#include "h2_task.h"
h2_worker *worker = (h2_worker *)wctx;
apr_status_t status = APR_SUCCESS;
h2_mplx *m;
+ const h2_request *req;
+ h2_task *task;
+ conn_rec *c, *master;
+ int stream_id;
+
(void)thread;
/* Furthermore, other code might want to see the socket for
return NULL;
}
- worker->task = NULL;
m = NULL;
while (!worker->aborted) {
- status = worker->get_next(worker, &m, &worker->task, worker->ctx);
+ status = worker->get_next(worker, &m, &req, worker->ctx);
- if (worker->task) {
- h2_task_do(worker->task, worker);
- worker->task = NULL;
- apr_thread_cond_signal(worker->io);
+ if (req) {
+ stream_id = req->id;
+ master = m->c;
+ c = h2_slave_create(master, worker->task_pool,
+ worker->thread, worker->socket);
+ if (!c) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
+ APLOGNO(02957) "h2_task(%s): error setting up slave connection",
+ task->id);
+ h2_mplx_out_rst(m, task->stream_id, H2_ERR_INTERNAL_ERROR);
+ }
+ else {
+ task = h2_task_create(m->id, req, worker->task_pool, m);
+ h2_ctx_create_for(c, task);
+ h2_task_do(task, c, worker->io, worker->socket);
+
+ apr_thread_cond_signal(worker->io);
+ }
+ apr_pool_clear(worker->task_pool);
+ /* task is gone */
+ task = NULL;
+ h2_mplx_task_done(m, stream_id);
}
}
}
apr_pool_pre_cleanup_register(w->pool, w, cleanup_join_thread);
+ apr_pool_create(&w->task_pool, w->pool);
apr_thread_create(&w->thread, attr, execute, w, w->pool);
}
return w;
}
h2_task *h2_worker_create_task(h2_worker *worker, h2_mplx *m,
- const h2_request *req, int eos)
+ const h2_request *req)
{
h2_task *task;
- /* Create a subpool from the worker one to be used for all things
- * with life-time of this task execution.
- */
- if (!worker->task_pool) {
- apr_pool_create(&worker->task_pool, worker->pool);
- worker->pool_reuses = 100;
- }
- task = h2_task_create(m->id, req, worker->task_pool, m, eos);
-
- /* Link the task to the worker which provides useful things such
- * as mutex, a socket etc. */
- task->io = worker->io;
-
+ task = h2_task_create(m->id, req, worker->task_pool, m);
return task;
}
-apr_status_t h2_worker_setup_task(h2_worker *worker, h2_task *task) {
- apr_status_t status;
-
-
- status = h2_slave_setup(task, apr_bucket_alloc_create(task->pool),
- worker->thread, worker->socket);
-
- return status;
-}
-
-void h2_worker_release_task(h2_worker *worker, struct h2_task *task)
-{
- task->io = NULL;
- task->pool = NULL;
- if (worker->pool_reuses-- <= 0) {
- apr_pool_destroy(worker->task_pool);
- worker->task_pool = NULL;
- }
- else {
- apr_pool_clear(worker->task_pool);
- }
-}
-
-apr_socket_t *h2_worker_get_socket(h2_worker *worker)
-{
- return worker->socket;
-}
-
* gets aborted (idle timeout, for example). */
typedef apr_status_t h2_worker_mplx_next_fn(h2_worker *worker,
struct h2_mplx **pm,
- struct h2_task **ptask,
+ const struct h2_request **preq,
void *ctx);
/* Invoked just before the worker thread exits. */
void *ctx;
unsigned int aborted : 1;
- int pool_reuses;
- struct h2_task *task;
};
/**
int h2_worker_is_aborted(h2_worker *worker);
struct h2_task *h2_worker_create_task(h2_worker *worker, struct h2_mplx *m,
- const struct h2_request *req, int eos);
-apr_status_t h2_worker_setup_task(h2_worker *worker, struct h2_task *task);
-void h2_worker_release_task(h2_worker *worker, struct h2_task *task);
-
-apr_socket_t *h2_worker_get_socket(h2_worker *worker);
-
+ const struct h2_request *req);
+
#endif /* defined(__mod_h2__h2_worker__) */
#include "h2_private.h"
#include "h2_mplx.h"
-#include "h2_task.h"
+#include "h2_request.h"
#include "h2_task_queue.h"
#include "h2_worker.h"
#include "h2_workers.h"
* the h2_workers lock.
*/
static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
- h2_task **ptask, void *ctx)
+ const h2_request **preq, void *ctx)
{
apr_status_t status;
h2_mplx *m = NULL;
- h2_task *task = NULL;
+ const h2_request *req = NULL;
apr_time_t max_wait, start_wait;
int has_more = 0;
h2_workers *workers = (h2_workers *)ctx;
- if (*pm && ptask != NULL) {
+ if (*pm && preq != NULL) {
/* We have a h2_mplx instance and the worker wants the next task.
* Try to get one from the given mplx. */
- *ptask = h2_mplx_pop_task(*pm, worker, &has_more);
- if (*ptask) {
+ *preq = h2_mplx_pop_request(*pm, &has_more);
+ if (*preq) {
return APR_SUCCESS;
}
}
*pm = NULL;
}
- if (!ptask) {
+ if (!preq) {
/* the worker does not want a next task, we're done.
*/
return APR_SUCCESS;
ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
"h2_worker(%d): looking for work", h2_worker_get_id(worker));
- while (!task && !h2_worker_is_aborted(worker) && !workers->aborted) {
+ while (!req && !h2_worker_is_aborted(worker) && !workers->aborted) {
/* Get the next h2_mplx to process that has a task to hand out.
* If it does, place it at the end of the queu and return the
* we do a timed wait or block indefinitely.
*/
m = NULL;
- while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
+ while (!req && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
m = H2_MPLX_LIST_FIRST(&workers->mplxs);
H2_MPLX_REMOVE(m);
- task = h2_mplx_pop_task(m, worker, &has_more);
- if (task) {
+ req = h2_mplx_pop_request(m, &has_more);
+ if (req) {
if (has_more) {
H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
}
}
}
- if (!task) {
+ if (!req) {
/* Need to wait for either a new mplx to arrive.
*/
cleanup_zombies(workers, 0);
/* Here, we either have gotten task and mplx for the worker or
* needed to give up with more than enough workers.
*/
- if (task) {
+ if (req) {
ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): start task(%s)",
- h2_worker_get_id(worker), task->id);
+ "h2_worker(%d): start request(%ld-%d)",
+ h2_worker_get_id(worker), m->id, req->id);
/* Since we hand out a reference to the worker, we increase
* its ref count.
*/
h2_mplx_reference(m);
*pm = m;
- *ptask = task;
+ *preq = req;
if (has_more && workers->idle_worker_count > 1) {
apr_thread_cond_signal(workers->mplx_added);
struct apr_thread_mutex_t;
struct apr_thread_cond_t;
struct h2_mplx;
+struct h2_request;
struct h2_task;
struct h2_task_queue;