]> granicus.if.org Git - apache/blob - modules/http2/h2_workers.c
e976df94cf6996f9b370c625570c475871a2334a
[apache] / modules / http2 / h2_workers.c
1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <assert.h>
17 #include <apr_atomic.h>
18 #include <apr_thread_mutex.h>
19 #include <apr_thread_cond.h>
20
21 #include <mpm_common.h>
22 #include <httpd.h>
23 #include <http_core.h>
24 #include <http_log.h>
25
26 #include "h2.h"
27 #include "h2_private.h"
28 #include "h2_mplx.h"
29 #include "h2_task.h"
30 #include "h2_workers.h"
31 #include "h2_util.h"
32
33 typedef struct h2_slot h2_slot;
34 struct h2_slot {
35     int id;
36     h2_slot *next;
37     h2_workers *workers;
38     int aborted;
39     int sticks;
40     h2_task *task;
41     apr_thread_t *thread;
42     apr_thread_mutex_t *lock;
43     apr_thread_cond_t *not_idle;
44 };
45
46 static h2_slot *pop_slot(h2_slot **phead) 
47 {
48     /* Atomically pop a slot from the list */
49     for (;;) {
50         h2_slot *first = *phead;
51         if (first == NULL) {
52             return NULL;
53         }
54         if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
55             first->next = NULL;
56             return first;
57         }
58     }
59 }
60
61 static void push_slot(h2_slot **phead, h2_slot *slot)
62 {
63     /* Atomically push a slot to the list */
64     ap_assert(!slot->next);
65     for (;;) {
66         h2_slot *next = slot->next = *phead;
67         if (apr_atomic_casptr((void*)phead, slot, next) == next) {
68             return;
69         }
70     }
71 }
72
73 static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
74
75 static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) 
76 {
77     apr_status_t status;
78     
79     slot->workers = workers;
80     slot->aborted = 0;
81     slot->task = NULL;
82
83     if (!slot->lock) {
84         status = apr_thread_mutex_create(&slot->lock,
85                                          APR_THREAD_MUTEX_DEFAULT,
86                                          workers->pool);
87         if (status != APR_SUCCESS) {
88             push_slot(&workers->free, slot);
89             return status;
90         }
91     }
92
93     if (!slot->not_idle) {
94         status = apr_thread_cond_create(&slot->not_idle, workers->pool);
95         if (status != APR_SUCCESS) {
96             push_slot(&workers->free, slot);
97             return status;
98         }
99     }
100     
101     ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
102                  "h2_workers: new thread for slot %d", slot->id); 
103     /* thread will either immediately start work or add itself
104      * to the idle queue */
105     apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot, 
106                       workers->pool);
107     if (!slot->thread) {
108         push_slot(&workers->free, slot);
109         return APR_ENOMEM;
110     }
111     
112     apr_atomic_inc32(&workers->worker_count);
113     return APR_SUCCESS;
114 }
115
116 static apr_status_t add_worker(h2_workers *workers)
117 {
118     h2_slot *slot = pop_slot(&workers->free);
119     if (slot) {
120         return activate_slot(workers, slot);
121     }
122     return APR_EAGAIN;
123 }
124
125 static void wake_idle_worker(h2_workers *workers) 
126 {
127     h2_slot *slot = pop_slot(&workers->idle);
128     if (slot) {
129         apr_thread_mutex_lock(slot->lock);
130         apr_thread_cond_signal(slot->not_idle);
131         apr_thread_mutex_unlock(slot->lock);
132     }
133     else if (workers->dynamic) {
134         add_worker(workers);
135     }
136 }
137
138 static void cleanup_zombies(h2_workers *workers)
139 {
140     h2_slot *slot;
141     while ((slot = pop_slot(&workers->zombies))) {
142         if (slot->thread) {
143             apr_status_t status;
144             apr_thread_join(&status, slot->thread);
145             slot->thread = NULL;
146         }
147         apr_atomic_dec32(&workers->worker_count);
148         slot->next = NULL;
149         push_slot(&workers->free, slot);
150     }
151 }
152
153 static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m)
154 {
155     apr_status_t rv;
156     
157     rv = h2_mplx_pop_task(m, &slot->task);
158     if (slot->task) {
159         /* Ok, we got something to give back to the worker for execution. 
160          * If we still have idle workers, we let the worker be sticky, 
161          * e.g. making it poll the task's h2_mplx instance for more work 
162          * before asking back here. */
163         slot->sticks = slot->workers->max_workers;
164         return rv;            
165     }
166     slot->sticks = 0;
167     return APR_EOF;
168 }
169
170 static h2_fifo_op_t mplx_peek(void *head, void *ctx)
171 {
172     h2_mplx *m = head;
173     h2_slot *slot = ctx;
174     
175     if (slot_pull_task(slot, m) == APR_EAGAIN) {
176         wake_idle_worker(slot->workers);
177         return H2_FIFO_OP_REPUSH;
178     } 
179     return H2_FIFO_OP_PULL;
180 }
181
182 /**
183  * Get the next task for the given worker. Will block until a task arrives
184  * or the max_wait timer expires and more than min workers exist.
185  */
186 static apr_status_t get_next(h2_slot *slot)
187 {
188     h2_workers *workers = slot->workers;
189     apr_status_t status;
190     
191     slot->task = NULL;
192     while (!slot->aborted) {
193         if (!slot->task) {
194             status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
195             if (status == APR_EOF) {
196                 return status;
197             }
198         }
199         
200         if (slot->task) {
201             return APR_SUCCESS;
202         }
203         
204         cleanup_zombies(workers);
205
206         apr_thread_mutex_lock(slot->lock);
207         push_slot(&workers->idle, slot);
208         apr_thread_cond_wait(slot->not_idle, slot->lock);
209         apr_thread_mutex_unlock(slot->lock);
210     }
211     return APR_EOF;
212 }
213
214 static void slot_done(h2_slot *slot)
215 {
216     push_slot(&(slot->workers->zombies), slot);
217 }
218
219
220 static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
221 {
222     h2_slot *slot = wctx;
223     
224     while (!slot->aborted) {
225
226         /* Get a h2_task from the mplxs queue. */
227         get_next(slot);
228         while (slot->task) {
229         
230             h2_task_do(slot->task, thread, slot->id);
231             
232             /* Report the task as done. If stickyness is left, offer the
233              * mplx the opportunity to give us back a new task right away.
234              */
235             if (!slot->aborted && (--slot->sticks > 0)) {
236                 h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task);
237             }
238             else {
239                 h2_mplx_task_done(slot->task->mplx, slot->task, NULL);
240                 slot->task = NULL;
241             }
242         }
243     }
244
245     slot_done(slot);
246     return NULL;
247 }
248
249 static apr_status_t workers_pool_cleanup(void *data)
250 {
251     h2_workers *workers = data;
252     h2_slot *slot;
253     
254     if (!workers->aborted) {
255         workers->aborted = 1;
256         /* abort all idle slots */
257         for (;;) {
258             slot = pop_slot(&workers->idle);
259             if (slot) {
260                 apr_thread_mutex_lock(slot->lock);
261                 slot->aborted = 1;
262                 apr_thread_cond_signal(slot->not_idle);
263                 apr_thread_mutex_unlock(slot->lock);
264             }
265             else {
266                 break;
267             }
268         }
269
270         h2_fifo_term(workers->mplxs);
271         h2_fifo_interrupt(workers->mplxs);
272
273         cleanup_zombies(workers);
274     }
275     return APR_SUCCESS;
276 }
277
278 h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
279                               int min_workers, int max_workers,
280                               int idle_secs)
281 {
282     apr_status_t status;
283     h2_workers *workers;
284     apr_pool_t *pool;
285     int i, n;
286
287     ap_assert(s);
288     ap_assert(server_pool);
289
290     /* let's have our own pool that will be parent to all h2_worker
291      * instances we create. This happens in various threads, but always
292      * guarded by our lock. Without this pool, all subpool creations would
293      * happen on the pool handed to us, which we do not guard.
294      */
295     apr_pool_create(&pool, server_pool);
296     apr_pool_tag(pool, "h2_workers");
297     workers = apr_pcalloc(pool, sizeof(h2_workers));
298     if (!workers) {
299         return NULL;
300     }
301     
302     workers->s = s;
303     workers->pool = pool;
304     workers->min_workers = min_workers;
305     workers->max_workers = max_workers;
306     workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
307
308     /* FIXME: the fifo set we use here has limited capacity. Once the
309      * set is full, connections with new requests do a wait. Unfortunately,
310      * we have optimizations in place there that makes such waiting "unfair"
311      * in the sense that it may take connections a looong time to get scheduled.
312      *
313      * Need to rewrite this to use one of our double-linked lists and a mutex
314      * to have unlimited capacity and fair scheduling.
315      *
316      * For now, we just make enough room to have many connections inside one
317      * process.
318      */
319     status = h2_fifo_set_create(&workers->mplxs, pool, 8 * 1024);
320     if (status != APR_SUCCESS) {
321         return NULL;
322     }
323     
324     status = apr_threadattr_create(&workers->thread_attr, workers->pool);
325     if (status != APR_SUCCESS) {
326         return NULL;
327     }
328     
329     if (ap_thread_stacksize != 0) {
330         apr_threadattr_stacksize_set(workers->thread_attr,
331                                      ap_thread_stacksize);
332         ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
333                      "h2_workers: using stacksize=%ld", 
334                      (long)ap_thread_stacksize);
335     }
336     
337     status = apr_thread_mutex_create(&workers->lock,
338                                      APR_THREAD_MUTEX_DEFAULT,
339                                      workers->pool);
340     if (status == APR_SUCCESS) {        
341         n = workers->nslots = workers->max_workers;
342         workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
343         if (workers->slots == NULL) {
344             workers->nslots = 0;
345             status = APR_ENOMEM;
346         }
347         for (i = 0; i < n; ++i) {
348             workers->slots[i].id = i;
349         }
350     }
351     if (status == APR_SUCCESS) {
352         /* we activate all for now, TODO: support min_workers again.
353          * do this in reverse for vanity reasons so slot 0 will most
354          * likely be at head of idle queue. */
355         n = workers->max_workers;
356         for (i = n-1; i >= 0; --i) {
357             status = activate_slot(workers, &workers->slots[i]);
358         }
359         /* the rest of the slots go on the free list */
360         for(i = n; i < workers->nslots; ++i) {
361             push_slot(&workers->free, &workers->slots[i]);
362         }
363         workers->dynamic = (workers->worker_count < workers->max_workers);
364     }
365     if (status == APR_SUCCESS) {
366         apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);    
367         return workers;
368     }
369     return NULL;
370 }
371
372 apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
373 {
374     apr_status_t status = h2_fifo_push(workers->mplxs, m);
375     wake_idle_worker(workers);
376     return status;
377 }
378
379 apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
380 {
381     return h2_fifo_remove(workers->mplxs, m);
382 }