/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <apr_atomic.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include <mpm_common.h>
#include <httpd.h>
#include <http_core.h>
#include <http_log.h>

#include "h2.h"
#include "h2_private.h"
#include "h2_mplx.h"
#include "h2_task.h"
#include "h2_util.h"
#include "h2_workers.h"

typedef struct h2_slot h2_slot;

struct h2_slot {
    int id;
    h2_slot *next;
    h2_workers *workers;
    int aborted;
    int sticks;
    h2_task *task;
    apr_thread_t *thread;
    apr_thread_mutex_t *lock;
    apr_thread_cond_t *not_idle;
};

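/* Slots live on three singly-linked lists hanging off the h2_workers
 * instance: `free` (unused slots), `idle` (threads waiting for work) and
 * `zombies` (terminated threads waiting to be joined). The lists are
 * manipulated lock-free via compare-and-swap in pop_slot()/push_slot(). */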
static h2_slot *pop_slot(h2_slot **phead)
{
    /* Atomically pop a slot from the list */
    for (;;) {
        h2_slot *first = *phead;
        if (first == NULL) {
            return NULL;
        }
        if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
            first->next = NULL;
            return first;
        }
    }
}

static void push_slot(h2_slot **phead, h2_slot *slot)
{
    /* Atomically push a slot to the list */
    ap_assert(!slot->next);
    for (;;) {
        h2_slot *next = slot->next = *phead;
        if (apr_atomic_casptr((void*)phead, slot, next) == next) {
            return;
        }
    }
}

static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);

static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
{
    apr_status_t status;

    slot->workers = workers;
    slot->aborted = 0;
    slot->task = NULL;

    if (!slot->lock) {
        status = apr_thread_mutex_create(&slot->lock,
                                         APR_THREAD_MUTEX_DEFAULT,
                                         workers->pool);
        if (status != APR_SUCCESS) {
            push_slot(&workers->free, slot);
            return status;
        }
    }

    if (!slot->not_idle) {
        status = apr_thread_cond_create(&slot->not_idle, workers->pool);
        if (status != APR_SUCCESS) {
            push_slot(&workers->free, slot);
            return status;
        }
    }

    ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
                 "h2_workers: new thread for slot %d", slot->id);
    /* thread will either immediately start work or add itself
     * to the idle queue */
    apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot,
                      workers->pool);
    if (!slot->thread) {
        push_slot(&workers->free, slot);
        return APR_ENOMEM;
    }

    apr_atomic_inc32(&workers->worker_count);
    return APR_SUCCESS;
}

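/* Take a slot from the free list and start a worker thread on it. */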
static apr_status_t add_worker(h2_workers *workers)
{
    h2_slot *slot = pop_slot(&workers->free);
    if (slot) {
        return activate_slot(workers, slot);
    }
    return APR_EAGAIN;
}

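/* Signal one idle worker, if any. If none is idle and the pool may still
 * grow (workers->dynamic), start an additional worker instead. */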
static void wake_idle_worker(h2_workers *workers)
{
    h2_slot *slot = pop_slot(&workers->idle);
    if (slot) {
        apr_thread_mutex_lock(slot->lock);
        apr_thread_cond_signal(slot->not_idle);
        apr_thread_mutex_unlock(slot->lock);
    }
    else if (workers->dynamic) {
        add_worker(workers);
    }
}

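/* Join terminated worker threads and return their slots to the free list. */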
static void cleanup_zombies(h2_workers *workers)
{
    h2_slot *slot;
    while ((slot = pop_slot(&workers->zombies))) {
        if (slot->thread) {
            apr_status_t status;
            apr_thread_join(&status, slot->thread);
            slot->thread = NULL;
        }
        apr_atomic_dec32(&workers->worker_count);
        push_slot(&workers->free, slot);
    }
}

static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m)
{
    apr_status_t rv;

    rv = h2_mplx_pop_task(m, &slot->task);
    if (slot->task) {
        /* Ok, we got something to give back to the worker for execution.
         * If we still have idle workers, we let the worker be sticky,
         * e.g. making it poll the task's h2_mplx instance for more work
         * before asking back here. */
        slot->sticks = slot->workers->max_workers;
        return rv;
    }
    slot->sticks = 0;
    return APR_EOF;
}

static h2_fifo_op_t mplx_peek(void *head, void *ctx)
{
    h2_mplx *m = head;
    h2_slot *slot = ctx;

    if (slot_pull_task(slot, m) == APR_EAGAIN) {
        wake_idle_worker(slot->workers);
        return H2_FIFO_OP_REPUSH;
    }
    return H2_FIFO_OP_PULL;
}

/**
 * Get the next task for the given worker. Will block until a task arrives
 * or the max_wait timer expires and more than min workers exist.
 */
static apr_status_t get_next(h2_slot *slot)
{
    h2_workers *workers = slot->workers;
    apr_status_t status;

    slot->task = NULL;
    while (!slot->aborted) {
        if (!slot->task) {
            status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
            if (status == APR_EOF) {
                return status;
            }
        }
        if (slot->task) {
            return APR_SUCCESS;
        }

        cleanup_zombies(workers);

        apr_thread_mutex_lock(slot->lock);
        push_slot(&workers->idle, slot);
        apr_thread_cond_wait(slot->not_idle, slot->lock);
        apr_thread_mutex_unlock(slot->lock);
    }
    return APR_EOF;
}

static void slot_done(h2_slot *slot)
{
    push_slot(&(slot->workers->zombies), slot);
}

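/* Thread function of a worker: pull tasks via get_next(), execute them with
 * h2_task_do() and report them done to their mplx. While "sticks" remain,
 * ask the same mplx for a follow-up task right away. On abort, the slot
 * retires itself to the zombie list for joining. */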
static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
{
    h2_slot *slot = wctx;

    while (!slot->aborted) {
        /* Get an h2_task from the mplxs queue. */
        get_next(slot);
        while (slot->task) {
            h2_task_do(slot->task, thread, slot->id);
            /* Report the task as done. If stickiness is left, offer the
             * mplx the opportunity to give us back a new task right away.
             */
            if (!slot->aborted && (--slot->sticks > 0)) {
                h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task);
            }
            else {
                h2_mplx_task_done(slot->task->mplx, slot->task, NULL);
                slot->task = NULL;
            }
        }
    }

    slot_done(slot);
    return NULL;
}

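/* Pre-cleanup of the workers pool: mark the pool aborted, wake and abort all
 * idle slots, terminate and interrupt the mplx fifo, then join finished
 * threads. */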
static apr_status_t workers_pool_cleanup(void *data)
{
    h2_workers *workers = data;
    h2_slot *slot;

    if (!workers->aborted) {
        workers->aborted = 1;
        /* abort all idle slots */
        while ((slot = pop_slot(&workers->idle))) {
            apr_thread_mutex_lock(slot->lock);
            slot->aborted = 1;
            apr_thread_cond_signal(slot->not_idle);
            apr_thread_mutex_unlock(slot->lock);
        }

        h2_fifo_term(workers->mplxs);
        h2_fifo_interrupt(workers->mplxs);

        cleanup_zombies(workers);
    }
    return APR_SUCCESS;
}

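/* Create the worker pool for the given server: set up the mplx fifo, the
 * thread attributes and the slot array, then activate the worker threads. */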
h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
                              int min_workers, int max_workers,
                              int idle_secs)
{
    apr_status_t status;
    h2_workers *workers;
    apr_pool_t *pool;
    int i, n;

    ap_assert(s);
    ap_assert(server_pool);

    /* let's have our own pool that will be parent to all h2_worker
     * instances we create. This happens in various threads, but always
     * guarded by our lock. Without this pool, all subpool creations would
     * happen on the pool handed to us, which we do not guard.
     */
    apr_pool_create(&pool, server_pool);
    apr_pool_tag(pool, "h2_workers");
    workers = apr_pcalloc(pool, sizeof(h2_workers));

    workers->s = s;
    workers->pool = pool;
    workers->min_workers = min_workers;
    workers->max_workers = max_workers;
    workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;

    /* FIXME: the fifo set we use here has limited capacity. Once the
     * set is full, connections with new requests do a wait. Unfortunately,
     * we have optimizations in place there that make such waiting "unfair"
     * in the sense that it may take connections a long time to get scheduled.
     *
     * Need to rewrite this to use one of our double-linked lists and a mutex
     * to have unlimited capacity and fair scheduling.
     *
     * For now, we just make enough room to have many connections inside one
     * process.
     */
    status = h2_fifo_set_create(&workers->mplxs, pool, 8 * 1024);
    if (status != APR_SUCCESS) {
        return NULL;
    }

    status = apr_threadattr_create(&workers->thread_attr, workers->pool);
    if (status != APR_SUCCESS) {
        return NULL;
    }

    if (ap_thread_stacksize != 0) {
        apr_threadattr_stacksize_set(workers->thread_attr,
                                     ap_thread_stacksize);
        ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
                     "h2_workers: using stacksize=%ld",
                     (long)ap_thread_stacksize);
    }

    status = apr_thread_mutex_create(&workers->lock,
                                     APR_THREAD_MUTEX_DEFAULT,
                                     workers->pool);
    if (status == APR_SUCCESS) {
        n = workers->nslots = workers->max_workers;
        workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
        if (workers->slots == NULL) {
            workers->nslots = 0;
            status = APR_ENOMEM;
        }
        for (i = 0; i < n; ++i) {
            workers->slots[i].id = i;
        }
    }
    if (status == APR_SUCCESS) {
        /* we activate all for now, TODO: support min_workers again.
         * do this in reverse for vanity reasons so slot 0 will most
         * likely be at head of idle queue. */
        n = workers->max_workers;
        for (i = n-1; i >= 0; --i) {
            status = activate_slot(workers, &workers->slots[i]);
        }
        /* the rest of the slots go on the free list */
        for (i = n; i < workers->nslots; ++i) {
            push_slot(&workers->free, &workers->slots[i]);
        }
        workers->dynamic = (workers->worker_count < workers->max_workers);
    }
    if (status == APR_SUCCESS) {
        apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);
        return workers;
    }
    return NULL;
}

apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
{
    apr_status_t status = h2_fifo_push(workers->mplxs, m);
    wake_idle_worker(workers);
    return status;
}

apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
{
    return h2_fifo_remove(workers->mplxs, m);
}

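/* Usage sketch (hypothetical caller; the names w, m and pconf are
 * illustrative): the pool is created once at server startup and each
 * connection's h2_mplx is registered so its tasks get picked up by the
 * worker threads.
 *
 *     h2_workers *w = h2_workers_create(s, pconf, min_workers, max_workers,
 *                                       idle_secs);
 *     h2_workers_register(w, m);      // m is the connection's h2_mplx
 *     ...
 *     h2_workers_unregister(w, m);    // when the connection is done with it
 */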