/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <apr_atomic.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include <mpm_common.h>

#include <httpd.h>
#include <http_core.h>
#include <http_log.h>

#include "h2.h"
#include "h2_private.h"
#include "h2_mplx.h"
#include "h2_task.h"
#include "h2_util.h"
#include "h2_workers.h"
33 typedef struct h2_slot h2_slot;
42 apr_thread_cond_t *not_idle;
45 static h2_slot *pop_slot(h2_slot **phead)
47 /* Atomically pop a slot from the list */
49 h2_slot *first = *phead;
53 if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
60 static void push_slot(h2_slot **phead, h2_slot *slot)
62 /* Atomically push a slot to the list */
63 ap_assert(!slot->next);
65 h2_slot *next = slot->next = *phead;
66 if (apr_atomic_casptr((void*)phead, slot, next) == next) {
72 static void wake_idle_worker(h2_workers *workers)
74 h2_slot *slot = pop_slot(&workers->idle);
76 apr_thread_mutex_lock(workers->lock);
77 apr_thread_cond_signal(slot->not_idle);
78 apr_thread_mutex_unlock(workers->lock);
82 static void cleanup_zombies(h2_workers *workers)
85 while ((slot = pop_slot(&workers->zombies))) {
88 apr_thread_join(&status, slot->thread);
91 --workers->worker_count;
92 push_slot(&workers->free, slot);
96 static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m)
99 slot->task = h2_mplx_pop_task(m, &has_more);
101 /* Ok, we got something to give back to the worker for execution.
102 * If we still have idle workers, we let the worker be sticky,
103 * e.g. making it poll the task's h2_mplx instance for more work
104 * before asking back here. */
105 slot->sticks = slot->workers->max_workers;
106 return has_more? APR_EAGAIN : APR_SUCCESS;
112 static h2_fifo_op_t mplx_peek(void *head, void *ctx)
117 if (slot_pull_task(slot, m) == APR_EAGAIN) {
118 wake_idle_worker(slot->workers);
119 return H2_FIFO_OP_REPUSH;
121 return H2_FIFO_OP_PULL;
125 * Get the next task for the given worker. Will block until a task arrives
126 * or the max_wait timer expires and more than min workers exist.
128 static apr_status_t get_next(h2_slot *slot)
130 h2_workers *workers = slot->workers;
134 while (!slot->aborted) {
136 status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
143 apr_thread_mutex_lock(workers->lock);
144 ++workers->idle_workers;
145 cleanup_zombies(workers);
146 if (slot->next == NULL) {
147 push_slot(&workers->idle, slot);
149 apr_thread_cond_wait(slot->not_idle, workers->lock);
150 apr_thread_mutex_unlock(workers->lock);
155 static void slot_done(h2_slot *slot)
157 push_slot(&(slot->workers->zombies), slot);
161 static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
163 h2_slot *slot = wctx;
165 while (!slot->aborted) {
167 /* Get a h2_task from the mplxs queue. */
171 h2_task_do(slot->task, thread, slot->id);
173 /* Report the task as done. If stickyness is left, offer the
174 * mplx the opportunity to give us back a new task right away.
176 if (!slot->aborted && (--slot->sticks > 0)) {
177 h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task);
180 h2_mplx_task_done(slot->task->mplx, slot->task, NULL);
190 static apr_status_t activate_slot(h2_workers *workers)
192 h2_slot *slot = pop_slot(&workers->free);
196 slot->workers = workers;
199 if (!slot->not_idle) {
200 status = apr_thread_cond_create(&slot->not_idle, workers->pool);
201 if (status != APR_SUCCESS) {
202 push_slot(&workers->free, slot);
207 apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot,
210 push_slot(&workers->free, slot);
214 ++workers->worker_count;
220 static apr_status_t workers_pool_cleanup(void *data)
222 h2_workers *workers = data;
225 if (!workers->aborted) {
226 apr_thread_mutex_lock(workers->lock);
227 workers->aborted = 1;
228 /* before we go, cleanup any zombies and abort the rest */
229 cleanup_zombies(workers);
231 slot = pop_slot(&workers->idle);
234 apr_thread_cond_signal(slot->not_idle);
240 apr_thread_mutex_unlock(workers->lock);
242 h2_fifo_term(workers->mplxs);
243 h2_fifo_interrupt(workers->mplxs);
248 h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
249 int min_workers, int max_workers,
258 ap_assert(server_pool);
260 /* let's have our own pool that will be parent to all h2_worker
261 * instances we create. This happens in various threads, but always
262 * guarded by our lock. Without this pool, all subpool creations would
263 * happen on the pool handed to us, which we do not guard.
265 apr_pool_create(&pool, server_pool);
266 apr_pool_tag(pool, "h2_workers");
267 workers = apr_pcalloc(pool, sizeof(h2_workers));
273 workers->pool = pool;
274 workers->min_workers = min_workers;
275 workers->max_workers = max_workers;
276 workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
278 status = h2_fifo_create(&workers->mplxs, pool, workers->max_workers);
279 if (status != APR_SUCCESS) {
283 status = apr_threadattr_create(&workers->thread_attr, workers->pool);
284 if (status != APR_SUCCESS) {
288 if (ap_thread_stacksize != 0) {
289 apr_threadattr_stacksize_set(workers->thread_attr,
290 ap_thread_stacksize);
291 ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
292 "h2_workers: using stacksize=%ld",
293 (long)ap_thread_stacksize);
296 status = apr_thread_mutex_create(&workers->lock,
297 APR_THREAD_MUTEX_DEFAULT,
299 if (status == APR_SUCCESS) {
300 int n = workers->nslots = workers->max_workers;
301 workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
302 if (workers->slots == NULL) {
306 if (status == APR_SUCCESS) {
307 workers->free = &workers->slots[0];
308 for (i = 0; i < workers->nslots-1; ++i) {
309 workers->slots[i].next = &workers->slots[i+1];
310 workers->slots[i].id = i;
312 while (workers->worker_count < workers->max_workers
313 && status == APR_SUCCESS) {
314 status = activate_slot(workers);
317 if (status == APR_SUCCESS) {
318 apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);
324 apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
327 if ((status = h2_fifo_try_push(workers->mplxs, m)) != APR_SUCCESS) {
328 ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s,
329 "h2_workers: unable to push mplx(%ld)", m->id);
331 wake_idle_worker(workers);
335 apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
337 return h2_fifo_remove(workers->mplxs, m);