1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
17 #include <apr_atomic.h>
18 #include <apr_thread_mutex.h>
19 #include <apr_thread_cond.h>
21 #include <mpm_common.h>
23 #include <http_core.h>
26 #include "h2_private.h"
28 #include "h2_request.h"
29 #include "h2_task_queue.h"
30 #include "h2_worker.h"
31 #include "h2_workers.h"
34 static int in_list(h2_workers *workers, h2_mplx *m)
37 for (e = H2_MPLX_LIST_FIRST(&workers->mplxs);
38 e != H2_MPLX_LIST_SENTINEL(&workers->mplxs);
39 e = H2_MPLX_NEXT(e)) {
47 static void cleanup_zombies(h2_workers *workers, int lock)
50 apr_thread_mutex_lock(workers->lock);
52 while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) {
53 h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies);
54 H2_WORKER_REMOVE(zombie);
55 ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
56 "h2_workers: cleanup zombie %d", zombie->id);
57 h2_worker_destroy(zombie);
60 apr_thread_mutex_unlock(workers->lock);
65 * Get the next task for the given worker. Will block until a task arrives
66 * or the max_wait timer expires and more than min workers exist.
67 * The previous h2_mplx instance might be passed in and will be served
68 * with preference, since we can ask it for the next task without aquiring
69 * the h2_workers lock.
71 static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
72 const h2_request **preq, void *ctx)
75 apr_time_t max_wait, start_wait;
76 h2_workers *workers = (h2_workers *)ctx;
78 max_wait = apr_time_from_sec(apr_atomic_read32(&workers->max_idle_secs));
79 start_wait = apr_time_now();
81 status = apr_thread_mutex_lock(workers->lock);
82 if (status == APR_SUCCESS) {
83 const h2_request *req = NULL;
87 ++workers->idle_worker_count;
88 ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
89 "h2_worker(%d): looking for work", h2_worker_get_id(worker));
91 while (!req && !h2_worker_is_aborted(worker) && !workers->aborted) {
93 /* Get the next h2_mplx to process that has a task to hand out.
94 * If it does, place it at the end of the queu and return the
96 * If it (currently) has no tasks, remove it so that it needs
97 * to register again for scheduling.
98 * If we run out of h2_mplx in the queue, we need to wait for
99 * new mplx to arrive. Depending on how many workers do exist,
100 * we do a timed wait or block indefinitely.
103 while (!req && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
104 m = H2_MPLX_LIST_FIRST(&workers->mplxs);
107 req = h2_mplx_pop_request(m, &has_more);
110 H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
113 has_more = !H2_MPLX_LIST_EMPTY(&workers->mplxs);
120 /* Need to wait for a new mplx to arrive.
122 cleanup_zombies(workers, 0);
124 if (workers->worker_count > workers->min_size) {
125 apr_time_t now = apr_time_now();
126 if (now >= (start_wait + max_wait)) {
127 /* waited long enough without getting a task. */
128 if (workers->worker_count > workers->min_size) {
129 ap_log_error(APLOG_MARK, APLOG_TRACE3, 0,
131 "h2_workers: aborting idle worker");
132 h2_worker_abort(worker);
136 ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
137 "h2_worker(%d): waiting signal, "
138 "worker_count=%d", worker->id,
139 (int)workers->worker_count);
140 apr_thread_cond_timedwait(workers->mplx_added,
141 workers->lock, max_wait);
144 ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
145 "h2_worker(%d): waiting signal (eternal), "
146 "worker_count=%d", worker->id,
147 (int)workers->worker_count);
148 apr_thread_cond_wait(workers->mplx_added, workers->lock);
153 /* Here, we either have gotten task and mplx for the worker or
154 * needed to give up with more than enough workers.
157 ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
158 "h2_worker(%d): start request(%ld-%d)",
159 h2_worker_get_id(worker), m->id, req->id);
163 if (has_more && workers->idle_worker_count > 1) {
164 apr_thread_cond_signal(workers->mplx_added);
166 status = APR_SUCCESS;
172 --workers->idle_worker_count;
173 apr_thread_mutex_unlock(workers->lock);
179 static void worker_done(h2_worker *worker, void *ctx)
181 h2_workers *workers = (h2_workers *)ctx;
182 apr_status_t status = apr_thread_mutex_lock(workers->lock);
183 if (status == APR_SUCCESS) {
184 ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
185 "h2_worker(%d): done", h2_worker_get_id(worker));
186 H2_WORKER_REMOVE(worker);
187 --workers->worker_count;
188 H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker);
190 apr_thread_mutex_unlock(workers->lock);
194 static apr_status_t add_worker(h2_workers *workers)
196 h2_worker *w = h2_worker_create(workers->next_worker_id++,
197 workers->pool, workers->thread_attr,
198 get_mplx_next, worker_done, workers);
202 ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
203 "h2_workers: adding worker(%d)", h2_worker_get_id(w));
204 ++workers->worker_count;
205 H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w);
209 static apr_status_t h2_workers_start(h2_workers *workers)
211 apr_status_t status = apr_thread_mutex_lock(workers->lock);
212 if (status == APR_SUCCESS) {
213 ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
214 "h2_workers: starting");
216 while (workers->worker_count < workers->min_size
217 && status == APR_SUCCESS) {
218 status = add_worker(workers);
220 apr_thread_mutex_unlock(workers->lock);
225 h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
226 int min_size, int max_size,
227 apr_size_t max_tx_handles)
234 AP_DEBUG_ASSERT(server_pool);
236 /* let's have our own pool that will be parent to all h2_worker
237 * instances we create. This happens in various threads, but always
238 * guarded by our lock. Without this pool, all subpool creations would
239 * happen on the pool handed to us, which we do not guard.
241 apr_pool_create(&pool, server_pool);
242 workers = apr_pcalloc(pool, sizeof(h2_workers));
245 workers->pool = pool;
246 workers->min_size = min_size;
247 workers->max_size = max_size;
248 apr_atomic_set32(&workers->max_idle_secs, 10);
250 workers->max_tx_handles = max_tx_handles;
251 workers->spare_tx_handles = workers->max_tx_handles;
253 apr_threadattr_create(&workers->thread_attr, workers->pool);
254 if (ap_thread_stacksize != 0) {
255 apr_threadattr_stacksize_set(workers->thread_attr,
256 ap_thread_stacksize);
257 ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
258 "h2_workers: using stacksize=%ld",
259 (long)ap_thread_stacksize);
262 APR_RING_INIT(&workers->workers, h2_worker, link);
263 APR_RING_INIT(&workers->zombies, h2_worker, link);
264 APR_RING_INIT(&workers->mplxs, h2_mplx, link);
266 status = apr_thread_mutex_create(&workers->lock,
267 APR_THREAD_MUTEX_DEFAULT,
269 if (status == APR_SUCCESS) {
270 status = apr_thread_cond_create(&workers->mplx_added, workers->pool);
273 if (status == APR_SUCCESS) {
274 status = apr_thread_mutex_create(&workers->tx_lock,
275 APR_THREAD_MUTEX_DEFAULT,
279 if (status == APR_SUCCESS) {
280 status = h2_workers_start(workers);
283 if (status != APR_SUCCESS) {
284 h2_workers_destroy(workers);
291 void h2_workers_destroy(h2_workers *workers)
293 /* before we go, cleanup any zombie workers that may have accumulated */
294 cleanup_zombies(workers, 1);
296 if (workers->mplx_added) {
297 apr_thread_cond_destroy(workers->mplx_added);
298 workers->mplx_added = NULL;
301 apr_thread_mutex_destroy(workers->lock);
302 workers->lock = NULL;
304 while (!H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
305 h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs);
308 while (!H2_WORKER_LIST_EMPTY(&workers->workers)) {
309 h2_worker *w = H2_WORKER_LIST_FIRST(&workers->workers);
313 apr_pool_destroy(workers->pool);
314 /* workers is gone */
318 apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
320 apr_status_t status = apr_thread_mutex_lock(workers->lock);
321 if (status == APR_SUCCESS) {
322 ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s,
323 "h2_workers: register mplx(%ld)", m->id);
324 if (in_list(workers, m)) {
328 H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
329 status = APR_SUCCESS;
332 if (workers->idle_worker_count > 0) {
333 apr_thread_cond_signal(workers->mplx_added);
335 else if (workers->worker_count < workers->max_size) {
336 ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
337 "h2_workers: got %d worker, adding 1",
338 workers->worker_count);
342 /* cleanup any zombie workers that may have accumulated */
343 cleanup_zombies(workers, 0);
345 apr_thread_mutex_unlock(workers->lock);
350 apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
352 apr_status_t status = apr_thread_mutex_lock(workers->lock);
353 if (status == APR_SUCCESS) {
355 if (in_list(workers, m)) {
357 status = APR_SUCCESS;
359 /* cleanup any zombie workers that may have accumulated */
360 cleanup_zombies(workers, 0);
362 apr_thread_mutex_unlock(workers->lock);
367 void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs)
369 if (idle_secs <= 0) {
370 ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
371 APLOGNO(02962) "h2_workers: max_worker_idle_sec value of %d"
372 " is not valid, ignored.", idle_secs);
375 apr_atomic_set32(&workers->max_idle_secs, idle_secs);
378 apr_size_t h2_workers_tx_reserve(h2_workers *workers, apr_size_t count)
380 apr_status_t status = apr_thread_mutex_lock(workers->tx_lock);
381 if (status == APR_SUCCESS) {
382 count = H2MIN(workers->spare_tx_handles, count);
383 workers->spare_tx_handles -= count;
384 ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
385 "h2_workers: reserved %d tx handles, %d/%d left",
386 (int)count, (int)workers->spare_tx_handles,
387 (int)workers->max_tx_handles);
388 apr_thread_mutex_unlock(workers->tx_lock);
394 void h2_workers_tx_free(h2_workers *workers, apr_size_t count)
396 apr_status_t status = apr_thread_mutex_lock(workers->tx_lock);
397 if (status == APR_SUCCESS) {
398 workers->spare_tx_handles += count;
399 ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
400 "h2_workers: freed %d tx handles, %d/%d left",
401 (int)count, (int)workers->spare_tx_handles,
402 (int)workers->max_tx_handles);
403 apr_thread_mutex_unlock(workers->tx_lock);