if (apr_bucket_shared_destroy(h)) {
h2_session *session = h->session;
if (session) {
- h2_session_cleanup(session);
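+ /* destroying the eoc bucket is the signal that the connection is
+ * done; it triggers the final teardown of the session it carries. */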
+ h2_session_eoc_callback(session);
}
apr_bucket_free(h);
}
session->c->local_addr->port);
if (status != APR_SUCCESS) {
h2_session_abort(session, status, rv);
- h2_session_cleanup(session);
+ h2_session_eoc_callback(session);
return status;
}
static void fix_event_conn(conn_rec *c, conn_rec *master);
-/*
- * We would like to create the connection more lightweight like
- * slave connections in 2.5-DEV. But we get 500 responses on long
- * cgi tests in modules/h2.t as the script parsing seems to see an
- * EOF from the cgi before anything is sent.
- *
-conn_rec *h2_conn_create(conn_rec *master, apr_pool_t *pool)
-{
- conn_rec *c = (conn_rec *) apr_palloc(pool, sizeof(conn_rec));
-
- memcpy(c, master, sizeof(conn_rec));
- c->id = (master->id & (long)pool);
- c->slaves = NULL;
- c->master = master;
- c->input_filters = NULL;
- c->output_filters = NULL;
- c->pool = pool;
-
- return c;
-}
-*/
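+/* Toggle: 1 = set up slave connections the lightweight 2.5-DEV way below,
+ * 0 = keep going through ap_run_create_connection(). */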
+static int SLAVE_CONN_25DEV_STYLE = 1;
conn_rec *h2_conn_create(conn_rec *master, apr_pool_t *pool)
{
conn_rec *c;
AP_DEBUG_ASSERT(master);
-
- /* CAVEAT: it seems necessary to setup the conn_rec in the master
- * connection thread. Other attempts crashed.
- * HOWEVER: we setup the connection using the pools and other items
- * from the master connection, since we do not want to allocate
- * lots of resources here.
- * Lets allocated pools and everything else when we actually start
- * working on this new connection.
- */
- /* Not sure about the scoreboard handle. Reusing the one from the main
- * connection could make sense, is not really correct, but we cannot
- * easily create new handles for our worker threads either.
- * TODO
- */
- socket = ap_get_module_config(master->conn_config, &core_module);
- c = ap_run_create_connection(pool, master->base_server,
- socket,
- master->id^((long)pool),
- master->sbh,
- master->bucket_alloc);
+
+ if (SLAVE_CONN_25DEV_STYLE) {
+ /* This is like the slave connection creation from 2.5-DEV: a
+ * very efficient way. It is not clear how compatible this is,
+ * since the core connection hooks are no longer run, but maybe
+ * it is better this way; not sure yet.
+ */
+ c = (conn_rec *) apr_palloc(pool, sizeof(conn_rec));
+
+ memcpy(c, master, sizeof(conn_rec));
+ c->id = (master->id & (long)pool);
+ c->master = master;
+ c->input_filters = NULL;
+ c->output_filters = NULL;
+ c->pool = pool;
+ }
+ else {
+ /* CAVEAT: it seems necessary to set up the conn_rec in the master
+ * connection thread. Other attempts crashed.
+ * HOWEVER: we set up the connection using the pools and other items
+ * from the master connection, since we do not want to allocate
+ * lots of resources here. Let's allocate pools and everything else
+ * when we actually start working on this new connection.
+ */
+ /* Not sure about the scoreboard handle. Reusing the one from the main
+ * connection could make sense; it is not really correct, but we cannot
+ * easily create new handles for our worker threads either.
+ * TODO
+ */
+ socket = ap_get_module_config(master->conn_config, &core_module);
+ c = ap_run_create_connection(pool, master->base_server,
+ socket,
+ master->id^((long)pool),
+ master->sbh,
+ master->bucket_alloc);
+ }
if (c == NULL) {
ap_log_perror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, pool,
APLOGNO(02913) "h2_task: creating conn");
- return NULL;
}
return c;
}
void h2_io_set_response(h2_io *io, h2_response *response)
{
+ AP_DEBUG_ASSERT(io->pool);
AP_DEBUG_ASSERT(response);
AP_DEBUG_ASSERT(!io->response);
io->response = h2_response_copy(io->pool, response);
struct h2_io {
int id; /* stream identifier */
apr_pool_t *pool; /* stream pool */
- int zombie;
+ int orphaned; /* h2_stream is gone for this io */
int task_done;
struct h2_task *task; /* task created for this io */
static void io_destroy(h2_mplx *m, h2_io *io)
{
- if (io) {
- apr_pool_t *pool = io->pool;
- if (pool) {
- io->pool = NULL;
- apr_pool_clear(pool);
- if (m->spare_pool) {
- apr_pool_destroy(m->spare_pool);
- }
- m->spare_pool = pool;
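+ /* recycle the io pool: clear it and keep it as the spare pool of the
+ * mplx so that the next stream can reuse it. */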
+ apr_pool_t *pool = io->pool;
+ if (pool) {
+ io->pool = NULL;
+ apr_pool_clear(pool);
+ if (m->spare_pool) {
+ apr_pool_destroy(m->spare_pool);
}
- /* The pool is cleared/destroyed which also closes all
- * allocated file handles. Give this count back to our
- * file handle pool. */
- m->file_handles_allowed += io->files_handles_owned;
- h2_io_set_remove(m->stream_ios, io);
- h2_io_set_remove(m->ready_ios, io);
- h2_io_destroy(io);
+ m->spare_pool = pool;
}
+ /* The pool is cleared/destroyed which also closes all
+ * allocated file handles. Give this count back to our
+ * file handle pool. */
+ m->file_handles_allowed += io->files_handles_owned;
+ h2_io_set_remove(m->stream_ios, io);
+ h2_io_set_remove(m->ready_ios, io);
+ h2_io_destroy(io);
}
apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-
+
+ /* there should be an h2_io once the stream has been scheduled
+ * for processing, e.g. when we have received all HEADERS. But when
+ * a stream is cancelled very early, it will not exist. */
if (io) {
/* Remove io from ready set, we will never submit it */
h2_io_set_remove(m->ready_ios, io);
}
else {
/* cleanup once task is done */
- io->zombie = 1;
+ io->orphaned = 1;
if (rst_error) {
- /* Forward error code to fail any further attempt to
- * write to io */
h2_io_rst(io, rst_error);
}
}
"h2_mplx(%ld): task(%d) done", m->id, stream_id);
if (io) {
io->task_done = 1;
- if (io->zombie) {
+ if (io->orphaned) {
io_destroy(m, io);
}
else {
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
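+ /* an orphaned io has lost its h2_stream; stop serving reads and
+ * writes for it. */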
+ if (io && !io->orphaned) {
io->input_arrived = iowait;
status = h2_io_in_read(io, bb, 0);
while (APR_STATUS_IS_EAGAIN(status)
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
+ if (io && !io->orphaned) {
status = h2_io_in_write(io, bb);
if (io->input_arrived) {
apr_thread_cond_signal(io->input_arrived);
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
+ if (io && !io->orphaned) {
status = h2_io_in_close(io);
if (io->input_arrived) {
apr_thread_cond_signal(io->input_arrived);
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
+ if (io && !io->orphaned) {
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_pre");
status = h2_io_out_readx(io, cb, ctx, plen, peos);
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
+ if (io && !io->orphaned) {
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_read_to_pre");
status = h2_io_out_read_to(io, bb, plen, peos);
"h2_mplx(%ld): stream for response %d closed, "
"resetting io to close request processing",
m->id, io->id);
- h2_io_rst(io, H2_ERR_STREAM_CLOSED);
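+ /* destroy the io right away if its task has already finished,
+ * otherwise mark it orphaned for later cleanup. */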
+ io->orphaned = 1;
+ if (io->task_done) {
+ io_destroy(m, io);
+ }
+ else {
+ /* hang around until the h2_task is done */
+ h2_io_rst(io, H2_ERR_STREAM_CLOSED);
+ }
}
if (io->output_drained) {
apr_status_t status = APR_SUCCESS;
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
+ if (io && !io->orphaned) {
if (f) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
"h2_mplx(%ld-%d): open response: %d, rst=%d",
if (APR_SUCCESS == status) {
if (!m->aborted) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
+ if (io && !io->orphaned) {
status = out_write(m, io, f, bb, iowait);
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
if (APR_SUCCESS == status) {
if (!m->aborted) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
+ if (io && !io->orphaned) {
if (!io->response && !io->rst_error) {
/* In case a close comes before a response was created,
* insert an error one so that our streams can properly
if (APR_SUCCESS == status) {
if (!m->aborted) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->rst_error) {
+ if (io && !io->rst_error && !io->orphaned) {
h2_io_rst(io, error);
if (!io->response) {
h2_io_set_add(m->ready_ios, io);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io) {
- has_eos = h2_io_in_has_eos_for(io);
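+ /* an orphaned io will not receive any more input, report eos */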
+ has_eos = io->orphaned || h2_io_in_has_eos_for(io);
}
apr_thread_mutex_unlock(m->lock);
}
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
- conn_rec *c;
h2_io *io;
cmp_ctx x;
io = open_io(m, stream_id);
- c = h2_conn_create(m->c, io->pool);
- io->task = h2_task_create(m->id, req, io->pool, m, c, eos);
+ io->task = h2_task_create(m->id, req, io->pool, m, eos);
if (eos) {
status = h2_io_in_close(io);
if (APR_SUCCESS == status) {
task = h2_tq_shift(m->q);
*has_more = !h2_tq_empty(m->q);
+ if (task) {
+ /* Anything not already set up correctly in the task
+ * needs to be set up now, as the task will be executed
+ * right after this method returns. */
+
+ }
apr_thread_mutex_unlock(m->lock);
}
return task;
return APR_SUCCESS;
}
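+/* Pre-cleanup of the session pool: if the pool gets destroyed before
+ * h2_session_destroy() has run, tear the session down first. */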
+static apr_status_t session_pool_cleanup(void *data)
+{
+ h2_session *session = data;
+
+ /* the pool is already being destroyed; null our reference so that
+ * h2_session_destroy() does not try to destroy it again. */
+ session->pool = NULL;
+ h2_session_destroy(session);
+ return APR_SUCCESS;
+}
+
static h2_session *h2_session_create_int(conn_rec *c,
request_rec *r,
h2_config *config,
session->c = c;
session->r = r;
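+ /* ensure proper teardown if the session pool goes away on its own */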
+ apr_pool_pre_cleanup_register(pool, session, session_pool_cleanup);
+
session->max_stream_count = h2_config_geti(config, H2_CONF_MAX_STREAMS);
session->max_stream_mem = h2_config_geti(config, H2_CONF_STREAM_MAX_MEM);
return h2_session_create_int(r->connection, r, config, workers);
}
-void h2_session_destroy(h2_session *session)
+void h2_session_cleanup(h2_session *session)
{
AP_DEBUG_ASSERT(session);
if (session->mplx) {
h2_mplx_release_and_join(session->mplx, session->iowait);
session->mplx = NULL;
}
+ if (session->ngh2) {
+ nghttp2_session_del(session->ngh2);
+ session->ngh2 = NULL;
+ }
+ if (session->spare) {
+ apr_pool_destroy(session->spare);
+ session->spare = NULL;
+ }
+}
+
+void h2_session_destroy(h2_session *session)
+{
+ AP_DEBUG_ASSERT(session);
+ h2_session_cleanup(session);
+
if (session->streams) {
if (!h2_stream_set_is_empty(session->streams)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
h2_stream_set_destroy(session->streams);
session->streams = NULL;
}
- if (session->ngh2) {
- nghttp2_session_del(session->ngh2);
- session->ngh2 = NULL;
- }
- if (session->spare) {
- apr_pool_destroy(session->spare);
- session->spare = NULL;
- }
if (session->pool) {
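+ /* we are destroying the pool ourselves; unregister the pre-cleanup
+ * so it does not call back into h2_session_destroy(). */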
+ apr_pool_cleanup_kill(session->pool, session, session_pool_cleanup);
apr_pool_destroy(session->pool);
}
}
-void h2_session_cleanup(h2_session *session)
+
+void h2_session_eoc_callback(h2_session *session)
{
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"session(%ld): cleanup and destroy", session->id);
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0,session->c,
"h2_session: closing, writing eoc");
+
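+ /* release the mplx and nghttp2 data now; the session itself goes away
+ * once the eoc bucket written below gets destroyed. */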
+ h2_session_cleanup(session);
h2_conn_io_writeb(&session->io,
h2_bucket_eoc_create(session->c->bucket_alloc,
session));
*/
void h2_session_destroy(h2_session *session);
+/**
+ * Clean up session data while winding down. No new streams
+ * may be created afterwards, but existing streams can still be
+ * looked up.
+ * Called automatically on destroy.
+ */
+void h2_session_cleanup(h2_session *session);
+
/**
* Cleanup the session and all objects it still contains. This will not
* destroy h2_task instances that have not finished yet.
* @param session the session to destroy
*/
-void h2_session_cleanup(h2_session *session);
+void h2_session_eoc_callback(h2_session *session);
/**
* Called once at start of session.
h2_task *h2_task_create(long session_id, const h2_request *req,
- apr_pool_t *pool, h2_mplx *mplx,
- conn_rec *c, int eos)
+ apr_pool_t *pool, h2_mplx *mplx, int eos)
{
h2_task *task = apr_pcalloc(pool, sizeof(h2_task));
if (task == NULL) {
task->id = apr_psprintf(pool, "%ld-%d", session_id, req->id);
task->stream_id = req->id;
task->mplx = mplx;
- task->c = c;
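+ /* the task now creates its own slave connection from the mplx's
+ * master connection, on the task pool. */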
+ task->c = h2_conn_create(mplx->c, pool);
task->request = req;
task->input_eos = eos;
task->input = h2_task_input_create(task, task->pool,
task->c->bucket_alloc);
task->output = h2_task_output_create(task, task->pool);
+
ap_process_connection(task->c, h2_worker_get_socket(worker));
+
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
"h2_task(%s): processing done", task->id);
}
}
h2_worker_release_task(worker, task);
-
h2_mplx_task_done(task->mplx, task->stream_id);
return status;
};
h2_task *h2_task_create(long session_id, const struct h2_request *req,
- apr_pool_t *pool, struct h2_mplx *mplx,
- conn_rec *c, int eos);
+ apr_pool_t *pool, struct h2_mplx *mplx, int eos);
apr_status_t h2_task_destroy(h2_task *task);