]> granicus.if.org Git - apache/blob - modules/http2/h2_workers.c
f0020c81ebb83d4bb8ce06bd76803bbb9d82012e
[apache] / modules / http2 / h2_workers.c
1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <assert.h>
17 #include <apr_atomic.h>
18 #include <apr_thread_mutex.h>
19 #include <apr_thread_cond.h>
20
21 #include <mpm_common.h>
22 #include <httpd.h>
23 #include <http_core.h>
24 #include <http_log.h>
25
26 #include "h2_private.h"
27 #include "h2_mplx.h"
28 #include "h2_request.h"
29 #include "h2_task_queue.h"
30 #include "h2_worker.h"
31 #include "h2_workers.h"
32
33
34 static int in_list(h2_workers *workers, h2_mplx *m)
35 {
36     h2_mplx *e;
37     for (e = H2_MPLX_LIST_FIRST(&workers->mplxs); 
38          e != H2_MPLX_LIST_SENTINEL(&workers->mplxs);
39          e = H2_MPLX_NEXT(e)) {
40         if (e == m) {
41             return 1;
42         }
43     }
44     return 0;
45 }
46
47 static void cleanup_zombies(h2_workers *workers, int lock)
48 {
49     if (lock) {
50         apr_thread_mutex_lock(workers->lock);
51     }
52     while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) {
53         h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies);
54         H2_WORKER_REMOVE(zombie);
55         ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
56                       "h2_workers: cleanup zombie %d", zombie->id);
57         h2_worker_destroy(zombie);
58     }
59     if (lock) {
60         apr_thread_mutex_unlock(workers->lock);
61     }
62 }
63
64 /**
65  * Get the next task for the given worker. Will block until a task arrives
66  * or the max_wait timer expires and more than min workers exist.
67  * The previous h2_mplx instance might be passed in and will be served
68  * with preference, since we can ask it for the next task without aquiring
69  * the h2_workers lock.
70  */
71 static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm, 
72                                   const h2_request **preq, void *ctx)
73 {
74     apr_status_t status;
75     apr_time_t max_wait, start_wait;
76     h2_workers *workers = (h2_workers *)ctx;
77     
78     max_wait = apr_time_from_sec(apr_atomic_read32(&workers->max_idle_secs));
79     start_wait = apr_time_now();
80     
81     status = apr_thread_mutex_lock(workers->lock);
82     if (status == APR_SUCCESS) {
83         const h2_request *req = NULL;
84         h2_mplx *m = NULL;
85         int has_more = 0;
86
87         ++workers->idle_worker_count;
88         ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
89                      "h2_worker(%d): looking for work", h2_worker_get_id(worker));
90         
91         while (!req && !h2_worker_is_aborted(worker) && !workers->aborted) {
92             
93             /* Get the next h2_mplx to process that has a task to hand out.
94              * If it does, place it at the end of the queu and return the
95              * task to the worker.
96              * If it (currently) has no tasks, remove it so that it needs
97              * to register again for scheduling.
98              * If we run out of h2_mplx in the queue, we need to wait for
99              * new mplx to arrive. Depending on how many workers do exist,
100              * we do a timed wait or block indefinitely.
101              */
102             m = NULL;
103             while (!req && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
104                 m = H2_MPLX_LIST_FIRST(&workers->mplxs);
105                 H2_MPLX_REMOVE(m);
106                 
107                 req = h2_mplx_pop_request(m, &has_more);
108                 if (req) {
109                     if (has_more) {
110                         H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
111                     }
112                     else {
113                         has_more = !H2_MPLX_LIST_EMPTY(&workers->mplxs);
114                     }
115                     break;
116                 }
117             }
118             
119             if (!req) {
120                 /* Need to wait for a new mplx to arrive.
121                  */
122                 cleanup_zombies(workers, 0);
123                 
124                 if (workers->worker_count > workers->min_size) {
125                     apr_time_t now = apr_time_now();
126                     if (now >= (start_wait + max_wait)) {
127                         /* waited long enough without getting a task. */
128                         if (workers->worker_count > workers->min_size) {
129                             ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, 
130                                          workers->s,
131                                          "h2_workers: aborting idle worker");
132                             h2_worker_abort(worker);
133                             break;
134                         }
135                     }
136                     ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
137                                  "h2_worker(%d): waiting signal, "
138                                  "worker_count=%d", worker->id, 
139                                  (int)workers->worker_count);
140                     apr_thread_cond_timedwait(workers->mplx_added,
141                                               workers->lock, max_wait);
142                 }
143                 else {
144                     ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
145                                  "h2_worker(%d): waiting signal (eternal), "
146                                  "worker_count=%d", worker->id, 
147                                  (int)workers->worker_count);
148                     apr_thread_cond_wait(workers->mplx_added, workers->lock);
149                 }
150             }
151         }
152         
153         /* Here, we either have gotten task and mplx for the worker or
154          * needed to give up with more than enough workers.
155          */
156         if (req) {
157             ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
158                          "h2_worker(%d): start request(%ld-%d)",
159                          h2_worker_get_id(worker), m->id, req->id);
160             *pm = m;
161             *preq = req;
162             
163             if (has_more && workers->idle_worker_count > 1) {
164                 apr_thread_cond_signal(workers->mplx_added);
165             }
166             status = APR_SUCCESS;
167         }
168         else {
169             status = APR_EOF;
170         }
171         
172         --workers->idle_worker_count;
173         apr_thread_mutex_unlock(workers->lock);
174     }
175     
176     return status;
177 }
178
179 static void worker_done(h2_worker *worker, void *ctx)
180 {
181     h2_workers *workers = (h2_workers *)ctx;
182     apr_status_t status = apr_thread_mutex_lock(workers->lock);
183     if (status == APR_SUCCESS) {
184         ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
185                      "h2_worker(%d): done", h2_worker_get_id(worker));
186         H2_WORKER_REMOVE(worker);
187         --workers->worker_count;
188         H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker);
189         
190         apr_thread_mutex_unlock(workers->lock);
191     }
192 }
193
194 static apr_status_t add_worker(h2_workers *workers)
195 {
196     h2_worker *w = h2_worker_create(workers->next_worker_id++,
197                                     workers->pool, workers->thread_attr,
198                                     get_mplx_next, worker_done, workers);
199     if (!w) {
200         return APR_ENOMEM;
201     }
202     ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
203                  "h2_workers: adding worker(%d)", h2_worker_get_id(w));
204     ++workers->worker_count;
205     H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w);
206     return APR_SUCCESS;
207 }
208
209 static apr_status_t h2_workers_start(h2_workers *workers)
210 {
211     apr_status_t status = apr_thread_mutex_lock(workers->lock);
212     if (status == APR_SUCCESS) {
213         ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
214                       "h2_workers: starting");
215
216         while (workers->worker_count < workers->min_size
217                && status == APR_SUCCESS) {
218             status = add_worker(workers);
219         }
220         apr_thread_mutex_unlock(workers->lock);
221     }
222     return status;
223 }
224
225 h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
226                               int min_size, int max_size,
227                               apr_size_t max_tx_handles)
228 {
229     apr_status_t status;
230     h2_workers *workers;
231     apr_pool_t *pool;
232
233     AP_DEBUG_ASSERT(s);
234     AP_DEBUG_ASSERT(server_pool);
235
236     /* let's have our own pool that will be parent to all h2_worker
237      * instances we create. This happens in various threads, but always
238      * guarded by our lock. Without this pool, all subpool creations would
239      * happen on the pool handed to us, which we do not guard.
240      */
241     apr_pool_create(&pool, server_pool);
242     workers = apr_pcalloc(pool, sizeof(h2_workers));
243     if (workers) {
244         workers->s = s;
245         workers->pool = pool;
246         workers->min_size = min_size;
247         workers->max_size = max_size;
248         apr_atomic_set32(&workers->max_idle_secs, 10);
249         
250         workers->max_tx_handles = max_tx_handles;
251         workers->spare_tx_handles = workers->max_tx_handles;
252         
253         apr_threadattr_create(&workers->thread_attr, workers->pool);
254         if (ap_thread_stacksize != 0) {
255             apr_threadattr_stacksize_set(workers->thread_attr,
256                                          ap_thread_stacksize);
257             ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
258                          "h2_workers: using stacksize=%ld", 
259                          (long)ap_thread_stacksize);
260         }
261         
262         APR_RING_INIT(&workers->workers, h2_worker, link);
263         APR_RING_INIT(&workers->zombies, h2_worker, link);
264         APR_RING_INIT(&workers->mplxs, h2_mplx, link);
265         
266         status = apr_thread_mutex_create(&workers->lock,
267                                          APR_THREAD_MUTEX_DEFAULT,
268                                          workers->pool);
269         if (status == APR_SUCCESS) {
270             status = apr_thread_cond_create(&workers->mplx_added, workers->pool);
271         }
272         
273         if (status == APR_SUCCESS) {
274             status = apr_thread_mutex_create(&workers->tx_lock,
275                                              APR_THREAD_MUTEX_DEFAULT,
276                                              workers->pool);
277         }
278         
279         if (status == APR_SUCCESS) {
280             status = h2_workers_start(workers);
281         }
282         
283         if (status != APR_SUCCESS) {
284             h2_workers_destroy(workers);
285             workers = NULL;
286         }
287     }
288     return workers;
289 }
290
291 void h2_workers_destroy(h2_workers *workers)
292 {
293     /* before we go, cleanup any zombie workers that may have accumulated */
294     cleanup_zombies(workers, 1);
295     
296     if (workers->mplx_added) {
297         apr_thread_cond_destroy(workers->mplx_added);
298         workers->mplx_added = NULL;
299     }
300     if (workers->lock) {
301         apr_thread_mutex_destroy(workers->lock);
302         workers->lock = NULL;
303     }
304     while (!H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
305         h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs);
306         H2_MPLX_REMOVE(m);
307     }
308     while (!H2_WORKER_LIST_EMPTY(&workers->workers)) {
309         h2_worker *w = H2_WORKER_LIST_FIRST(&workers->workers);
310         H2_WORKER_REMOVE(w);
311     }
312     if (workers->pool) {
313         apr_pool_destroy(workers->pool);
314         /* workers is gone */
315     }
316 }
317
318 apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
319 {
320     apr_status_t status = apr_thread_mutex_lock(workers->lock);
321     if (status == APR_SUCCESS) {
322         ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s,
323                      "h2_workers: register mplx(%ld)", m->id);
324         if (in_list(workers, m)) {
325             status = APR_EAGAIN;
326         }
327         else {
328             H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
329             status = APR_SUCCESS;
330         }
331         
332         if (workers->idle_worker_count > 0) { 
333             apr_thread_cond_signal(workers->mplx_added);
334         }
335         else if (workers->worker_count < workers->max_size) {
336             ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
337                          "h2_workers: got %d worker, adding 1", 
338                          workers->worker_count);
339             add_worker(workers);
340         }
341         
342         /* cleanup any zombie workers that may have accumulated */
343         cleanup_zombies(workers, 0);
344         
345         apr_thread_mutex_unlock(workers->lock);
346     }
347     return status;
348 }
349
350 apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
351 {
352     apr_status_t status = apr_thread_mutex_lock(workers->lock);
353     if (status == APR_SUCCESS) {
354         status = APR_EAGAIN;
355         if (in_list(workers, m)) {
356             H2_MPLX_REMOVE(m);
357             status = APR_SUCCESS;
358         }
359         /* cleanup any zombie workers that may have accumulated */
360         cleanup_zombies(workers, 0);
361         
362         apr_thread_mutex_unlock(workers->lock);
363     }
364     return status;
365 }
366
367 void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs)
368 {
369     if (idle_secs <= 0) {
370         ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
371                      APLOGNO(02962) "h2_workers: max_worker_idle_sec value of %d"
372                      " is not valid, ignored.", idle_secs);
373         return;
374     }
375     apr_atomic_set32(&workers->max_idle_secs, idle_secs);
376 }
377
378 apr_size_t h2_workers_tx_reserve(h2_workers *workers, apr_size_t count)
379 {
380     apr_status_t status = apr_thread_mutex_lock(workers->tx_lock);
381     if (status == APR_SUCCESS) {
382         count = H2MIN(workers->spare_tx_handles, count);
383         workers->spare_tx_handles -= count;
384         ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
385                      "h2_workers: reserved %d tx handles, %d/%d left", 
386                      (int)count, (int)workers->spare_tx_handles,
387                      (int)workers->max_tx_handles);
388         apr_thread_mutex_unlock(workers->tx_lock);
389         return count;
390     }
391     return 0;
392 }
393
394 void h2_workers_tx_free(h2_workers *workers, apr_size_t count)
395 {
396     apr_status_t status = apr_thread_mutex_lock(workers->tx_lock);
397     if (status == APR_SUCCESS) {
398         workers->spare_tx_handles += count;
399         ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
400                      "h2_workers: freed %d tx handles, %d/%d left", 
401                      (int)count, (int)workers->spare_tx_handles,
402                      (int)workers->max_tx_handles);
403         apr_thread_mutex_unlock(workers->tx_lock);
404     }
405 }
406