]> granicus.if.org Git - apache/blob - modules/http2/h2_workers.c
On the 2.4.x branch:
[apache] / modules / http2 / h2_workers.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <assert.h>
18 #include <apr_atomic.h>
19 #include <apr_thread_mutex.h>
20 #include <apr_thread_cond.h>
21
22 #include <mpm_common.h>
23 #include <httpd.h>
24 #include <http_core.h>
25 #include <http_log.h>
26
27 #include "h2.h"
28 #include "h2_private.h"
29 #include "h2_mplx.h"
30 #include "h2_task.h"
31 #include "h2_workers.h"
32 #include "h2_util.h"
33
34 typedef struct h2_slot h2_slot;
35 struct h2_slot {
36     int id;
37     h2_slot *next;
38     h2_workers *workers;
39     int aborted;
40     int sticks;
41     h2_task *task;
42     apr_thread_t *thread;
43     apr_thread_mutex_t *lock;
44     apr_thread_cond_t *not_idle;
45 };
46
47 static h2_slot *pop_slot(h2_slot **phead) 
48 {
49     /* Atomically pop a slot from the list */
50     for (;;) {
51         h2_slot *first = *phead;
52         if (first == NULL) {
53             return NULL;
54         }
55         if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
56             first->next = NULL;
57             return first;
58         }
59     }
60 }
61
62 static void push_slot(h2_slot **phead, h2_slot *slot)
63 {
64     /* Atomically push a slot to the list */
65     ap_assert(!slot->next);
66     for (;;) {
67         h2_slot *next = slot->next = *phead;
68         if (apr_atomic_casptr((void*)phead, slot, next) == next) {
69             return;
70         }
71     }
72 }
73
74 static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
75
76 static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) 
77 {
78     apr_status_t status;
79     
80     slot->workers = workers;
81     slot->aborted = 0;
82     slot->task = NULL;
83
84     if (!slot->lock) {
85         status = apr_thread_mutex_create(&slot->lock,
86                                          APR_THREAD_MUTEX_DEFAULT,
87                                          workers->pool);
88         if (status != APR_SUCCESS) {
89             push_slot(&workers->free, slot);
90             return status;
91         }
92     }
93
94     if (!slot->not_idle) {
95         status = apr_thread_cond_create(&slot->not_idle, workers->pool);
96         if (status != APR_SUCCESS) {
97             push_slot(&workers->free, slot);
98             return status;
99         }
100     }
101     
102     ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
103                  "h2_workers: new thread for slot %d", slot->id); 
104     /* thread will either immediately start work or add itself
105      * to the idle queue */
106     apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot, 
107                       workers->pool);
108     if (!slot->thread) {
109         push_slot(&workers->free, slot);
110         return APR_ENOMEM;
111     }
112     
113     apr_atomic_inc32(&workers->worker_count);
114     return APR_SUCCESS;
115 }
116
117 static apr_status_t add_worker(h2_workers *workers)
118 {
119     h2_slot *slot = pop_slot(&workers->free);
120     if (slot) {
121         return activate_slot(workers, slot);
122     }
123     return APR_EAGAIN;
124 }
125
126 static void wake_idle_worker(h2_workers *workers) 
127 {
128     h2_slot *slot = pop_slot(&workers->idle);
129     if (slot) {
130         apr_thread_mutex_lock(slot->lock);
131         apr_thread_cond_signal(slot->not_idle);
132         apr_thread_mutex_unlock(slot->lock);
133     }
134     else if (workers->dynamic) {
135         add_worker(workers);
136     }
137 }
138
139 static void cleanup_zombies(h2_workers *workers)
140 {
141     h2_slot *slot;
142     while ((slot = pop_slot(&workers->zombies))) {
143         if (slot->thread) {
144             apr_status_t status;
145             apr_thread_join(&status, slot->thread);
146             slot->thread = NULL;
147         }
148         apr_atomic_dec32(&workers->worker_count);
149         slot->next = NULL;
150         push_slot(&workers->free, slot);
151     }
152 }
153
154 static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m)
155 {
156     apr_status_t rv;
157     
158     rv = h2_mplx_pop_task(m, &slot->task);
159     if (slot->task) {
160         /* Ok, we got something to give back to the worker for execution. 
161          * If we still have idle workers, we let the worker be sticky, 
162          * e.g. making it poll the task's h2_mplx instance for more work 
163          * before asking back here. */
164         slot->sticks = slot->workers->max_workers;
165         return rv;            
166     }
167     slot->sticks = 0;
168     return APR_EOF;
169 }
170
171 static h2_fifo_op_t mplx_peek(void *head, void *ctx)
172 {
173     h2_mplx *m = head;
174     h2_slot *slot = ctx;
175     
176     if (slot_pull_task(slot, m) == APR_EAGAIN) {
177         wake_idle_worker(slot->workers);
178         return H2_FIFO_OP_REPUSH;
179     } 
180     return H2_FIFO_OP_PULL;
181 }
182
183 /**
184  * Get the next task for the given worker. Will block until a task arrives
185  * or the max_wait timer expires and more than min workers exist.
186  */
187 static apr_status_t get_next(h2_slot *slot)
188 {
189     h2_workers *workers = slot->workers;
190     apr_status_t status;
191     
192     slot->task = NULL;
193     while (!slot->aborted) {
194         if (!slot->task) {
195             status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
196             if (status == APR_EOF) {
197                 return status;
198             }
199         }
200         
201         if (slot->task) {
202             return APR_SUCCESS;
203         }
204         
205         cleanup_zombies(workers);
206
207         apr_thread_mutex_lock(slot->lock);
208         push_slot(&workers->idle, slot);
209         apr_thread_cond_wait(slot->not_idle, slot->lock);
210         apr_thread_mutex_unlock(slot->lock);
211     }
212     return APR_EOF;
213 }
214
215 static void slot_done(h2_slot *slot)
216 {
217     push_slot(&(slot->workers->zombies), slot);
218 }
219
220
221 static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
222 {
223     h2_slot *slot = wctx;
224     
225     while (!slot->aborted) {
226
227         /* Get a h2_task from the mplxs queue. */
228         get_next(slot);
229         while (slot->task) {
230         
231             h2_task_do(slot->task, thread, slot->id);
232             
233             /* Report the task as done. If stickyness is left, offer the
234              * mplx the opportunity to give us back a new task right away.
235              */
236             if (!slot->aborted && (--slot->sticks > 0)) {
237                 h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task);
238             }
239             else {
240                 h2_mplx_task_done(slot->task->mplx, slot->task, NULL);
241                 slot->task = NULL;
242             }
243         }
244     }
245
246     slot_done(slot);
247     return NULL;
248 }
249
250 static apr_status_t workers_pool_cleanup(void *data)
251 {
252     h2_workers *workers = data;
253     h2_slot *slot;
254     
255     if (!workers->aborted) {
256         workers->aborted = 1;
257         /* abort all idle slots */
258         for (;;) {
259             slot = pop_slot(&workers->idle);
260             if (slot) {
261                 apr_thread_mutex_lock(slot->lock);
262                 slot->aborted = 1;
263                 apr_thread_cond_signal(slot->not_idle);
264                 apr_thread_mutex_unlock(slot->lock);
265             }
266             else {
267                 break;
268             }
269         }
270
271         h2_fifo_term(workers->mplxs);
272         h2_fifo_interrupt(workers->mplxs);
273
274         cleanup_zombies(workers);
275     }
276     return APR_SUCCESS;
277 }
278
279 h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
280                               int min_workers, int max_workers,
281                               int idle_secs)
282 {
283     apr_status_t status;
284     h2_workers *workers;
285     apr_pool_t *pool;
286     int i, n;
287
288     ap_assert(s);
289     ap_assert(server_pool);
290
291     /* let's have our own pool that will be parent to all h2_worker
292      * instances we create. This happens in various threads, but always
293      * guarded by our lock. Without this pool, all subpool creations would
294      * happen on the pool handed to us, which we do not guard.
295      */
296     apr_pool_create(&pool, server_pool);
297     apr_pool_tag(pool, "h2_workers");
298     workers = apr_pcalloc(pool, sizeof(h2_workers));
299     if (!workers) {
300         return NULL;
301     }
302     
303     workers->s = s;
304     workers->pool = pool;
305     workers->min_workers = min_workers;
306     workers->max_workers = max_workers;
307     workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
308
309     /* FIXME: the fifo set we use here has limited capacity. Once the
310      * set is full, connections with new requests do a wait. Unfortunately,
311      * we have optimizations in place there that makes such waiting "unfair"
312      * in the sense that it may take connections a looong time to get scheduled.
313      *
314      * Need to rewrite this to use one of our double-linked lists and a mutex
315      * to have unlimited capacity and fair scheduling.
316      *
317      * For now, we just make enough room to have many connections inside one
318      * process.
319      */
320     status = h2_fifo_set_create(&workers->mplxs, pool, 8 * 1024);
321     if (status != APR_SUCCESS) {
322         return NULL;
323     }
324     
325     status = apr_threadattr_create(&workers->thread_attr, workers->pool);
326     if (status != APR_SUCCESS) {
327         return NULL;
328     }
329     
330     if (ap_thread_stacksize != 0) {
331         apr_threadattr_stacksize_set(workers->thread_attr,
332                                      ap_thread_stacksize);
333         ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
334                      "h2_workers: using stacksize=%ld", 
335                      (long)ap_thread_stacksize);
336     }
337     
338     status = apr_thread_mutex_create(&workers->lock,
339                                      APR_THREAD_MUTEX_DEFAULT,
340                                      workers->pool);
341     if (status == APR_SUCCESS) {        
342         n = workers->nslots = workers->max_workers;
343         workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
344         if (workers->slots == NULL) {
345             workers->nslots = 0;
346             status = APR_ENOMEM;
347         }
348         for (i = 0; i < n; ++i) {
349             workers->slots[i].id = i;
350         }
351     }
352     if (status == APR_SUCCESS) {
353         /* we activate all for now, TODO: support min_workers again.
354          * do this in reverse for vanity reasons so slot 0 will most
355          * likely be at head of idle queue. */
356         n = workers->max_workers;
357         for (i = n-1; i >= 0; --i) {
358             status = activate_slot(workers, &workers->slots[i]);
359         }
360         /* the rest of the slots go on the free list */
361         for(i = n; i < workers->nslots; ++i) {
362             push_slot(&workers->free, &workers->slots[i]);
363         }
364         workers->dynamic = (workers->worker_count < workers->max_workers);
365     }
366     if (status == APR_SUCCESS) {
367         apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);    
368         return workers;
369     }
370     return NULL;
371 }
372
373 apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
374 {
375     apr_status_t status = h2_fifo_push(workers->mplxs, m);
376     wake_idle_worker(workers);
377     return status;
378 }
379
380 apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
381 {
382     return h2_fifo_remove(workers->mplxs, m);
383 }