]> granicus.if.org Git - apache/blob - server/mpm/experimental/event/fdqueue.c
Silence compiler warnings in the same manner as for the
[apache] / server / mpm / experimental / event / fdqueue.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "fdqueue.h"
18 #include "apr_atomic.h"
19
20 typedef struct recycled_pool
21 {
22     apr_pool_t *pool;
23     struct recycled_pool *next;
24 } recycled_pool;
25
26 struct fd_queue_info_t
27 {
28     apr_int32_t idlers;      /**
29                                   * 0 or positive: number of idle worker threads
30                               * negative: number of threads blocked waiting
31                               *           for an idle worker
32                               */
33     apr_thread_mutex_t *idlers_mutex;
34     apr_thread_cond_t *wait_for_idler;
35     int terminated;
36     int max_idlers;
37     recycled_pool *recycled_pools;
38 };
39
40 static apr_status_t queue_info_cleanup(void *data_)
41 {
42     fd_queue_info_t *qi = data_;
43     apr_thread_cond_destroy(qi->wait_for_idler);
44     apr_thread_mutex_destroy(qi->idlers_mutex);
45
46     /* Clean up any pools in the recycled list */
47     for (;;) {
48         struct recycled_pool *first_pool = qi->recycled_pools;
49         if (first_pool == NULL) {
50             break;
51         }
52         if (apr_atomic_casptr
53             ((void*) &(qi->recycled_pools), first_pool->next,
54              first_pool) == first_pool) {
55             apr_pool_destroy(first_pool->pool);
56         }
57     }
58
59     return APR_SUCCESS;
60 }
61
62 apr_status_t ap_queue_info_create(fd_queue_info_t ** queue_info,
63                                   apr_pool_t * pool, int max_idlers)
64 {
65     apr_status_t rv;
66     fd_queue_info_t *qi;
67
68     qi = apr_pcalloc(pool, sizeof(*qi));
69
70     rv = apr_thread_mutex_create(&qi->idlers_mutex, APR_THREAD_MUTEX_DEFAULT,
71                                  pool);
72     if (rv != APR_SUCCESS) {
73         return rv;
74     }
75     rv = apr_thread_cond_create(&qi->wait_for_idler, pool);
76     if (rv != APR_SUCCESS) {
77         return rv;
78     }
79     qi->recycled_pools = NULL;
80     qi->max_idlers = max_idlers;
81     apr_pool_cleanup_register(pool, qi, queue_info_cleanup,
82                               apr_pool_cleanup_null);
83
84     *queue_info = qi;
85
86     return APR_SUCCESS;
87 }
88
89 apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info,
90                                     apr_pool_t * pool_to_recycle)
91 {
92     apr_status_t rv;
93     int prev_idlers;
94
95     ap_push_pool(queue_info, pool_to_recycle);
96
97     /* Atomically increment the count of idle workers */
98     /*
99      * TODO: The atomics expect unsigned whereas we're using signed.
100      *       Need to double check that they work as expected or else
101      *       rework how we determine blocked.
102      */
103     prev_idlers = apr_atomic_inc32((apr_uint32_t *)&(queue_info->idlers));
104
105     /* If other threads are waiting on a worker, wake one up */
106     if (prev_idlers < 0) {
107         rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
108         if (rv != APR_SUCCESS) {
109             AP_DEBUG_ASSERT(0);
110             return rv;
111         }
112         rv = apr_thread_cond_signal(queue_info->wait_for_idler);
113         if (rv != APR_SUCCESS) {
114             apr_thread_mutex_unlock(queue_info->idlers_mutex);
115             return rv;
116         }
117         rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
118         if (rv != APR_SUCCESS) {
119             return rv;
120         }
121     }
122
123     return APR_SUCCESS;
124 }
125
126 apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info)
127 {
128     apr_status_t rv;
129     int prev_idlers;
130
131     /* Atomically decrement the idle worker count, saving the old value */
132     /* See TODO in ap_queue_info_set_idle() */
133     prev_idlers = apr_atomic_add32((apr_uint32_t *)&(queue_info->idlers), -1);
134
135     /* Block if there weren't any idle workers */
136     if (prev_idlers <= 0) {
137         rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
138         if (rv != APR_SUCCESS) {
139             AP_DEBUG_ASSERT(0);
140             /* See TODO in ap_queue_info_set_idle() */
141             apr_atomic_inc32((apr_uint32_t *)&(queue_info->idlers));    /* back out dec */
142             return rv;
143         }
144         /* Re-check the idle worker count to guard against a
145          * race condition.  Now that we're in the mutex-protected
146          * region, one of two things may have happened:
147          *   - If the idle worker count is still negative, the
148          *     workers are all still busy, so it's safe to
149          *     block on a condition variable.
150          *   - If the idle worker count is non-negative, then a
151          *     worker has become idle since the first check
152          *     of queue_info->idlers above.  It's possible
153          *     that the worker has also signaled the condition
154          *     variable--and if so, the listener missed it
155          *     because it wasn't yet blocked on the condition
156          *     variable.  But if the idle worker count is
157          *     now non-negative, it's safe for this function to
158          *     return immediately.
159          *
160          *     A negative value in queue_info->idlers tells how many
161          *     threads are waiting on an idle worker.
162          */
163         if (queue_info->idlers < 0) {
164             rv = apr_thread_cond_wait(queue_info->wait_for_idler,
165                                       queue_info->idlers_mutex);
166             if (rv != APR_SUCCESS) {
167                 apr_status_t rv2;
168                 AP_DEBUG_ASSERT(0);
169                 rv2 = apr_thread_mutex_unlock(queue_info->idlers_mutex);
170                 if (rv2 != APR_SUCCESS) {
171                     return rv2;
172                 }
173                 return rv;
174             }
175         }
176         rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
177         if (rv != APR_SUCCESS) {
178             return rv;
179         }
180     }
181
182     if (queue_info->terminated) {
183         return APR_EOF;
184     }
185     else {
186         return APR_SUCCESS;
187     }
188 }
189
190
191 void ap_push_pool(fd_queue_info_t * queue_info,
192                                     apr_pool_t * pool_to_recycle)
193 {
194     /* If we have been given a pool to recycle, atomically link
195      * it into the queue_info's list of recycled pools
196      */
197     if (pool_to_recycle) {
198         struct recycled_pool *new_recycle;
199         new_recycle = (struct recycled_pool *) apr_palloc(pool_to_recycle,
200                                                           sizeof
201                                                           (*new_recycle));
202         new_recycle->pool = pool_to_recycle;
203         for (;;) {
204             /*
205              * Save queue_info->recycled_pool in local variable next because
206              * new_recycle->next can be changed after apr_atomic_casptr
207              * function call. For gory details see PR 44402.
208              */
209             struct recycled_pool *next = queue_info->recycled_pools;
210             new_recycle->next = next;
211             if (apr_atomic_casptr
212                 ((void*) &(queue_info->recycled_pools),
213                  new_recycle, next) == next) {
214                 break;
215             }
216         }
217     }
218 }
219
220 void ap_pop_pool(apr_pool_t ** recycled_pool, fd_queue_info_t * queue_info)
221 {
222     /* Atomically pop a pool from the recycled list */
223
224     /* This function is safe only as long as it is single threaded because
225      * it reaches into the queue and accesses "next" which can change.
226      * We are OK today because it is only called from the listener thread.
227      * cas-based pushes do not have the same limitation - any number can
228      * happen concurrently with a single cas-based pop.
229      */
230
231     *recycled_pool = NULL;
232
233
234     /* Atomically pop a pool from the recycled list */
235     for (;;) {
236         struct recycled_pool *first_pool = queue_info->recycled_pools;
237         if (first_pool == NULL) {
238             break;
239         }
240         if (apr_atomic_casptr
241             ((void*) &(queue_info->recycled_pools),
242              first_pool->next, first_pool) == first_pool) {
243             *recycled_pool = first_pool->pool;
244             break;
245         }
246     }
247 }
248
249 apr_status_t ap_queue_info_term(fd_queue_info_t * queue_info)
250 {
251     apr_status_t rv;
252     rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
253     if (rv != APR_SUCCESS) {
254         return rv;
255     }
256     queue_info->terminated = 1;
257     apr_thread_cond_broadcast(queue_info->wait_for_idler);
258     return apr_thread_mutex_unlock(queue_info->idlers_mutex);
259 }
260
261 /**
262  * Detects when the fd_queue_t is full. This utility function is expected
263  * to be called from within critical sections, and is not threadsafe.
264  */
265 #define ap_queue_full(queue) ((queue)->nelts == (queue)->bounds)
266
267 /**
268  * Detects when the fd_queue_t is empty. This utility function is expected
269  * to be called from within critical sections, and is not threadsafe.
270  */
271 #define ap_queue_empty(queue) ((queue)->nelts == 0 && APR_RING_EMPTY(&queue->timers ,timer_event_t, link))
272
273 /**
274  * Callback routine that is called to destroy this
275  * fd_queue_t when its pool is destroyed.
276  */
277 static apr_status_t ap_queue_destroy(void *data)
278 {
279     fd_queue_t *queue = data;
280
281     /* Ignore errors here, we can't do anything about them anyway.
282      * XXX: We should at least try to signal an error here, it is
283      * indicative of a programmer error. -aaron */
284     apr_thread_cond_destroy(queue->not_empty);
285     apr_thread_mutex_destroy(queue->one_big_mutex);
286
287     return APR_SUCCESS;
288 }
289
290 /**
291  * Initialize the fd_queue_t.
292  */
293 apr_status_t ap_queue_init(fd_queue_t * queue, int queue_capacity,
294                            apr_pool_t * a)
295 {
296     int i;
297     apr_status_t rv;
298
299     if ((rv = apr_thread_mutex_create(&queue->one_big_mutex,
300                                       APR_THREAD_MUTEX_DEFAULT,
301                                       a)) != APR_SUCCESS) {
302         return rv;
303     }
304     if ((rv = apr_thread_cond_create(&queue->not_empty, a)) != APR_SUCCESS) {
305         return rv;
306     }
307
308     APR_RING_INIT(&queue->timers, timer_event_t, link);
309
310     queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
311     queue->bounds = queue_capacity;
312     queue->nelts = 0;
313
314     /* Set all the sockets in the queue to NULL */
315     for (i = 0; i < queue_capacity; ++i)
316         queue->data[i].sd = NULL;
317
318     apr_pool_cleanup_register(a, queue, ap_queue_destroy,
319                               apr_pool_cleanup_null);
320
321     return APR_SUCCESS;
322 }
323
324 /**
325  * Push a new socket onto the queue.
326  *
327  * precondition: ap_queue_info_wait_for_idler has already been called
328  *               to reserve an idle worker thread
329  */
330 apr_status_t ap_queue_push(fd_queue_t * queue, apr_socket_t * sd,
331                            conn_state_t * cs, apr_pool_t * p)
332 {
333     fd_queue_elem_t *elem;
334     apr_status_t rv;
335
336     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
337         return rv;
338     }
339
340     AP_DEBUG_ASSERT(!queue->terminated);
341     AP_DEBUG_ASSERT(!ap_queue_full(queue));
342
343     elem = &queue->data[queue->nelts];
344     elem->sd = sd;
345     elem->cs = cs;
346     elem->p = p;
347     queue->nelts++;
348
349     apr_thread_cond_signal(queue->not_empty);
350
351     if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
352         return rv;
353     }
354
355     return APR_SUCCESS;
356 }
357
358 apr_status_t ap_queue_push_timer(fd_queue_t * queue, timer_event_t *te)
359 {
360     apr_status_t rv;
361     
362     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
363         return rv;
364     }
365     
366     AP_DEBUG_ASSERT(!queue->terminated);
367
368     APR_RING_INSERT_TAIL(&queue->timers, te, timer_event_t, link);
369
370     apr_thread_cond_signal(queue->not_empty);
371     
372     if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
373         return rv;
374     }
375     
376     return APR_SUCCESS;
377 }
378
379 /**
380  * Retrieves the next available socket from the queue. If there are no
381  * sockets available, it will block until one becomes available.
382  * Once retrieved, the socket is placed into the address specified by
383  * 'sd'.
384  */
385 apr_status_t ap_queue_pop_something(fd_queue_t * queue, apr_socket_t ** sd,
386                                     conn_state_t ** cs, apr_pool_t ** p,
387                                     timer_event_t ** te_out)
388 {
389     fd_queue_elem_t *elem;
390     apr_status_t rv;
391
392     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
393         return rv;
394     }
395
396     /* Keep waiting until we wake up and find that the queue is not empty. */
397     if (ap_queue_empty(queue)) {
398         if (!queue->terminated) {
399             apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
400         }
401         /* If we wake up and it's still empty, then we were interrupted */
402         if (ap_queue_empty(queue)) {
403             rv = apr_thread_mutex_unlock(queue->one_big_mutex);
404             if (rv != APR_SUCCESS) {
405                 return rv;
406             }
407             if (queue->terminated) {
408                 return APR_EOF; /* no more elements ever again */
409             }
410             else {
411                 return APR_EINTR;
412             }
413         }
414     }
415
416     *te_out = NULL;
417
418     if (!APR_RING_EMPTY(&queue->timers, timer_event_t, link)) {
419         *te_out = APR_RING_FIRST(&queue->timers);
420         APR_RING_REMOVE(*te_out, link);
421     }
422     else {
423         elem = &queue->data[--queue->nelts];
424         *sd = elem->sd;
425         *cs = elem->cs;
426         *p = elem->p;
427 #ifdef AP_DEBUG
428         elem->sd = NULL;
429         elem->p = NULL;
430 #endif /* AP_DEBUG */
431     }
432     
433     rv = apr_thread_mutex_unlock(queue->one_big_mutex);
434     return rv;
435 }
436
437 apr_status_t ap_queue_interrupt_all(fd_queue_t * queue)
438 {
439     apr_status_t rv;
440
441     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
442         return rv;
443     }
444     apr_thread_cond_broadcast(queue->not_empty);
445     return apr_thread_mutex_unlock(queue->one_big_mutex);
446 }
447
448 apr_status_t ap_queue_term(fd_queue_t * queue)
449 {
450     apr_status_t rv;
451
452     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
453         return rv;
454     }
455     /* we must hold one_big_mutex when setting this... otherwise,
456      * we could end up setting it and waking everybody up just after a
457      * would-be popper checks it but right before they block
458      */
459     queue->terminated = 1;
460     if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
461         return rv;
462     }
463     return ap_queue_interrupt_all(queue);
464 }