if (apr_bucket_shared_destroy(h)) {
h2_session *session = h->session;
+ apr_bucket_free(h);
if (session) {
h2_session_eoc_callback(session);
+ /* all is gone now */
}
- apr_bucket_free(h);
}
}
H2_INITIAL_WINDOW_SIZE, /* window_size */
-1, /* min workers */
-1, /* max workers */
- 10 * 60, /* max workers idle secs */
+ 10, /* max workers idle secs */
64 * 1024, /* stream max mem size */
NULL, /* no alt-svcs */
-1, /* alt-svc max age */
ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, session->c,
"h2_session(%ld): done", session->id);
- h2_session_close(session);
- h2_session_flush(session);
- /* hereafter session might be gone */
-
/* Make sure this connection gets closed properly. */
ap_update_child_status_from_conn(c->sbh, SERVER_CLOSING, c);
c->keepalive = AP_CONN_CLOSE;
c->cs->state = CONN_STATE_WRITE_COMPLETION;
}
+ h2_session_close(session);
+ /* hereafter session will be gone */
return status;
}
#include <http_connection.h>
#include "h2_private.h"
+#include "h2_bucket_eoc.h"
#include "h2_config.h"
#include "h2_conn_io.h"
#include "h2_h2.h"
#define WRITE_BUFFER_SIZE (8*WRITE_SIZE_MAX)
-apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c)
+apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, apr_pool_t *pool)
{
h2_config *cfg = h2_config_get(c);
io->connection = c;
- io->input = apr_brigade_create(c->pool, c->bucket_alloc);
- io->output = apr_brigade_create(c->pool, c->bucket_alloc);
+ io->input = apr_brigade_create(pool, c->bucket_alloc);
+ io->output = apr_brigade_create(pool, c->bucket_alloc);
io->buflen = 0;
io->is_tls = h2_h2_is_tls(c);
io->buffer_output = io->is_tls;
if (io->buffer_output) {
io->bufsize = WRITE_BUFFER_SIZE;
- io->buffer = apr_pcalloc(c->pool, io->bufsize);
+ io->buffer = apr_pcalloc(pool, io->bufsize);
}
else {
io->bufsize = 0;
&bucket_length, block);
if (status == APR_SUCCESS && bucket_length > 0) {
+ apr_size_t consumed = 0;
+
if (APLOGctrace2(io->connection)) {
char buffer[32];
h2_util_hex_dump(buffer, sizeof(buffer)/sizeof(buffer[0]),
io->connection->id, (int)bucket_length, buffer);
}
- if (bucket_length > 0) {
- apr_size_t consumed = 0;
- status = on_read_cb(bucket_data, bucket_length,
- &consumed, pdone, puser);
- if (status == APR_SUCCESS && bucket_length > consumed) {
- /* We have data left in the bucket. Split it. */
- status = apr_bucket_split(bucket, consumed);
- }
- readlen += consumed;
+ status = on_read_cb(bucket_data, bucket_length, &consumed,
+ pdone, puser);
+ if (status == APR_SUCCESS && bucket_length > consumed) {
+ /* We have data left in the bucket. Split it. */
+ status = apr_bucket_split(bucket, consumed);
}
+ readlen += consumed;
}
}
apr_bucket_delete(bucket);
}
+
if (readlen == 0 && status == APR_SUCCESS && block == APR_NONBLOCK_READ) {
return APR_EAGAIN;
}
/* Seems something is left from a previous read, lets
* satisfy our caller with the data we already have. */
status = h2_conn_io_bucket_read(io, block, on_read_cb, puser, &done);
+ apr_brigade_cleanup(io->input);
if (status != APR_SUCCESS || done) {
return status;
}
- apr_brigade_cleanup(io->input);
}
/* We only do a blocking read when we have no streams to process. So,
ap_update_child_status(io->connection->sbh, SERVER_BUSY_READ, NULL);
}
+ /* TODO: replace this with a connection filter itself, so that we
+ * no longer need to transfer incoming buckets to our own brigade.
+ */
status = ap_get_brigade(io->connection->input_filters,
io->input, AP_MODE_READBYTES,
block, 64 * 4096);
apr_status_t h2_conn_io_pass(h2_conn_io *io)
{
return h2_conn_io_flush_int(io, 0);
+}
+
+apr_status_t h2_conn_io_close(h2_conn_io *io, void *session)
+{
+ apr_bucket *b;
+
+ /* Send out anything in our buffers */
+ h2_conn_io_flush_int(io, 0);
+
+ b = h2_bucket_eoc_create(io->connection->bucket_alloc, session);
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
+ b = apr_bucket_flush_create(io->connection->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
+ return ap_pass_brigade(io->connection->output_filters, io->output);
+ /* and all is gone */
}
\ No newline at end of file
int unflushed;
} h2_conn_io;
-apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c);
+apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, apr_pool_t *pool);
int h2_conn_io_is_buffered(h2_conn_io *io);
apr_status_t h2_conn_io_pass(h2_conn_io *io);
apr_status_t h2_conn_io_flush(h2_conn_io *io);
+apr_status_t h2_conn_io_close(h2_conn_io *io, void *session);
#endif /* defined(__mod_h2__h2_conn_io__) */
return NULL;
}
-void h2_io_set_destroy_all(h2_io_set *sp)
-{
- int i;
- for (i = 0; i < sp->list->nelts; ++i) {
- h2_io *io = h2_io_IDX(sp->list, i);
- h2_io_destroy(io);
- }
- sp->list->nelts = 0;
-}
-
-void h2_io_set_remove_all(h2_io_set *sp)
-{
- sp->list->nelts = 0;
-}
-
int h2_io_set_is_empty(h2_io_set *sp)
{
AP_DEBUG_ASSERT(sp);
return sp->list->nelts == 0;
}
-void h2_io_set_iter(h2_io_set *sp,
+int h2_io_set_iter(h2_io_set *sp,
h2_io_set_iter_fn *iter, void *ctx)
{
int i;
for (i = 0; i < sp->list->nelts; ++i) {
h2_io *s = h2_io_IDX(sp->list, i);
if (!iter(ctx, s)) {
- break;
+ return 0;
}
}
+ return 1;
}
apr_size_t h2_io_set_size(h2_io_set *sp)
h2_io *h2_io_set_get(h2_io_set *set, int stream_id);
h2_io *h2_io_set_remove(h2_io_set *set, struct h2_io *io);
-void h2_io_set_remove_all(h2_io_set *set);
-void h2_io_set_destroy_all(h2_io_set *set);
int h2_io_set_is_empty(h2_io_set *set);
apr_size_t h2_io_set_size(h2_io_set *set);
typedef int h2_io_set_iter_fn(void *ctx, struct h2_io *io);
-void h2_io_set_iter(h2_io_set *set,
- h2_io_set_iter_fn *iter, void *ctx);
+/**
+ * Iterator over all h2_io* in the set or until a
+ * callback returns 0. It is not safe to add or remove
+ * set members during iteration.
+ *
+ * @param set the set of h2_io to iterate over
+ * @param iter the function to call for each io
+ * @param ctx user data for the callback
+ * @return 1 iff iteration completed for all members
+ */
+int h2_io_set_iter(h2_io_set *set,
+ h2_io_set_iter_fn *iter, void *ctx);
h2_io *h2_io_set_pop_highest_prio(h2_io_set *set);
static void h2_mplx_destroy(h2_mplx *m)
{
AP_DEBUG_ASSERT(m);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+ "h2_mplx(%ld): destroy, refs=%d",
+ m->id, m->refs);
m->aborted = 1;
if (m->ready_ios) {
h2_io_set_destroy(m->ready_ios);
m->stream_ios = NULL;
}
- if (m->lock) {
- apr_thread_mutex_destroy(m->lock);
- m->lock = NULL;
- }
-
- if (m->spare_pool) {
- apr_pool_destroy(m->spare_pool);
- m->spare_pool = NULL;
- }
if (m->pool) {
apr_pool_destroy(m->pool);
}
h2_workers_unregister(m->workers, m);
}
+static void io_destroy(h2_mplx *m, h2_io *io)
+{
+ apr_pool_t *pool = io->pool;
+
+ io->pool = NULL;
+ /* The pool is cleared/destroyed which also closes all
+ * allocated file handles. Give this count back to our
+ * file handle pool. */
+ m->file_handles_allowed += io->files_handles_owned;
+ h2_io_set_remove(m->stream_ios, io);
+ h2_io_set_remove(m->ready_ios, io);
+ h2_io_destroy(io);
+
+ if (pool) {
+ apr_pool_clear(pool);
+ if (m->spare_pool) {
+ apr_pool_destroy(m->spare_pool);
+ }
+ m->spare_pool = pool;
+ }
+}
+
+static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error)
+{
+ /* Remove io from ready set, we will never submit it */
+ h2_io_set_remove(m->ready_ios, io);
+ if (io->task_done || h2_tq_remove(m->q, io->id)) {
+ /* already finished or not even started yet */
+ io_destroy(m, io);
+ return 0;
+ }
+ else {
+ /* cleanup once task is done */
+ io->orphaned = 1;
+ if (rst_error) {
+ h2_io_rst(io, rst_error);
+ }
+ return 1;
+ }
+}
+
+static int stream_done_iter(void *ctx, h2_io *io) {
+ return io_stream_done((h2_mplx*)ctx, io, 0);
+}
+
apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
apr_status_t status;
+
workers_unregister(m);
-
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
+ while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
+ /* iterator until all h2_io have been orphaned or destroyed */
+ }
+
release(m, 0);
while (m->refs > 0) {
m->join_wait = wait;
apr_thread_cond_wait(wait, m->lock);
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
- "h2_mplx(%ld): release_join -> destroy", m->id);
- m->pool = NULL;
- apr_thread_mutex_unlock(m->lock);
+ "h2_mplx(%ld): release_join -> destroy, (#ios=%ld)",
+ m->id, (long)h2_io_set_size(m->stream_ios));
h2_mplx_destroy(m);
+ /* all gone */
+ /*apr_thread_mutex_unlock(m->lock);*/
}
return status;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
m->aborted = 1;
- h2_io_set_destroy_all(m->stream_ios);
apr_thread_mutex_unlock(m->lock);
}
- workers_unregister(m);
-}
-
-
-static void io_destroy(h2_mplx *m, h2_io *io)
-{
- apr_pool_t *pool = io->pool;
-
- io->pool = NULL;
- /* The pool is cleared/destroyed which also closes all
- * allocated file handles. Give this count back to our
- * file handle pool. */
- m->file_handles_allowed += io->files_handles_owned;
- h2_io_set_remove(m->stream_ios, io);
- h2_io_set_remove(m->ready_ios, io);
- h2_io_destroy(io);
-
- if (pool) {
- apr_pool_clear(pool);
- if (m->spare_pool) {
- apr_pool_destroy(m->spare_pool);
- }
- m->spare_pool = pool;
- }
}
apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
apr_status_t status;
AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return APR_ECONNABORTED;
- }
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
* for processing, e.g. when we received all HEADERs. But when
* a stream is cancelled very early, it will not exist. */
if (io) {
- /* Remove io from ready set, we will never submit it */
- h2_io_set_remove(m->ready_ios, io);
- if (io->task_done || h2_tq_remove(m->q, io->id)) {
- /* already finished or not even started yet */
- io_destroy(m, io);
- }
- else {
- /* cleanup once task is done */
- io->orphaned = 1;
- if (rst_error) {
- h2_io_rst(io, rst_error);
- }
- }
-
+ io_stream_done(m, io, rst_error);
}
apr_thread_mutex_unlock(m->lock);
#include <http_log.h>
#include "h2_private.h"
-#include "h2_bucket_eoc.h"
#include "h2_bucket_eos.h"
#include "h2_config.h"
#include "h2_h2.h"
return stream;
}
-apr_status_t h2_session_flush(h2_session *session)
-{
- return h2_conn_io_flush(&session->io);
-}
-
/**
* Determine the importance of streams when scheduling tasks.
* - if both stream depend on the same one, compare weights
session->c = c;
session->r = r;
+ session->pool = pool;
apr_pool_pre_cleanup_register(pool, session, session_pool_cleanup);
session->max_stream_count = h2_config_geti(config, H2_CONF_MAX_STREAMS);
session->max_stream_mem = h2_config_geti(config, H2_CONF_STREAM_MAX_MEM);
- session->pool = pool;
-
status = apr_thread_cond_create(&session->iowait, session->pool);
if (status != APR_SUCCESS) {
return NULL;
session->workers = workers;
session->mplx = h2_mplx_create(c, session->pool, workers);
- h2_conn_io_init(&session->io, c);
+ h2_conn_io_init(&session->io, c, session->pool);
session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc);
status = init_callbacks(c, &callbacks);
apr_pool_destroy(session->spare);
session->spare = NULL;
}
- if (session->mplx) {
- h2_mplx_release_and_join(session->mplx, session->iowait);
- session->mplx = NULL;
- }
}
void h2_session_destroy(h2_session *session)
AP_DEBUG_ASSERT(session);
h2_session_cleanup(session);
+ if (session->mplx) {
+ h2_mplx_release_and_join(session->mplx, session->iowait);
+ session->mplx = NULL;
+ }
if (session->streams) {
if (!h2_stream_set_is_empty(session->streams)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0,session->c,
"h2_session: closing, writing eoc");
- h2_session_cleanup(session);
- return h2_conn_io_writeb(&session->io,
- h2_bucket_eoc_create(session->c->bucket_alloc,
- session));
+ h2_session_cleanup(session);
+ return h2_conn_io_close(&session->io, session);
}
static ssize_t stream_data_cb(nghttp2_session *ng2s,
*/
apr_status_t h2_session_abort(h2_session *session, apr_status_t reason, int rv);
-/**
- * Pass any buffered output data through the connection filters.
- * @param session the session to flush
- */
-apr_status_t h2_session_flush(h2_session *session);
-
/**
* Called before a session gets destroyed, might flush output etc.
*/
apr_thread_cond_signal(task->io);
}
- h2_mplx_task_done(task->mplx, task->stream_id);
h2_worker_release_task(worker, task);
+ h2_mplx_task_done(task->mplx, task->stream_id);
return status;
}
* @macro
* Version number of the h2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.0.5-DEV"
+#define MOD_HTTP2_VERSION "1.0.6-DEV"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010005
+#define MOD_HTTP2_VERSION_NUM 0x010006
#endif /* mod_h2_h2_version_h */
apr_allocator_t *allocator = NULL;
apr_pool_t *pool = NULL;
h2_worker *w;
+ apr_status_t status;
- apr_status_t status = apr_allocator_create(&allocator);
+ status = apr_allocator_create(&allocator);
if (status != APR_SUCCESS) {
return NULL;
}
apr_pool_pre_cleanup_register(w->pool, w, cleanup_join_thread);
apr_thread_create(&w->thread, attr, execute, w, w->pool);
- apr_pool_create(&w->task_pool, w->pool);
}
return w;
}
/* Create a subpool from the worker one to be used for all things
* with life-time of this task execution.
*/
+ if (!worker->task_pool) {
+ apr_pool_create(&worker->task_pool, worker->pool);
+ }
task = h2_task_create(m->id, req, worker->task_pool, m, eos);
+
/* Link the task to the worker which provides useful things such
* as mutex, a socket etc. */
task->io = worker->io;