]> granicus.if.org Git - apache/blob - modules/http2/h2_workers.c
On the trunk:
[apache] / modules / http2 / h2_workers.c
1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <assert.h>
17 #include <apr_atomic.h>
18 #include <apr_thread_mutex.h>
19 #include <apr_thread_cond.h>
20
21 #include <mpm_common.h>
22 #include <httpd.h>
23 #include <http_core.h>
24 #include <http_log.h>
25
26 #include "h2.h"
27 #include "h2_private.h"
28 #include "h2_mplx.h"
29 #include "h2_task.h"
30 #include "h2_workers.h"
31 #include "h2_util.h"
32
33 typedef struct h2_slot h2_slot;
34 struct h2_slot {
35     int id;
36     h2_slot *next;
37     h2_workers *workers;
38     int aborted;
39     int sticks;
40     h2_task *task;
41     apr_thread_t *thread;
42     apr_thread_cond_t *not_idle;
43 };
44
45 static h2_slot *pop_slot(h2_slot **phead) 
46 {
47     /* Atomically pop a slot from the list */
48     for (;;) {
49         h2_slot *first = *phead;
50         if (first == NULL) {
51             return NULL;
52         }
53         if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
54             first->next = NULL;
55             return first;
56         }
57     }
58 }
59
60 static void push_slot(h2_slot **phead, h2_slot *slot)
61 {
62     /* Atomically push a slot to the list */
63     ap_assert(!slot->next);
64     for (;;) {
65         h2_slot *next = slot->next = *phead;
66         if (apr_atomic_casptr((void*)phead, slot, next) == next) {
67             return;
68         }
69     }
70 }
71
72 static void wake_idle_worker(h2_workers *workers) 
73 {
74     h2_slot *slot = pop_slot(&workers->idle);
75     if (slot) {
76         apr_thread_mutex_lock(workers->lock);
77         apr_thread_cond_signal(slot->not_idle);
78         apr_thread_mutex_unlock(workers->lock);
79     }
80 }
81
82 static void cleanup_zombies(h2_workers *workers)
83 {
84     h2_slot *slot;
85     while ((slot = pop_slot(&workers->zombies))) {
86         if (slot->thread) {
87             apr_status_t status;
88             apr_thread_join(&status, slot->thread);
89             slot->thread = NULL;
90         }
91         --workers->worker_count;
92         push_slot(&workers->free, slot);
93     }
94 }
95
96 static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m)
97 {
98     int has_more;
99     slot->task = h2_mplx_pop_task(m, &has_more);
100     if (slot->task) {
101         /* Ok, we got something to give back to the worker for execution. 
102          * If we still have idle workers, we let the worker be sticky, 
103          * e.g. making it poll the task's h2_mplx instance for more work 
104          * before asking back here. */
105         slot->sticks = slot->workers->max_workers;
106         return has_more? APR_EAGAIN : APR_SUCCESS;            
107     }
108     slot->sticks = 0;
109     return APR_EOF;
110 }
111
112 static h2_fifo_op_t mplx_peek(void *head, void *ctx)
113 {
114     h2_mplx *m = head;
115     h2_slot *slot = ctx;
116     
117     if (slot_pull_task(slot, m) == APR_EAGAIN) {
118         wake_idle_worker(slot->workers);
119         return H2_FIFO_OP_REPUSH;
120     } 
121     return H2_FIFO_OP_PULL;
122 }
123
124 /**
125  * Get the next task for the given worker. Will block until a task arrives
126  * or the max_wait timer expires and more than min workers exist.
127  */
128 static apr_status_t get_next(h2_slot *slot)
129 {
130     h2_workers *workers = slot->workers;
131     apr_status_t status;
132     
133     slot->task = NULL;
134     while (!slot->aborted) {
135         if (!slot->task) {
136             status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
137         }
138         
139         if (slot->task) {
140             return APR_SUCCESS;
141         }
142         
143         apr_thread_mutex_lock(workers->lock);
144         ++workers->idle_workers;
145         cleanup_zombies(workers);
146         if (slot->next == NULL) {
147             push_slot(&workers->idle, slot);
148         }
149         apr_thread_cond_wait(slot->not_idle, workers->lock);
150         apr_thread_mutex_unlock(workers->lock);
151     }
152     return APR_EOF;
153 }
154
155 static void slot_done(h2_slot *slot)
156 {
157     push_slot(&(slot->workers->zombies), slot);
158 }
159
160
161 static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
162 {
163     h2_slot *slot = wctx;
164     
165     while (!slot->aborted) {
166
167         /* Get a h2_task from the mplxs queue. */
168         get_next(slot);
169         while (slot->task) {
170         
171             h2_task_do(slot->task, thread, slot->id);
172             
173             /* Report the task as done. If stickyness is left, offer the
174              * mplx the opportunity to give us back a new task right away.
175              */
176             if (!slot->aborted && (--slot->sticks > 0)) {
177                 h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task);
178             }
179             else {
180                 h2_mplx_task_done(slot->task->mplx, slot->task, NULL);
181                 slot->task = NULL;
182             }
183         }
184     }
185
186     slot_done(slot);
187     return NULL;
188 }
189
190 static apr_status_t activate_slot(h2_workers *workers)
191 {
192     h2_slot *slot = pop_slot(&workers->free);
193     if (slot) {
194         apr_status_t status;
195         
196         slot->workers = workers;
197         slot->aborted = 0;
198         slot->task = NULL;
199         if (!slot->not_idle) {
200             status = apr_thread_cond_create(&slot->not_idle, workers->pool);
201             if (status != APR_SUCCESS) {
202                 push_slot(&workers->free, slot);
203                 return status;
204             }
205         }
206         
207         apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot, 
208                           workers->pool);
209         if (!slot->thread) {
210             push_slot(&workers->free, slot);
211             return APR_ENOMEM;
212         }
213
214         ++workers->worker_count;
215         return APR_SUCCESS;
216     }
217     return APR_EAGAIN;
218 }
219
220 static apr_status_t workers_pool_cleanup(void *data)
221 {
222     h2_workers *workers = data;
223     h2_slot *slot;
224     
225     if (!workers->aborted) {
226         apr_thread_mutex_lock(workers->lock);
227         workers->aborted = 1;
228         /* before we go, cleanup any zombies and abort the rest */
229         cleanup_zombies(workers);
230         for (;;) {
231             slot = pop_slot(&workers->idle);
232             if (slot) {
233                 slot->aborted = 1;
234                 apr_thread_cond_signal(slot->not_idle);
235             }
236             else {
237                 break;
238             }
239         }
240         apr_thread_mutex_unlock(workers->lock);
241
242         h2_fifo_term(workers->mplxs);
243         h2_fifo_interrupt(workers->mplxs);
244     }
245     return APR_SUCCESS;
246 }
247
248 h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
249                               int min_workers, int max_workers,
250                               int idle_secs)
251 {
252     apr_status_t status;
253     h2_workers *workers;
254     apr_pool_t *pool;
255     int i;
256
257     ap_assert(s);
258     ap_assert(server_pool);
259
260     /* let's have our own pool that will be parent to all h2_worker
261      * instances we create. This happens in various threads, but always
262      * guarded by our lock. Without this pool, all subpool creations would
263      * happen on the pool handed to us, which we do not guard.
264      */
265     apr_pool_create(&pool, server_pool);
266     apr_pool_tag(pool, "h2_workers");
267     workers = apr_pcalloc(pool, sizeof(h2_workers));
268     if (!workers) {
269         return NULL;
270     }
271     
272     workers->s = s;
273     workers->pool = pool;
274     workers->min_workers = min_workers;
275     workers->max_workers = max_workers;
276     workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
277
278     status = h2_fifo_create(&workers->mplxs, pool, workers->max_workers);
279     if (status != APR_SUCCESS) {
280         return NULL;
281     }
282     
283     status = apr_threadattr_create(&workers->thread_attr, workers->pool);
284     if (status != APR_SUCCESS) {
285         return NULL;
286     }
287     
288     if (ap_thread_stacksize != 0) {
289         apr_threadattr_stacksize_set(workers->thread_attr,
290                                      ap_thread_stacksize);
291         ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
292                      "h2_workers: using stacksize=%ld", 
293                      (long)ap_thread_stacksize);
294     }
295     
296     status = apr_thread_mutex_create(&workers->lock,
297                                      APR_THREAD_MUTEX_DEFAULT,
298                                      workers->pool);
299     if (status == APR_SUCCESS) {        
300         int n = workers->nslots = workers->max_workers;
301         workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
302         if (workers->slots == NULL) {
303             status = APR_ENOMEM;
304         }
305     }
306     if (status == APR_SUCCESS) {        
307         workers->free = &workers->slots[0];
308         for (i = 0; i < workers->nslots-1; ++i) {
309             workers->slots[i].next = &workers->slots[i+1];
310             workers->slots[i].id = i;
311         }
312         while (workers->worker_count < workers->max_workers 
313                && status == APR_SUCCESS) {
314             status = activate_slot(workers);
315         }
316     }
317     if (status == APR_SUCCESS) {
318         apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);    
319         return workers;
320     }
321     return NULL;
322 }
323
324 apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
325 {
326     apr_status_t status;
327     if ((status = h2_fifo_try_push(workers->mplxs, m)) != APR_SUCCESS) {
328         ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s,
329                      "h2_workers: unable to push mplx(%ld)", m->id);
330     } 
331     wake_idle_worker(workers);
332     return status;
333 }
334
335 apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
336 {
337     return h2_fifo_remove(workers->mplxs, m);
338 }