#include <assert.h>
#include <stddef.h>
+#include <stdlib.h>
-#include <apr_atomic.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include <apr_strings.h>
#include <http_core.h>
#include <http_log.h>
+#include "mod_http2.h"
+
#include "h2_private.h"
+#include "h2_bucket_beam.h"
#include "h2_config.h"
#include "h2_conn.h"
-#include "h2_io.h"
-#include "h2_io_set.h"
+#include "h2_ctx.h"
+#include "h2_h2.h"
#include "h2_response.h"
#include "h2_mplx.h"
+#include "h2_ngn_shed.h"
#include "h2_request.h"
#include "h2_stream.h"
-#include "h2_stream_set.h"
#include "h2_task.h"
-#include "h2_task_input.h"
-#include "h2_task_output.h"
-#include "h2_task_queue.h"
+#include "h2_worker.h"
#include "h2_workers.h"
+#include "h2_util.h"
+
+
+static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg,
+ conn_rec *c, int level)
+{
+ if (beam && APLOG_C_IS_LEVEL(c,level)) {
+ char buffer[2048];
+ apr_size_t off = 0;
+
+ off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
+ off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->red);
+ off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->green);
+ off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold);
+ off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge);
+
+ ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s",
+ c->id, id, msg, buffer);
+ }
+}
+/* utility for iterating over ihash task sets */
+typedef struct {
+ h2_mplx *m;
+ h2_task *task;
+ apr_time_t now;
+} task_iter_ctx;
+
+/* NULL or the mutex hold by this thread, used for recursive calls
+ */
+static apr_threadkey_t *thread_lock;
-static int is_aborted(h2_mplx *m, apr_status_t *pstatus) {
+apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
+{
+ return apr_threadkey_private_create(&thread_lock, NULL, pool);
+}
+
+static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
+{
+ apr_status_t status;
+ void *mutex = NULL;
+
+ /* Enter the mutex if this thread already holds the lock or
+ * if we can acquire it. Only on the later case do we unlock
+ * onleaving the mutex.
+ * This allow recursive entering of the mutex from the saem thread,
+ * which is what we need in certain situations involving callbacks
+ */
AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- *pstatus = APR_ECONNABORTED;
+ apr_threadkey_private_get(&mutex, thread_lock);
+ if (mutex == m->lock) {
+ *pacquired = 0;
+ return APR_SUCCESS;
+ }
+
+ AP_DEBUG_ASSERT(m->lock);
+ status = apr_thread_mutex_lock(m->lock);
+ *pacquired = (status == APR_SUCCESS);
+ if (*pacquired) {
+ apr_threadkey_private_set(m->lock, thread_lock);
+ }
+ return status;
+}
+
+static void leave_mutex(h2_mplx *m, int acquired)
+{
+ if (acquired) {
+ apr_threadkey_private_set(NULL, thread_lock);
+ apr_thread_mutex_unlock(m->lock);
+ }
+}
+
+static void beam_leave(void *ctx, apr_thread_mutex_t *lock)
+{
+ leave_mutex(ctx, 1);
+}
+
+static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl)
+{
+ h2_mplx *m = ctx;
+ int acquired;
+ apr_status_t status;
+
+ status = enter_mutex(m, &acquired);
+ if (status == APR_SUCCESS) {
+ pbl->mutex = m->lock;
+ pbl->leave = acquired? beam_leave : NULL;
+ pbl->leave_ctx = m;
+ }
+ return status;
+}
+
+static void stream_output_consumed(void *ctx,
+ h2_bucket_beam *beam, apr_off_t length)
+{
+ h2_task *task = ctx;
+ if (length > 0 && task && task->assigned) {
+ h2_req_engine_out_consumed(task->assigned, task->c, length);
+ }
+}
+
+static void stream_input_consumed(void *ctx,
+ h2_bucket_beam *beam, apr_off_t length)
+{
+ h2_mplx *m = ctx;
+ if (m->input_consumed && length) {
+ m->input_consumed(m->input_consumed_ctx, beam->id, length);
+ }
+}
+
+static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
+{
+ h2_mplx *m = ctx;
+ if (m->tx_handles_reserved > 0) {
+ --m->tx_handles_reserved;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): beaming file %s, tx_avail %d",
+ m->id, beam->id, beam->tag, m->tx_handles_reserved);
return 1;
}
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): can_beam_file denied on %s",
+ m->id, beam->id, beam->tag);
return 0;
}
-static void have_out_data_for(h2_mplx *m, int stream_id);
+static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response);
+static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
-static void h2_mplx_destroy(h2_mplx *m)
+static void check_tx_reservation(h2_mplx *m)
{
- AP_DEBUG_ASSERT(m);
- m->aborted = 1;
- if (m->q) {
- h2_tq_destroy(m->q);
- m->q = NULL;
+ if (m->tx_handles_reserved <= 0) {
+ m->tx_handles_reserved += h2_workers_tx_reserve(m->workers,
+ H2MIN(m->tx_chunk_size, h2_ihash_count(m->tasks)));
}
- if (m->ready_ios) {
- h2_io_set_destroy(m->ready_ios);
- m->ready_ios = NULL;
+}
+
+static void check_tx_free(h2_mplx *m)
+{
+ if (m->tx_handles_reserved > m->tx_chunk_size) {
+ apr_size_t count = m->tx_handles_reserved - m->tx_chunk_size;
+ m->tx_handles_reserved = m->tx_chunk_size;
+ h2_workers_tx_free(m->workers, count);
}
- if (m->stream_ios) {
- h2_io_set_destroy(m->stream_ios);
- m->stream_ios = NULL;
+ else if (m->tx_handles_reserved && h2_ihash_empty(m->tasks)) {
+ h2_workers_tx_free(m->workers, m->tx_handles_reserved);
+ m->tx_handles_reserved = 0;
}
+}
+
+static int purge_stream(void *ctx, void *val)
+{
+ h2_mplx *m = ctx;
+ h2_stream *stream = val;
+ int stream_id = stream->id;
+ h2_task *task;
+
+ /* stream_cleanup clears all buffers and destroys any buckets
+ * that might hold references into task space. Needs to be done
+ * before task destruction, otherwise it will complain. */
+ h2_stream_cleanup(stream);
- if (m->lock) {
- apr_thread_mutex_destroy(m->lock);
- m->lock = NULL;
+ task = h2_ihash_get(m->tasks, stream_id);
+ if (task) {
+ task_destroy(m, task, 1);
}
+ h2_stream_destroy(stream);
+ h2_ihash_remove(m->spurge, stream_id);
+ return 0;
+}
+
+static void purge_streams(h2_mplx *m)
+{
+ if (!h2_ihash_empty(m->spurge)) {
+ while(!h2_ihash_iter(m->spurge, purge_stream, m)) {
+ /* repeat until empty */
+ }
+ h2_ihash_clear(m->spurge);
+ AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
+ }
+}
+
+static void h2_mplx_destroy(h2_mplx *m)
+{
+ AP_DEBUG_ASSERT(m);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): destroy, tasks=%d",
+ m->id, (int)h2_ihash_count(m->tasks));
+ check_tx_free(m);
if (m->pool) {
apr_pool_destroy(m->pool);
}
* their HTTP/1 cousins, the separate allocator seems to work better
* than protecting a shared h2_session one with an own lock.
*/
-h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, h2_workers *workers)
+h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
+ const h2_config *conf,
+ apr_interval_time_t stream_timeout,
+ h2_workers *workers)
{
apr_status_t status = APR_SUCCESS;
- h2_config *conf = h2_config_get(c);
apr_allocator_t *allocator = NULL;
h2_mplx *m;
AP_DEBUG_ASSERT(conf);
if (m) {
m->id = c->id;
APR_RING_ELEM_INIT(m, link);
- apr_atomic_set32(&m->refs, 1);
m->c = c;
apr_pool_create_ex(&m->pool, parent, NULL, allocator);
if (!m->pool) {
return NULL;
}
+ apr_pool_tag(m->pool, "h2_mplx");
apr_allocator_owner_set(allocator, m->pool);
status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
return NULL;
}
+ status = apr_thread_cond_create(&m->task_thawed, m->pool);
+ if (status != APR_SUCCESS) {
+ h2_mplx_destroy(m);
+ return NULL;
+ }
+
m->bucket_alloc = apr_bucket_alloc_create(m->pool);
-
- m->q = h2_tq_create(m->id, m->pool);
- m->stream_ios = h2_io_set_create(m->pool);
- m->ready_ios = h2_io_set_create(m->pool);
- m->closed = h2_stream_set_create(m->pool);
+ m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
+
+ m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->q = h2_iq_create(m->pool, m->max_streams);
+ m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->sresume = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
+
+ m->stream_timeout = stream_timeout;
m->workers = workers;
+ m->workers_max = workers->max_workers;
+ m->workers_def_limit = 4;
+ m->workers_limit = m->workers_def_limit;
+ m->last_limit_change = m->last_idle_block = apr_time_now();
+ m->limit_change_interval = apr_time_from_msec(200);
+
+ m->tx_handles_reserved = 0;
+ m->tx_chunk_size = 4;
- m->file_handles_allowed = h2_config_geti(conf, H2_CONF_SESSION_FILES);
+ m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*));
+
+ m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->max_streams,
+ m->stream_max_mem);
+ h2_ngn_shed_set_ctx(m->ngn_shed , m);
}
return m;
}
-static void reference(h2_mplx *m)
+apr_uint32_t h2_mplx_shutdown(h2_mplx *m)
{
- apr_atomic_inc32(&m->refs);
+ int acquired, max_stream_started = 0;
+
+ if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ max_stream_started = m->max_stream_started;
+ /* Clear schedule queue, disabling existing streams from starting */
+ h2_iq_clear(m->q);
+ leave_mutex(m, acquired);
+ }
+ return max_stream_started;
}
-static void release(h2_mplx *m)
+static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
{
- if (!apr_atomic_dec32(&m->refs)) {
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
+ if (stream->input && stream->started) {
+ h2_beam_send(stream->input, NULL, 0); /* trigger updates */
}
}
-void h2_mplx_reference(h2_mplx *m)
+static int output_consumed_signal(h2_mplx *m, h2_task *task)
{
- reference(m);
+ if (task->output.beam && task->worker_started && task->assigned) {
+ /* trigger updates */
+ h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+ }
+ return 0;
}
-void h2_mplx_release(h2_mplx *m)
+
+
+static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
{
- release(m);
+ conn_rec *slave = NULL;
+ int reuse_slave = 0;
+ apr_status_t status;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_task(%s): destroy", task->id);
+ if (called_from_master) {
+ /* Process outstanding events before destruction */
+ h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+ if (stream) {
+ input_consumed_signal(m, stream);
+ }
+ }
+
+ /* The pool is cleared/destroyed which also closes all
+ * allocated file handles. Give this count back to our
+ * file handle pool. */
+ if (task->output.beam) {
+ m->tx_handles_reserved +=
+ h2_beam_get_files_beamed(task->output.beam);
+ h2_beam_on_produced(task->output.beam, NULL, NULL);
+ status = h2_beam_shutdown(task->output.beam, APR_NONBLOCK_READ, 1);
+ if (status != APR_SUCCESS){
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c,
+ APLOGNO(03385) "h2_task(%s): output shutdown "
+ "incomplete, beam empty=%d, holds proxies=%d",
+ task->id,
+ h2_beam_empty(task->output.beam),
+ h2_beam_holds_proxies(task->output.beam));
+ }
+ }
+
+ slave = task->c;
+ reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
+ && !task->rst_error);
+
+ h2_ihash_remove(m->tasks, task->stream_id);
+ if (m->redo_tasks) {
+ h2_ihash_remove(m->redo_tasks, task->stream_id);
+ }
+ h2_task_destroy(task);
+
+ if (slave) {
+ if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
+ APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
+ }
+ else {
+ slave->sbh = NULL;
+ h2_slave_destroy(slave, NULL);
+ }
+ }
+
+ check_tx_free(m);
}
-static void workers_register(h2_mplx *m) {
- /* Initially, there was ref count increase for this as well, but
- * this is not needed, even harmful.
- * h2_workers is only a hub for all the h2_worker instances.
- * At the end-of-life of this h2_mplx, we always unregister at
- * the workers. The thing to manage are all the h2_worker instances
- * out there. Those may hold a reference to this h2_mplx and we cannot
- * call them to unregister.
+static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
+{
+ h2_task *task;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_stream(%ld-%d): done", m->c->id, stream->id);
+ /* Situation: we are, on the master connection, done with processing
+ * the stream. Either we have handled it successfully, or the stream
+ * was reset by the client or the connection is gone and we are
+ * shutting down the whole session.
+ *
+ * We possibly have created a task for this stream to be processed
+ * on a slave connection. The processing might actually be ongoing
+ * right now or has already finished. A finished task waits for its
+ * stream to be done. This is the common case.
+ *
+ * If the stream had input (e.g. the request had a body), a task
+ * may have read, or is still reading buckets from the input beam.
+ * This means that the task is referencing memory from the stream's
+ * pool (or the master connection bucket alloc). Before we can free
+ * the stream pool, we need to make sure that those references are
+ * gone. This is what h2_beam_shutdown() on the input waits for.
+ *
+ * With the input handled, we can tear down that beam and care
+ * about the output beam. The stream might still have buffered some
+ * buckets read from the output, so we need to get rid of those. That
+ * is done by h2_stream_cleanup().
+ *
+ * Now it is save to destroy the task (if it exists and is finished).
*
- * Therefore: ref counting for h2_workers in not needed, ref counting
- * for h2_worker using this is critical.
+ * FIXME: we currently destroy the stream, even if the task is still
+ * ongoing. This is not ok, since task->request is coming from stream
+ * memory. We should either copy it on task creation or wait with the
+ * stream destruction until the task is done.
*/
- h2_workers_register(m->workers, m);
+ h2_iq_remove(m->q, stream->id);
+ h2_ihash_remove(m->sready, stream->id);
+ h2_ihash_remove(m->sresume, stream->id);
+ h2_ihash_remove(m->streams, stream->id);
+ if (stream->input) {
+ m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
+ h2_beam_on_consumed(stream->input, NULL, NULL);
+ /* Let anyone blocked reading know that there is no more to come */
+ h2_beam_abort(stream->input);
+ /* Remove mutex after, so that abort still finds cond to signal */
+ h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
+ }
+ h2_stream_cleanup(stream);
+
+ task = h2_ihash_get(m->tasks, stream->id);
+ if (task) {
+ if (!task->worker_done) {
+ /* task still running, cleanup once it is done */
+ if (rst_error) {
+ h2_task_rst(task, rst_error);
+ }
+ h2_ihash_add(m->shold, stream);
+ return;
+ }
+ else {
+ /* already finished */
+ task_destroy(m, task, 0);
+ }
+ }
+ h2_stream_destroy(stream);
}
-static void workers_unregister(h2_mplx *m) {
- h2_workers_unregister(m->workers, m);
+static int stream_done_iter(void *ctx, void *val)
+{
+ stream_done((h2_mplx*)ctx, val, 0);
+ return 0;
}
-apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
+typedef struct {
+ h2_mplx_stream_cb *cb;
+ void *ctx;
+} stream_iter_ctx_t;
+
+static int stream_iter_wrap(void *ctx, void *stream)
{
- apr_status_t status;
- workers_unregister(m);
+ stream_iter_ctx_t *x = ctx;
+ return x->cb(stream, x->ctx);
+}
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- int attempts = 0;
+apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
+{
+ apr_status_t status;
+ int acquired;
+
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ stream_iter_ctx_t x;
+ x.cb = cb;
+ x.ctx = ctx;
+ h2_ihash_iter(m->streams, stream_iter_wrap, &x);
- release(m);
- while (apr_atomic_read32(&m->refs) > 0) {
- m->join_wait = wait;
- ap_log_cerror(APLOG_MARK, (attempts? APLOG_INFO : APLOG_DEBUG),
- 0, m->c,
- "h2_mplx(%ld): release_join, refs=%d, waiting...",
- m->id, m->refs);
- apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(10));
- if (++attempts >= 6) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
- APLOGNO(02952)
- "h2_mplx(%ld): join attempts exhausted, refs=%d",
- m->id, m->refs);
- break;
- }
- }
- if (m->join_wait) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
- "h2_mplx(%ld): release_join -> destroy", m->id);
- }
- m->join_wait = NULL;
- apr_thread_mutex_unlock(m->lock);
- h2_mplx_destroy(m);
+ leave_mutex(m, acquired);
}
return status;
}
-void h2_mplx_abort(h2_mplx *m)
+static int task_print(void *ctx, void *val)
{
- apr_status_t status;
- AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- m->aborted = 1;
- h2_io_set_destroy_all(m->stream_ios);
- apr_thread_mutex_unlock(m->lock);
+ h2_mplx *m = ctx;
+ h2_task *task = val;
+
+ if (task && task->request) {
+ h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ "->03198: h2_stream(%s): %s %s %s -> %s %d"
+ "[orph=%d/started=%d/done=%d/frozen=%d]",
+ task->id, task->request->method,
+ task->request->authority, task->request->path,
+ task->response? "http" : (task->rst_error? "reset" : "?"),
+ task->response? task->response->http_status : task->rst_error,
+ (stream? 0 : 1), task->worker_started,
+ task->worker_done, task->frozen);
+ }
+ else if (task) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id);
}
- workers_unregister(m);
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ "->03198: h2_stream(%ld-NULL): NULL", m->id);
+ }
+ return 1;
+}
+
+static int task_abort_connection(void *ctx, void *val)
+{
+ h2_task *task = val;
+ if (task->c) {
+ task->c->aborted = 1;
+ }
+ if (task->input.beam) {
+ h2_beam_abort(task->input.beam);
+ }
+ if (task->worker_started && !task->worker_done && task->output.beam) {
+ h2_beam_abort(task->output.beam);
+ }
+ return 1;
}
+static int report_stream_iter(void *ctx, void *val) {
+ h2_mplx *m = ctx;
+ h2_stream *stream = val;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
+ "submitted=%d, suspended=%d",
+ m->id, stream->id, stream->started, stream->scheduled,
+ stream->submitted, stream->suspended);
+ return 1;
+}
-h2_stream *h2_mplx_open_io(h2_mplx *m, int stream_id)
+apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
- h2_stream *stream = NULL;
- apr_status_t status;
- h2_io *io;
+ apr_status_t status;
+ int acquired;
- if (m->aborted) {
- return NULL;
- }
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- apr_pool_t *stream_pool = m->spare_pool;
+ h2_workers_unregister(m->workers, m);
+
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ int i, wait_secs = 5;
+
+ if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): release_join with %d streams open, "
+ "%d streams resume, %d streams ready, %d tasks",
+ m->id, (int)h2_ihash_count(m->streams),
+ (int)h2_ihash_count(m->sresume),
+ (int)h2_ihash_count(m->sready),
+ (int)h2_ihash_count(m->tasks));
+ h2_ihash_iter(m->streams, report_stream_iter, m);
+ }
+
+ /* disable WINDOW_UPDATE callbacks */
+ h2_mplx_set_consumed_cb(m, NULL, NULL);
- if (!stream_pool) {
- apr_pool_create(&stream_pool, m->pool);
+ if (!h2_ihash_empty(m->shold)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): start release_join with %d streams in hold",
+ m->id, (int)h2_ihash_count(m->shold));
}
- else {
- m->spare_pool = NULL;
+ if (!h2_ihash_empty(m->spurge)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): start release_join with %d streams to purge",
+ m->id, (int)h2_ihash_count(m->spurge));
+ }
+
+ h2_iq_clear(m->q);
+ apr_thread_cond_broadcast(m->task_thawed);
+ while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
+ /* iterate until all streams have been removed */
+ }
+ AP_DEBUG_ASSERT(h2_ihash_empty(m->streams));
+
+ if (!h2_ihash_empty(m->shold)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): 2. release_join with %d streams in "
+ "hold, %d workers busy, %d tasks",
+ m->id, (int)h2_ihash_count(m->shold),
+ m->workers_busy,
+ (int)h2_ihash_count(m->tasks));
+ }
+ if (!h2_ihash_empty(m->spurge)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): 2. release_join with %d streams to purge",
+ m->id, (int)h2_ihash_count(m->spurge));
}
- stream = h2_stream_create(stream_id, stream_pool, m);
- stream->state = H2_STREAM_ST_OPEN;
+ /* If we still have busy workers, we cannot release our memory
+ * pool yet, as tasks have references to us.
+ * Any operation on the task slave connection will from now on
+ * be errored ECONNRESET/ABORTED, so processing them should fail
+ * and workers *should* return in a timely fashion.
+ */
+ for (i = 0; m->workers_busy > 0; ++i) {
+ h2_ihash_iter(m->tasks, task_abort_connection, m);
+
+ m->join_wait = wait;
+ status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
+
+ if (APR_STATUS_IS_TIMEUP(status)) {
+ if (i > 0) {
+ /* Oh, oh. Still we wait for assigned workers to report that
+ * they are done. Unless we have a bug, a worker seems to be hanging.
+ * If we exit now, all will be deallocated and the worker, once
+ * it does return, will walk all over freed memory...
+ */
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
+ "h2_mplx(%ld): release, waiting for %d seconds now for "
+ "%d h2_workers to return, have still %d tasks outstanding",
+ m->id, i*wait_secs, m->workers_busy,
+ (int)h2_ihash_count(m->tasks));
+ if (i == 1) {
+ h2_ihash_iter(m->tasks, task_print, m);
+ }
+ }
+ h2_mplx_abort(m);
+ apr_thread_cond_broadcast(m->task_thawed);
+ }
+ }
- io = h2_io_set_get(m->stream_ios, stream_id);
- if (!io) {
- io = h2_io_create(stream_id, stream_pool, m->bucket_alloc);
- h2_io_set_add(m->stream_ios, io);
+ if (!h2_ihash_empty(m->tasks) && APLOGctrace1(m->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): 3. release_join with %d tasks",
+ m->id, (int)h2_ihash_count(m->tasks));
+ h2_ihash_iter(m->tasks, task_print, m);
}
- status = io? APR_SUCCESS : APR_ENOMEM;
- apr_thread_mutex_unlock(m->lock);
+ AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
+ if (!h2_ihash_empty(m->spurge)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): 3. release_join %d streams to purge",
+ m->id, (int)h2_ihash_count(m->spurge));
+ purge_streams(m);
+ }
+
+ if (!h2_ihash_empty(m->tasks)) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
+ "h2_mplx(%ld): release_join -> destroy, "
+ "%d tasks still present",
+ m->id, (int)h2_ihash_count(m->tasks));
+ }
+ leave_mutex(m, acquired);
+ h2_mplx_destroy(m);
+ /* all gone */
}
- return stream;
+ return status;
}
-static void stream_destroy(h2_mplx *m, h2_stream *stream, h2_io *io)
+void h2_mplx_abort(h2_mplx *m)
{
- apr_pool_t *pool = h2_stream_detach_pool(stream);
- if (pool) {
- apr_pool_clear(pool);
- if (m->spare_pool) {
- apr_pool_destroy(m->spare_pool);
- }
- m->spare_pool = pool;
- }
- h2_stream_destroy(stream);
- if (io) {
- /* The pool is cleared/destroyed which also closes all
- * allocated file handles. Give this count back to our
- * file handle pool. */
- m->file_handles_allowed += io->files_handles_owned;
- h2_io_set_remove(m->stream_ios, io);
- h2_io_destroy(io);
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) {
+ m->aborted = 1;
+ h2_ngn_shed_abort(m->ngn_shed);
+ leave_mutex(m, acquired);
}
}
-apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, h2_stream *stream)
+apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream)
{
- apr_status_t status;
+ apr_status_t status = APR_SUCCESS;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream->id);
- if (!io || io->task_done) {
- /* No more io or task already done -> cleanup immediately */
- stream_destroy(m, stream, io);
- }
- else {
- /* Add stream to closed set for cleanup when task is done */
- h2_stream_set_add(m->closed, stream);
- }
- apr_thread_mutex_unlock(m->lock);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld-%d): marking stream as done.",
+ m->id, stream->id);
+ stream_done(m, stream, stream->rst_error);
+ purge_streams(m);
+ leave_mutex(m, acquired);
}
return status;
}
-void h2_mplx_task_done(h2_mplx *m, int stream_id)
+void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
{
- apr_status_t status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- h2_stream *stream = h2_stream_set_get(m->closed, stream_id);
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): task(%d) done", m->id, stream_id);
+ m->input_consumed = cb;
+ m->input_consumed_ctx = ctx;
+}
+
+static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
+{
+ h2_mplx *m = ctx;
+ apr_status_t status;
+ h2_stream *stream;
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ stream = h2_ihash_get(m->streams, beam->id);
if (stream) {
- /* stream was already closed by main connection and is in
- * zombie state. Now that the task is done with it, we
- * can free its resources. */
- h2_stream_set_remove(m->closed, stream);
- stream_destroy(m, stream, io);
+ have_out_data_for(m, stream, 0);
}
- else if (io) {
- /* main connection has not finished stream. Mark task as done
- * so that eventual cleanup can start immediately. */
- io->task_done = 1;
- }
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
}
-apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
- int stream_id, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
+static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
{
- apr_status_t status;
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
+ apr_status_t status = APR_SUCCESS;
+ h2_task *task = h2_ihash_get(m->tasks, stream_id);
+ h2_stream *stream = h2_ihash_get(m->streams, stream_id);
+
+ if (!task || !stream) {
return APR_ECONNABORTED;
}
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
- io->input_arrived = iowait;
- status = h2_io_in_read(io, bb, 0);
- while (status == APR_EAGAIN
- && !is_aborted(m, &status)
- && block == APR_BLOCK_READ) {
- apr_thread_cond_wait(io->input_arrived, m->lock);
- status = h2_io_in_read(io, bb, 0);
- }
- io->input_arrived = NULL;
- }
- else {
- status = APR_EOF;
+
+ status = h2_task_add_response(task, response);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ "h2_mplx(%s): add response: %d, rst=%d",
+ task->id, response->http_status, response->rst_error);
+ if (status != APR_SUCCESS) {
+ return status;
+ }
+
+ if (task->output.beam && !task->output.opened) {
+ h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
+ h2_beam_timeout_set(task->output.beam, m->stream_timeout);
+ h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
+ h2_beam_on_produced(task->output.beam, output_produced, m);
+ m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
+ if (!task->output.copy_files) {
+ h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
}
- apr_thread_mutex_unlock(m->lock);
+ h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m);
+ task->output.opened = 1;
+ }
+
+ if (response && response->http_status < 300) {
+ /* we might see some file buckets in the output, see
+ * if we have enough handles reserved. */
+ check_tx_reservation(m);
}
+ have_out_data_for(m, stream, 1);
return status;
}
-apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
- apr_bucket_brigade *bb)
+apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return APR_ECONNABORTED;
- }
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
- status = h2_io_in_write(io, bb);
- if (io->input_arrived) {
- apr_thread_cond_signal(io->input_arrived);
- }
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
}
else {
- status = APR_EOF;
+ status = out_open(m, stream_id, response);
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
-apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
+static apr_status_t out_close(h2_mplx *m, h2_task *task)
{
- apr_status_t status;
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
+ apr_status_t status = APR_SUCCESS;
+ h2_stream *stream;
+
+ if (!task) {
return APR_ECONNABORTED;
}
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
- status = h2_io_in_close(io);
- if (io->input_arrived) {
- apr_thread_cond_signal(io->input_arrived);
- }
- }
- else {
- status = APR_ECONNABORTED;
- }
- apr_thread_mutex_unlock(m->lock);
+
+ stream = h2_ihash_get(m->streams, task->stream_id);
+ if (!stream) {
+ return APR_ECONNABORTED;
+ }
+
+ if (!task->response && !task->rst_error) {
+ /* In case a close comes before a response was created,
+ * insert an error one so that our streams can properly reset.
+ */
+ h2_response *r = h2_response_die(task->stream_id, 500,
+ task->request, m->pool);
+ status = out_open(m, task->stream_id, r);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, APLOGNO(03393)
+ "h2_mplx(%s): close, no response, no rst", task->id);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+ "h2_mplx(%s): close", task->id);
+ if (task->output.beam) {
+ status = h2_beam_close(task->output.beam);
+ h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c,
+ APLOG_TRACE2);
}
+ output_consumed_signal(m, task);
+ have_out_data_for(m, stream, 0);
return status;
}
-typedef struct {
- h2_mplx_consumed_cb *cb;
- void *cb_ctx;
- int streams_updated;
-} update_ctx;
-
-static int update_window(void *ctx, h2_io *io)
-{
- if (io->input_consumed) {
- update_ctx *uctx = (update_ctx*)ctx;
- uctx->cb(uctx->cb_ctx, io->id, io->input_consumed);
- io->input_consumed = 0;
- ++uctx->streams_updated;
- }
- return 1;
-}
-
-apr_status_t h2_mplx_in_update_windows(h2_mplx *m,
- h2_mplx_consumed_cb *cb, void *cb_ctx)
+apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
+ apr_thread_cond_t *iowait)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return APR_ECONNABORTED;
- }
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- update_ctx ctx;
-
- ctx.cb = cb;
- ctx.cb_ctx = cb_ctx;
- ctx.streams_updated = 0;
-
- status = APR_EAGAIN;
- h2_io_set_iter(m->stream_ios, update_window, &ctx);
-
- if (ctx.streams_updated) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
+ }
+ else if (!h2_ihash_empty(m->sready) || !h2_ihash_empty(m->sresume)) {
status = APR_SUCCESS;
}
- apr_thread_mutex_unlock(m->lock);
+ else {
+ purge_streams(m);
+ m->added_output = iowait;
+ status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
+ if (APLOGctrace2(m->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): trywait on data for %f ms)",
+ m->id, timeout/1000.0);
+ }
+ m->added_output = NULL;
+ }
+ leave_mutex(m, acquired);
}
return status;
}
-apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id,
- h2_io_data_cb *cb, void *ctx,
- apr_size_t *plen, int *peos)
+static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response)
+{
+ h2_ihash_t *set;
+ ap_assert(m);
+ ap_assert(stream);
+
+ set = response? m->sready : m->sresume;
+ if (!h2_ihash_get(set, stream->id)) {
+ h2_ihash_add(set, stream);
+ if (m->added_output) {
+ apr_thread_cond_signal(m->added_output);
+ }
+ }
+}
+
+apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return APR_ECONNABORTED;
- }
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
- status = h2_io_out_readx(io, cb, ctx, plen, peos);
- if (status == APR_SUCCESS && io->output_drained) {
- apr_thread_cond_signal(io->output_drained);
- }
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
}
else {
- status = APR_ECONNABORTED;
+ h2_iq_sort(m->q, cmp, ctx);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): reprioritize tasks", m->id);
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
-h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
+apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
+ h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
- h2_stream *stream = NULL;
+ int do_registration = 0;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return NULL;
- }
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- h2_io *io = h2_io_set_get_highest_prio(m->ready_ios);
- if (io) {
- h2_response *response = io->response;
- h2_io_set_remove(m->ready_ios, io);
-
- stream = h2_stream_set_get(streams, response->stream_id);
- if (stream) {
- h2_stream_set_response(stream, response, io->bbout);
- if (io->output_drained) {
- apr_thread_cond_signal(io->output_drained);
- }
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ if (m->aborted) {
+ status = APR_ECONNABORTED;
+ }
+ else {
+ h2_ihash_add(m->streams, stream);
+ if (stream->response) {
+ /* already have a respone, schedule for submit */
+ h2_ihash_add(m->sready, stream);
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, APR_NOTFOUND, m->c,
- APLOGNO(02953) "h2_mplx(%ld): stream for response %d",
- m->id, response->stream_id);
+ h2_beam_create(&stream->input, stream->pool, stream->id,
+ "input", 0);
+ if (!m->need_registration) {
+ m->need_registration = h2_iq_empty(m->q);
+ }
+ if (m->workers_busy < m->workers_max) {
+ do_registration = m->need_registration;
+ }
+ h2_iq_add(m->q, stream->id, cmp, ctx);
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ "h2_mplx(%ld-%d): process, body=%d",
+ m->c->id, stream->id, stream->request->body);
}
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
+ }
+ if (do_registration) {
+ m->need_registration = 0;
+ h2_workers_register(m->workers, m);
}
- return stream;
+ return status;
}
-static apr_status_t out_write(h2_mplx *m, h2_io *io,
- ap_filter_t* f, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
+static h2_task *pop_task(h2_mplx *m)
{
- apr_status_t status = APR_SUCCESS;
- /* We check the memory footprint queued for this stream_id
- * and block if it exceeds our configured limit.
- * We will not split buckets to enforce the limit to the last
- * byte. After all, the bucket is already in memory.
- */
- while (!APR_BRIGADE_EMPTY(bb)
- && (status == APR_SUCCESS)
- && !is_aborted(m, &status)) {
-
- status = h2_io_out_write(io, bb, m->stream_max_mem,
- &m->file_handles_allowed);
+ h2_task *task = NULL;
+ h2_stream *stream;
+ int sid;
+ while (!m->aborted && !task && (m->workers_busy < m->workers_limit)
+ && (sid = h2_iq_shift(m->q)) > 0) {
- /* Wait for data to drain until there is room again */
- while (!APR_BRIGADE_EMPTY(bb)
- && iowait
- && status == APR_SUCCESS
- && (m->stream_max_mem <= h2_io_out_length(io))
- && !is_aborted(m, &status)) {
- io->output_drained = iowait;
- if (f) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_mplx(%ld-%d): waiting for out drain",
- m->id, io->id);
+ stream = h2_ihash_get(m->streams, sid);
+ if (stream) {
+ conn_rec *slave, **pslave;
+ int new_conn = 0;
+
+ pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
+ if (pslave) {
+ slave = *pslave;
+ }
+ else {
+ slave = h2_slave_create(m->c, m->pool, NULL);
+ new_conn = 1;
}
- apr_thread_cond_wait(io->output_drained, m->lock);
- io->output_drained = NULL;
+
+ slave->sbh = m->c->sbh;
+ slave->aborted = 0;
+ task = h2_task_create(slave, stream->request, stream->input, m);
+ h2_ihash_add(m->tasks, task);
+
+ m->c->keepalives++;
+ apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
+ if (new_conn) {
+ h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
+ }
+ stream->started = 1;
+ task->worker_started = 1;
+ task->started_at = apr_time_now();
+ if (sid > m->max_stream_started) {
+ m->max_stream_started = sid;
+ }
+
+ if (stream->input) {
+ h2_beam_timeout_set(stream->input, m->stream_timeout);
+ h2_beam_on_consumed(stream->input, stream_input_consumed, m);
+ h2_beam_on_file_beam(stream->input, can_beam_file, m);
+ h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
+ }
+
+ ++m->workers_busy;
}
}
- apr_brigade_cleanup(bb);
- return status;
+ return task;
}
-static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
- ap_filter_t* f, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
+h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
{
- apr_status_t status = APR_SUCCESS;
+ h2_task *task = NULL;
+ apr_status_t status;
+ int acquired;
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
- if (f) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c,
- "h2_mplx(%ld-%d): open response: %s",
- m->id, stream_id, response->status);
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ if (m->aborted) {
+ *has_more = 0;
+ }
+ else {
+ task = pop_task(m);
+ *has_more = !h2_iq_empty(m->q);
}
- io->response = h2_response_copy(io->pool, response);
- h2_io_set_add(m->ready_ios, io);
- if (bb) {
- status = out_write(m, io, f, bb, iowait);
+ if (has_more && !task) {
+ m->need_registration = 1;
}
- have_out_data_for(m, stream_id);
+ leave_mutex(m, acquired);
+ }
+ return task;
+}
+
+static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
+{
+ if (task->frozen) {
+ /* this task was handed over to an engine for processing
+ * and the original worker has finished. That means the
+ * engine may start processing now. */
+ h2_task_thaw(task);
+ /* we do not want the task to block on writing response
+ * bodies into the mplx. */
+ h2_task_set_io_blocking(task, 0);
+ apr_thread_cond_broadcast(m->task_thawed);
+ return;
}
else {
- status = APR_ECONNABORTED;
+ h2_stream *stream;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): task(%s) done", m->id, task->id);
+ out_close(m, task);
+ stream = h2_ihash_get(m->streams, task->stream_id);
+
+ if (ngn) {
+ apr_off_t bytes = 0;
+ if (task->output.beam) {
+ h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+ bytes += h2_beam_get_buffered(task->output.beam);
+ }
+ if (bytes > 0) {
+ /* we need to report consumed and current buffered output
+ * to the engine. The request will be streamed out or cancelled,
+ * no more data is coming from it and the engine should update
+ * its calculations before we destroy this information. */
+ h2_req_engine_out_consumed(ngn, task->c, bytes);
+ }
+ }
+
+ if (task->engine) {
+ if (!h2_req_engine_is_shutdown(task->engine)) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): task(%s) has not-shutdown "
+ "engine(%s)", m->id, task->id,
+ h2_req_engine_get_id(task->engine));
+ }
+ h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+ }
+
+ if (!m->aborted && stream && m->redo_tasks
+ && h2_ihash_get(m->redo_tasks, task->stream_id)) {
+ /* reset and schedule again */
+ h2_task_redo(task);
+ h2_ihash_remove(m->redo_tasks, task->stream_id);
+ h2_iq_add(m->q, task->stream_id, NULL, NULL);
+ return;
+ }
+
+ task->worker_done = 1;
+ task->done_at = apr_time_now();
+ if (task->output.beam) {
+ h2_beam_on_consumed(task->output.beam, NULL, NULL);
+ h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%s): request done, %f ms elapsed", task->id,
+ (task->done_at - task->started_at) / 1000.0);
+ if (task->started_at > m->last_idle_block) {
+ /* this task finished without causing an 'idle block', e.g.
+ * a block by flow control.
+ */
+ if (task->done_at- m->last_limit_change >= m->limit_change_interval
+ && m->workers_limit < m->workers_max) {
+ /* Well behaving stream, allow it more workers */
+ m->workers_limit = H2MIN(m->workers_limit * 2,
+ m->workers_max);
+ m->last_limit_change = task->done_at;
+ m->need_registration = 1;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): increase worker limit to %d",
+ m->id, m->workers_limit);
+ }
+ }
+
+ if (stream) {
+ /* hang around until the stream deregisters */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%s): task_done, stream still open",
+ task->id);
+ /* more data will not arrive, resume the stream */
+ have_out_data_for(m, stream, 0);
+ }
+ else {
+ /* stream no longer active, was it placed in hold? */
+ stream = h2_ihash_get(m->shold, task->stream_id);
+ if (stream) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%s): task_done, stream in hold",
+ task->id);
+ /* We cannot destroy the stream here since this is
+ * called from a worker thread and freeing memory pools
+ * is only safe in the only thread using it (and its
+ * parent pool / allocator) */
+ h2_ihash_remove(m->shold, stream->id);
+ h2_ihash_add(m->spurge, stream);
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%s): task_done, stream not found",
+ task->id);
+ task_destroy(m, task, 0);
+ }
+
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
+ }
}
- return status;
}
-apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
- ap_filter_t* f, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
+void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
{
- apr_status_t status;
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return APR_ECONNABORTED;
+ int acquired;
+
+ if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ task_done(m, task, NULL);
+ --m->workers_busy;
+ if (ptask) {
+ /* caller wants another task */
+ *ptask = pop_task(m);
+ }
+ leave_mutex(m, acquired);
}
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- status = out_open(m, stream_id, response, f, bb, iowait);
- if (m->aborted) {
- return APR_ECONNABORTED;
+}
+
+/*******************************************************************************
+ * h2_mplx DoS protection
+ ******************************************************************************/
+
+static int latest_repeatable_unsubmitted_iter(void *data, void *val)
+{
+ task_iter_ctx *ctx = data;
+ h2_task *task = val;
+ if (!task->worker_done && h2_task_can_redo(task)
+ && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) {
+ /* this task occupies a worker, the response has not been submitted yet,
+ * not been cancelled and it is a repeatable request
+ * -> it can be re-scheduled later */
+ if (!ctx->task || ctx->task->started_at < task->started_at) {
+ /* we did not have one or this one was started later */
+ ctx->task = task;
}
- apr_thread_mutex_unlock(m->lock);
}
- return status;
+ return 1;
}
+static h2_task *get_latest_repeatable_unsubmitted_task(h2_mplx *m)
+{
+ task_iter_ctx ctx;
+ ctx.m = m;
+ ctx.task = NULL;
+ h2_ihash_iter(m->tasks, latest_repeatable_unsubmitted_iter, &ctx);
+ return ctx.task;
+}
-apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id,
- ap_filter_t* f, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
+static int timed_out_busy_iter(void *data, void *val)
{
- apr_status_t status;
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return APR_ECONNABORTED;
+ task_iter_ctx *ctx = data;
+ h2_task *task = val;
+ if (!task->worker_done
+ && (ctx->now - task->started_at) > ctx->m->stream_timeout) {
+ /* timed out stream occupying a worker, found */
+ ctx->task = task;
+ return 0;
}
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- if (!m->aborted) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
- status = out_write(m, io, f, bb, iowait);
- have_out_data_for(m, stream_id);
- if (m->aborted) {
- return APR_ECONNABORTED;
- }
- }
- else {
- status = APR_ECONNABORTED;
- }
- }
-
- if (m->lock) {
- apr_thread_mutex_unlock(m->lock);
+ return 1;
+}
+
+static h2_task *get_timed_out_busy_task(h2_mplx *m)
+{
+ task_iter_ctx ctx;
+ ctx.m = m;
+ ctx.task = NULL;
+ ctx.now = apr_time_now();
+ h2_ihash_iter(m->tasks, timed_out_busy_iter, &ctx);
+ return ctx.task;
+}
+
+static apr_status_t unschedule_slow_tasks(h2_mplx *m)
+{
+ h2_task *task;
+ int n;
+
+ if (!m->redo_tasks) {
+ m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
+ }
+ /* Try to get rid of streams that occupy workers. Look for safe requests
+ * that are repeatable. If none found, fail the connection.
+ */
+ n = (m->workers_busy - m->workers_limit - h2_ihash_count(m->redo_tasks));
+ while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) {
+ h2_task_rst(task, H2_ERR_CANCEL);
+ h2_ihash_add(m->redo_tasks, task);
+ --n;
+ }
+
+ if ((m->workers_busy - h2_ihash_count(m->redo_tasks)) > m->workers_limit) {
+ task = get_timed_out_busy_task(m);
+ if (task) {
+ /* Too many busy workers, unable to cancel enough streams
+ * and with a busy, timed out stream, we tell the client
+ * to go away... */
+ return APR_TIMEUP;
}
}
- return status;
+ return APR_SUCCESS;
}
-apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
+apr_status_t h2_mplx_idle(h2_mplx *m)
{
- apr_status_t status;
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return APR_ECONNABORTED;
- }
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- if (!m->aborted) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
- if (!io->response->ngheader) {
- /* In case a close comes before a response was created,
- * insert an error one so that our streams can properly
- * reset.
- */
- h2_response *r = h2_response_create(stream_id,
- "500", NULL, m->pool);
- status = out_open(m, stream_id, r, NULL, NULL, NULL);
+ apr_status_t status = APR_SUCCESS;
+ apr_time_t now;
+ int acquired;
+
+ if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ apr_size_t scount = h2_ihash_count(m->streams);
+ if (scount > 0 && m->workers_busy) {
+ /* If we have streams in connection state 'IDLE', meaning
+ * all streams are ready to sent data out, but lack
+ * WINDOW_UPDATEs.
+ *
+ * This is ok, unless we have streams that still occupy
+ * h2 workers. As worker threads are a scarce resource,
+ * we need to take measures that we do not get DoSed.
+ *
+ * This is what we call an 'idle block'. Limit the amount
+ * of busy workers we allow for this connection until it
+ * well behaves.
+ */
+ now = apr_time_now();
+ m->last_idle_block = now;
+ if (m->workers_limit > 2
+ && now - m->last_limit_change >= m->limit_change_interval) {
+ if (m->workers_limit > 16) {
+ m->workers_limit = 16;
}
- status = h2_io_out_close(io);
- have_out_data_for(m, stream_id);
- if (m->aborted) {
- /* if we were the last output, the whole session might
- * have gone down in the meantime.
- */
- return APR_SUCCESS;
+ else if (m->workers_limit > 8) {
+ m->workers_limit = 8;
+ }
+ else if (m->workers_limit > 4) {
+ m->workers_limit = 4;
}
+ else if (m->workers_limit > 2) {
+ m->workers_limit = 2;
+ }
+ m->last_limit_change = now;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): decrease worker limit to %d",
+ m->id, m->workers_limit);
}
- else {
- status = APR_ECONNABORTED;
+
+ if (m->workers_busy > m->workers_limit) {
+ status = unschedule_slow_tasks(m);
}
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
-int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
+/*******************************************************************************
+ * HTTP/2 request engines
+ ******************************************************************************/
+
+typedef struct {
+ h2_mplx * m;
+ h2_req_engine *ngn;
+ int streams_updated;
+} ngn_update_ctx;
+
+static int ngn_update_window(void *ctx, void *val)
{
- int has_eos = 0;
- apr_status_t status;
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return 0;
- }
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
- has_eos = h2_io_in_has_eos_for(io);
- }
- apr_thread_mutex_unlock(m->lock);
+ ngn_update_ctx *uctx = ctx;
+ h2_task *task = val;
+ if (task && task->assigned == uctx->ngn
+ && output_consumed_signal(uctx->m, task)) {
+ ++uctx->streams_updated;
}
- return has_eos;
+ return 1;
}
-int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
+static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
{
- apr_status_t status;
- int has_data = 0;
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return 0;
- }
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io) {
- has_data = h2_io_out_has_data(io);
- }
- apr_thread_mutex_unlock(m->lock);
- }
- return has_data;
+ ngn_update_ctx ctx;
+
+ ctx.m = m;
+ ctx.ngn = ngn;
+ ctx.streams_updated = 0;
+ h2_ihash_iter(m->tasks, ngn_update_window, &ctx);
+
+ return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
}
-apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
- apr_thread_cond_t *iowait)
+apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
+ request_rec *r,
+ http2_req_engine_init *einit)
{
apr_status_t status;
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
+ h2_mplx *m;
+ h2_task *task;
+ int acquired;
+
+ task = h2_ctx_rget_task(r);
+ if (!task) {
return APR_ECONNABORTED;
}
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- m->added_output = iowait;
- status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
- if (APLOGctrace2(m->c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): trywait on data for %f ms)",
- m->id, timeout/1000.0);
+ m = task->mplx;
+ task->r = r;
+
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+
+ if (stream) {
+ status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
}
- m->added_output = NULL;
- apr_thread_mutex_unlock(m->lock);
+ else {
+ status = APR_ECONNABORTED;
+ }
+ leave_mutex(m, acquired);
}
return status;
}
-static void have_out_data_for(h2_mplx *m, int stream_id)
+apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
+ apr_read_type_e block,
+ apr_uint32_t capacity,
+ request_rec **pr)
+{
+ h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
+ h2_mplx *m = h2_ngn_shed_get_ctx(shed);
+ apr_status_t status;
+ h2_task *task = NULL;
+ int acquired;
+
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ int want_shutdown = (block == APR_BLOCK_READ);
+
+ /* Take this opportunity to update output consummation
+ * for this engine */
+ ngn_out_update_windows(m, ngn);
+
+ if (want_shutdown && !h2_iq_empty(m->q)) {
+ /* For a blocking read, check first if requests are to be
+ * had and, if not, wait a short while before doing the
+ * blocking, and if unsuccessful, terminating read.
+ */
+ status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
+ if (APR_STATUS_IS_EAGAIN(status)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): start block engine pull", m->id);
+ apr_thread_cond_timedwait(m->task_thawed, m->lock,
+ apr_time_from_msec(20));
+ status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
+ }
+ }
+ else {
+ status = h2_ngn_shed_pull_task(shed, ngn, capacity,
+ want_shutdown, &task);
+ }
+ leave_mutex(m, acquired);
+ }
+ *pr = task? task->r : NULL;
+ return status;
+}
+
+void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
{
- (void)stream_id;
- AP_DEBUG_ASSERT(m);
- if (m->added_output) {
- apr_thread_cond_signal(m->added_output);
+ h2_task *task = h2_ctx_cget_task(r_conn);
+
+ if (task) {
+ h2_mplx *m = task->mplx;
+ int acquired;
+
+ if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ ngn_out_update_windows(m, ngn);
+ h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
+ if (task->engine) {
+ /* cannot report that as done until engine returns */
+ }
+ else {
+ task_done(m, task, ngn);
+ }
+ /* Take this opportunity to update output consummation
+ * for this engine */
+ leave_mutex(m, acquired);
+ }
}
}
-apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task)
+/*******************************************************************************
+ * mplx master events dispatching
+ ******************************************************************************/
+
+static int update_window(void *ctx, void *val)
+{
+ input_consumed_signal(ctx, val);
+ return 1;
+}
+
+apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
+ stream_ev_callback *on_resume,
+ stream_ev_callback *on_response,
+ void *on_ctx)
{
apr_status_t status;
+ int acquired;
+ int streams[32];
+ h2_stream *stream;
+ h2_task *task;
+ size_t i, n;
+
AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return APR_ECONNABORTED;
- }
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- /* TODO: needs to sort queue by priority */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx: do task(%s)", task->id);
- h2_tq_append(m->q, task);
- apr_thread_mutex_unlock(m->lock);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld): dispatch events", m->id);
+
+ /* update input windows for streams */
+ h2_ihash_iter(m->streams, update_window, m);
+
+ if (on_response && !h2_ihash_empty(m->sready)) {
+ n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams));
+ for (i = 0; i < n; ++i) {
+ stream = h2_ihash_get(m->streams, streams[i]);
+ if (!stream) {
+ continue;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): on_response",
+ m->id, stream->id);
+ task = h2_ihash_get(m->tasks, stream->id);
+ if (task) {
+ task->response_sent = 1;
+ if (task->rst_error) {
+ h2_stream_rst(stream, task->rst_error);
+ }
+ else {
+ AP_DEBUG_ASSERT(task->response);
+ status = h2_stream_add_response(stream, task->response,
+ task->output.beam);
+ if (status != APR_SUCCESS) {
+ h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
+ }
+ if (!h2_response_get_final(task->response)) {
+ /* the final response needs still to arrive */
+ task->response = NULL;
+ }
+ }
+ }
+ else {
+ /* We have the stream ready without a task. This happens
+ * when we fail streams early. A response should already
+ * be present. */
+ AP_DEBUG_ASSERT(stream->response || stream->rst_error);
+ }
+ status = on_response(on_ctx, stream->id);
+ }
+ }
+
+ if (on_resume && !h2_ihash_empty(m->sresume)) {
+ n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams));
+ for (i = 0; i < n; ++i) {
+ stream = h2_ihash_get(m->streams, streams[i]);
+ if (!stream) {
+ continue;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): on_resume",
+ m->id, stream->id);
+ task = h2_ihash_get(m->tasks, stream->id);
+ if (task && task->rst_error) {
+ h2_stream_rst(stream, task->rst_error);
+ }
+ h2_stream_set_suspended(stream, 0);
+ status = on_resume(on_ctx, stream->id);
+ }
+ }
+
+ leave_mutex(m, acquired);
}
- workers_register(m);
return status;
}
-h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
+apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
{
- h2_task *task = NULL;
apr_status_t status;
+ h2_stream *stream;
+ h2_task *task;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- *has_more = 0;
- return NULL;
- }
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- task = h2_tq_pop_first(m->q);
- if (task) {
- h2_task_set_started(task);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ stream = h2_ihash_get(m->streams, stream_id);
+ if (stream && !h2_ihash_get(m->sresume, stream->id)) {
+ /* not marked for resume again already */
+ h2_stream_set_suspended(stream, 1);
+ task = h2_ihash_get(m->tasks, stream->id);
+ if (stream->started && (!task || task->worker_done)) {
+ h2_ihash_add(m->sresume, stream);
+ }
}
- *has_more = !h2_tq_empty(m->q);
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
- return task;
+ return status;
}
-apr_status_t h2_mplx_create_task(h2_mplx *m, struct h2_stream *stream)
+int h2_mplx_is_busy(h2_mplx *m)
{
apr_status_t status;
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return APR_ECONNABORTED;
- }
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- conn_rec *c = h2_conn_create(m->c, stream->pool);
- stream->task = h2_task_create(m->id, stream->id,
- stream->pool, m, c);
-
- apr_thread_mutex_unlock(m->lock);
+ int acquired, busy = 1;
+
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ if (h2_ihash_empty(m->streams)) {
+ busy = 0;
+ }
+ if (h2_iq_empty(m->q) && h2_ihash_empty(m->tasks)) {
+ busy = 0;
+ }
+ leave_mutex(m, acquired);
}
- return status;
+ return busy;
}
-