/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <apr_atomic.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include <mpm_common.h>
#include <httpd.h>
#include <http_core.h>
#include <http_log.h>

#include "h2.h"
#include "h2_private.h"
#include "h2_mplx.h"
#include "h2_task.h"
#include "h2_util.h"
#include "h2_workers.h"
33 typedef struct h2_slot h2_slot;
42 apr_thread_mutex_t *lock;
43 apr_thread_cond_t *not_idle;
46 static h2_slot *pop_slot(h2_slot **phead)
48 /* Atomically pop a slot from the list */
50 h2_slot *first = *phead;
54 if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
61 static void push_slot(h2_slot **phead, h2_slot *slot)
63 /* Atomically push a slot to the list */
64 ap_assert(!slot->next);
66 h2_slot *next = slot->next = *phead;
67 if (apr_atomic_casptr((void*)phead, slot, next) == next) {
73 static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
75 static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
79 slot->workers = workers;
84 status = apr_thread_mutex_create(&slot->lock,
85 APR_THREAD_MUTEX_DEFAULT,
87 if (status != APR_SUCCESS) {
88 push_slot(&workers->free, slot);
93 if (!slot->not_idle) {
94 status = apr_thread_cond_create(&slot->not_idle, workers->pool);
95 if (status != APR_SUCCESS) {
96 push_slot(&workers->free, slot);
101 ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
102 "h2_workers: new thread for slot %d", slot->id);
103 /* thread will either immediately start work or add itself
104 * to the idle queue */
105 apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot,
108 push_slot(&workers->free, slot);
112 apr_atomic_inc32(&workers->worker_count);
116 static apr_status_t add_worker(h2_workers *workers)
118 h2_slot *slot = pop_slot(&workers->free);
120 return activate_slot(workers, slot);
125 static void wake_idle_worker(h2_workers *workers)
127 h2_slot *slot = pop_slot(&workers->idle);
129 apr_thread_mutex_lock(slot->lock);
130 apr_thread_cond_signal(slot->not_idle);
131 apr_thread_mutex_unlock(slot->lock);
133 else if (workers->dynamic) {
138 static void cleanup_zombies(h2_workers *workers)
141 while ((slot = pop_slot(&workers->zombies))) {
144 apr_thread_join(&status, slot->thread);
147 apr_atomic_dec32(&workers->worker_count);
149 push_slot(&workers->free, slot);
153 static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m)
157 rv = h2_mplx_pop_task(m, &slot->task);
159 /* Ok, we got something to give back to the worker for execution.
160 * If we still have idle workers, we let the worker be sticky,
161 * e.g. making it poll the task's h2_mplx instance for more work
162 * before asking back here. */
163 slot->sticks = slot->workers->max_workers;
170 static h2_fifo_op_t mplx_peek(void *head, void *ctx)
175 if (slot_pull_task(slot, m) == APR_EAGAIN) {
176 wake_idle_worker(slot->workers);
177 return H2_FIFO_OP_REPUSH;
179 return H2_FIFO_OP_PULL;
183 * Get the next task for the given worker. Will block until a task arrives
184 * or the max_wait timer expires and more than min workers exist.
186 static apr_status_t get_next(h2_slot *slot)
188 h2_workers *workers = slot->workers;
192 while (!slot->aborted) {
194 status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
195 if (status == APR_EOF) {
204 cleanup_zombies(workers);
206 apr_thread_mutex_lock(slot->lock);
207 push_slot(&workers->idle, slot);
208 apr_thread_cond_wait(slot->not_idle, slot->lock);
209 apr_thread_mutex_unlock(slot->lock);
214 static void slot_done(h2_slot *slot)
216 push_slot(&(slot->workers->zombies), slot);
220 static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
222 h2_slot *slot = wctx;
224 while (!slot->aborted) {
226 /* Get a h2_task from the mplxs queue. */
230 h2_task_do(slot->task, thread, slot->id);
232 /* Report the task as done. If stickyness is left, offer the
233 * mplx the opportunity to give us back a new task right away.
235 if (!slot->aborted && (--slot->sticks > 0)) {
236 h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task);
239 h2_mplx_task_done(slot->task->mplx, slot->task, NULL);
249 static apr_status_t workers_pool_cleanup(void *data)
251 h2_workers *workers = data;
254 if (!workers->aborted) {
255 workers->aborted = 1;
256 /* abort all idle slots */
258 slot = pop_slot(&workers->idle);
260 apr_thread_mutex_lock(slot->lock);
262 apr_thread_cond_signal(slot->not_idle);
263 apr_thread_mutex_unlock(slot->lock);
270 h2_fifo_term(workers->mplxs);
271 h2_fifo_interrupt(workers->mplxs);
273 cleanup_zombies(workers);
278 h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
279 int min_workers, int max_workers,
288 ap_assert(server_pool);
290 /* let's have our own pool that will be parent to all h2_worker
291 * instances we create. This happens in various threads, but always
292 * guarded by our lock. Without this pool, all subpool creations would
293 * happen on the pool handed to us, which we do not guard.
295 apr_pool_create(&pool, server_pool);
296 apr_pool_tag(pool, "h2_workers");
297 workers = apr_pcalloc(pool, sizeof(h2_workers));
303 workers->pool = pool;
304 workers->min_workers = min_workers;
305 workers->max_workers = max_workers;
306 workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
308 /* FIXME: the fifo set we use here has limited capacity. Once the
309 * set is full, connections with new requests do a wait. Unfortunately,
310 * we have optimizations in place there that makes such waiting "unfair"
311 * in the sense that it may take connections a looong time to get scheduled.
313 * Need to rewrite this to use one of our double-linked lists and a mutex
314 * to have unlimited capacity and fair scheduling.
316 * For now, we just make enough room to have many connections inside one
319 status = h2_fifo_set_create(&workers->mplxs, pool, 8 * 1024);
320 if (status != APR_SUCCESS) {
324 status = apr_threadattr_create(&workers->thread_attr, workers->pool);
325 if (status != APR_SUCCESS) {
329 if (ap_thread_stacksize != 0) {
330 apr_threadattr_stacksize_set(workers->thread_attr,
331 ap_thread_stacksize);
332 ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
333 "h2_workers: using stacksize=%ld",
334 (long)ap_thread_stacksize);
337 status = apr_thread_mutex_create(&workers->lock,
338 APR_THREAD_MUTEX_DEFAULT,
340 if (status == APR_SUCCESS) {
341 n = workers->nslots = workers->max_workers;
342 workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
343 if (workers->slots == NULL) {
347 for (i = 0; i < n; ++i) {
348 workers->slots[i].id = i;
351 if (status == APR_SUCCESS) {
352 /* we activate all for now, TODO: support min_workers again.
353 * do this in reverse for vanity reasons so slot 0 will most
354 * likely be at head of idle queue. */
355 n = workers->max_workers;
356 for (i = n-1; i >= 0; --i) {
357 status = activate_slot(workers, &workers->slots[i]);
359 /* the rest of the slots go on the free list */
360 for(i = n; i < workers->nslots; ++i) {
361 push_slot(&workers->free, &workers->slots[i]);
363 workers->dynamic = (workers->worker_count < workers->max_workers);
365 if (status == APR_SUCCESS) {
366 apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);
372 apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
374 apr_status_t status = h2_fifo_push(workers->mplxs, m);
375 wake_idle_worker(workers);
379 apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
381 return h2_fifo_remove(workers->mplxs, m);