return status;
}
+static void register_if_needed(h2_mplx *m)
+{
+ if (!m->is_registered && !h2_iq_empty(m->q)) {
+ apr_status_t status = h2_workers_register(m->workers, m);
+ if (status == APR_SUCCESS) {
+ m->is_registered = 1;
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c, APLOGNO()
+ "h2_mplx(%ld): register at workers", m->id);
+ }
+ }
+}
+
apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
h2_stream_pri_cmp *cmp, void *ctx)
{
if (h2_stream_is_ready(stream)) {
/* already have a response */
check_data_for(m, stream->id);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
H2_STRM_MSG(stream, "process, add to readyq"));
}
else {
- h2_iq_add(m->q, stream->id, cmp, ctx);
- if (!m->is_registered) {
- if (h2_workers_register(m->workers, m) == APR_SUCCESS) {
- m->is_registered = 1;
- }
- }
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ h2_iq_add(m->q, stream->id, cmp, ctx);
+ register_if_needed(m);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
H2_STRM_MSG(stream, "process, added to q"));
}
}
/* caller wants another task */
*ptask = next_stream_task(m);
}
- if (!m->is_registered && !h2_iq_empty(m->q)) {
- if (h2_workers_register(m->workers, m) == APR_SUCCESS) {
- m->is_registered = 1;
- }
- }
+ register_if_needed(m);
leave_mutex(m, acquired);
}
}
status = unschedule_slow_tasks(m);
}
}
+ register_if_needed(m);
leave_mutex(m, acquired);
}
return status;
}
}
+static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
+
+static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
+{
+ apr_status_t status;
+
+ slot->workers = workers;
+ slot->aborted = 0;
+ slot->task = NULL;
+ if (!slot->not_idle) {
+ status = apr_thread_cond_create(&slot->not_idle, workers->pool);
+ if (status != APR_SUCCESS) {
+ push_slot(&workers->free, slot);
+ return status;
+ }
+ }
+
+ /* thread will either immediately start work or add itself
+ * to the idle queue */
+ apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot,
+ workers->pool);
+ if (!slot->thread) {
+ push_slot(&workers->free, slot);
+ return APR_ENOMEM;
+ }
+
+ ++workers->worker_count;
+ return APR_SUCCESS;
+}
+
+static apr_status_t add_worker(h2_workers *workers)
+{
+ h2_slot *slot = pop_slot(&workers->free);
+ if (slot) {
+ return activate_slot(workers, slot);
+ }
+ return APR_EAGAIN;
+}
+
static void wake_idle_worker(h2_workers *workers)
{
h2_slot *slot = pop_slot(&workers->idle);
apr_thread_cond_signal(slot->not_idle);
apr_thread_mutex_unlock(workers->lock);
}
+ else if (workers->dynamic) {
+ add_worker(workers);
+ }
}
static void cleanup_zombies(h2_workers *workers)
}
apr_thread_mutex_lock(workers->lock);
- ++workers->idle_workers;
cleanup_zombies(workers);
- if (slot->next == NULL) {
- push_slot(&workers->idle, slot);
- }
+
+ ++workers->idle_workers;
+ push_slot(&workers->idle, slot);
apr_thread_cond_wait(slot->not_idle, workers->lock);
+ --workers->idle_workers;
+
apr_thread_mutex_unlock(workers->lock);
}
return APR_EOF;
return NULL;
}
-static apr_status_t activate_slot(h2_workers *workers)
-{
- h2_slot *slot = pop_slot(&workers->free);
- if (slot) {
- apr_status_t status;
-
- slot->workers = workers;
- slot->aborted = 0;
- slot->task = NULL;
- if (!slot->not_idle) {
- status = apr_thread_cond_create(&slot->not_idle, workers->pool);
- if (status != APR_SUCCESS) {
- push_slot(&workers->free, slot);
- return status;
- }
- }
-
- apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot,
- workers->pool);
- if (!slot->thread) {
- push_slot(&workers->free, slot);
- return APR_ENOMEM;
- }
-
- ++workers->worker_count;
- return APR_SUCCESS;
- }
- return APR_EAGAIN;
-}
-
static apr_status_t workers_pool_cleanup(void *data)
{
h2_workers *workers = data;
apr_status_t status;
h2_workers *workers;
apr_pool_t *pool;
- int i;
+ int i, n;
ap_assert(s);
ap_assert(server_pool);
workers->max_workers = max_workers;
workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
- status = h2_fifo_create(&workers->mplxs, pool, workers->max_workers);
+ status = h2_fifo_create(&workers->mplxs, pool, 2 * workers->max_workers);
if (status != APR_SUCCESS) {
return NULL;
}
APR_THREAD_MUTEX_DEFAULT,
workers->pool);
if (status == APR_SUCCESS) {
- int n = workers->nslots = workers->max_workers;
+ n = workers->nslots = workers->max_workers;
workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
if (workers->slots == NULL) {
+ workers->nslots = 0;
status = APR_ENOMEM;
}
- }
- if (status == APR_SUCCESS) {
- workers->free = &workers->slots[0];
- for (i = 0; i < workers->nslots-1; ++i) {
- workers->slots[i].next = &workers->slots[i+1];
+ for (i = 0; i < n; ++i) {
workers->slots[i].id = i;
}
- while (workers->worker_count < workers->max_workers
- && status == APR_SUCCESS) {
- status = activate_slot(workers);
+ }
+ if (status == APR_SUCCESS) {
+ /* we activate all for now, TODO: support min_workers again.
+ * do this in reverse for vanity reasons so slot 0 will most
+ * likely be at head of idle queue. */
+ n = workers->max_workers;
+ for (i = n-1; i >= 0; --i) {
+ status = activate_slot(workers, &workers->slots[i]);
+ }
+ /* the rest of the slots go on the free list */
+ for(i = n; i < workers->nslots; ++i) {
+ push_slot(&workers->free, &workers->slots[i]);
}
+ workers->dynamic = (workers->worker_count < workers->max_workers);
}
if (status == APR_SUCCESS) {
apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);
apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
{
- apr_status_t status;
- if ((status = h2_fifo_try_push(workers->mplxs, m)) != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s,
- "h2_workers: unable to push mplx(%ld)", m->id);
- }
+ apr_status_t status = h2_fifo_push(workers->mplxs, m);
wake_idle_worker(workers);
return status;
}