]> granicus.if.org Git - apache/blob - server/mpm/worker/fdqueue.c
add comment from event MPM's fdqueue ap_pop_pool() regarding
[apache] / server / mpm / worker / fdqueue.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "fdqueue.h"
18 #include "apr_atomic.h"
19
20 typedef struct recycled_pool {
21     apr_pool_t *pool;
22     struct recycled_pool *next;
23 } recycled_pool;
24
25 struct fd_queue_info_t {
26     apr_uint32_t idlers;
27     apr_thread_mutex_t *idlers_mutex;
28     apr_thread_cond_t *wait_for_idler;
29     int terminated;
30     int max_idlers;
31     recycled_pool  *recycled_pools;
32 };
33
34 static apr_status_t queue_info_cleanup(void *data_)
35 {
36     fd_queue_info_t *qi = data_;
37     apr_thread_cond_destroy(qi->wait_for_idler);
38     apr_thread_mutex_destroy(qi->idlers_mutex);
39
40     /* Clean up any pools in the recycled list */
41     for (;;) {
42         struct recycled_pool *first_pool = qi->recycled_pools;
43         if (first_pool == NULL) {
44             break;
45         }
46         if (apr_atomic_casptr((volatile void**)&(qi->recycled_pools), first_pool->next,
47                               first_pool) == first_pool) {
48             apr_pool_destroy(first_pool->pool);
49         }
50     }
51
52     return APR_SUCCESS;
53 }
54
55 apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info,
56                                   apr_pool_t *pool, int max_idlers)
57 {
58     apr_status_t rv;
59     fd_queue_info_t *qi;
60
61     qi = apr_pcalloc(pool, sizeof(*qi));
62
63     rv = apr_thread_mutex_create(&qi->idlers_mutex, APR_THREAD_MUTEX_DEFAULT,
64                                  pool);
65     if (rv != APR_SUCCESS) {
66         return rv;
67     }
68     rv = apr_thread_cond_create(&qi->wait_for_idler, pool);
69     if (rv != APR_SUCCESS) {
70         return rv;
71     }
72     qi->recycled_pools = NULL;
73     qi->max_idlers = max_idlers;
74     apr_pool_cleanup_register(pool, qi, queue_info_cleanup,
75                               apr_pool_cleanup_null);
76
77     *queue_info = qi;
78
79     return APR_SUCCESS;
80 }
81
82 apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info,
83                                     apr_pool_t *pool_to_recycle)
84 {
85     apr_status_t rv;
86     int prev_idlers;
87
88     /* If we have been given a pool to recycle, atomically link
89      * it into the queue_info's list of recycled pools
90      */
91     if (pool_to_recycle) {
92         struct recycled_pool *new_recycle;
93         new_recycle = (struct recycled_pool *)apr_palloc(pool_to_recycle,
94                                                          sizeof(*new_recycle));
95         new_recycle->pool = pool_to_recycle;
96         for (;;) {
97             /* Save queue_info->recycled_pool in local variable next because
98              * new_recycle->next can be changed after apr_atomic_casptr
99              * function call. For gory details see PR 44402.
100              */
101             struct recycled_pool *next = queue_info->recycled_pools;
102             new_recycle->next = next;
103             if (apr_atomic_casptr((volatile void**)&(queue_info->recycled_pools),
104                                   new_recycle, next) == next) {
105                 break;
106             }
107         }
108     }
109
110     /* Atomically increment the count of idle workers */
111     for (;;) {
112         prev_idlers = queue_info->idlers;
113         if (apr_atomic_cas32(&(queue_info->idlers), prev_idlers + 1,
114                              prev_idlers) == prev_idlers) {
115             break;
116         }
117     }
118
119     /* If this thread just made the idle worker count nonzero,
120      * wake up the listener. */
121     if (prev_idlers == 0) {
122         rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
123         if (rv != APR_SUCCESS) {
124             return rv;
125         }
126         rv = apr_thread_cond_signal(queue_info->wait_for_idler);
127         if (rv != APR_SUCCESS) {
128             apr_thread_mutex_unlock(queue_info->idlers_mutex);
129             return rv;
130         }
131         rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
132         if (rv != APR_SUCCESS) {
133             return rv;
134         }
135     }
136
137     return APR_SUCCESS;
138 }
139
140 apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info,
141                                           apr_pool_t **recycled_pool)
142 {
143     apr_status_t rv;
144
145     *recycled_pool = NULL;
146
147     /* Block if the count of idle workers is zero */
148     if (queue_info->idlers == 0) {
149         rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
150         if (rv != APR_SUCCESS) {
151             return rv;
152         }
153         /* Re-check the idle worker count to guard against a
154          * race condition.  Now that we're in the mutex-protected
155          * region, one of two things may have happened:
156          *   - If the idle worker count is still zero, the
157          *     workers are all still busy, so it's safe to
158          *     block on a condition variable.
159          *   - If the idle worker count is nonzero, then a
160          *     worker has become idle since the first check
161          *     of queue_info->idlers above.  It's possible
162          *     that the worker has also signaled the condition
163          *     variable--and if so, the listener missed it
164          *     because it wasn't yet blocked on the condition
165          *     variable.  But if the idle worker count is
166          *     now nonzero, it's safe for this function to
167          *     return immediately.
168          */
169         if (queue_info->idlers == 0) {
170             rv = apr_thread_cond_wait(queue_info->wait_for_idler,
171                                   queue_info->idlers_mutex);
172             if (rv != APR_SUCCESS) {
173                 apr_status_t rv2;
174                 rv2 = apr_thread_mutex_unlock(queue_info->idlers_mutex);
175                 if (rv2 != APR_SUCCESS) {
176                     return rv2;
177                 }
178                 return rv;
179             }
180         }
181         rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
182         if (rv != APR_SUCCESS) {
183             return rv;
184         }
185     }
186
187     /* Atomically decrement the idle worker count */
188     apr_atomic_dec32(&(queue_info->idlers));
189
190     /* Atomically pop a pool from the recycled list */
191
192     /* This function is safe only as long as it is single threaded because
193      * it reaches into the queue and accesses "next" which can change.
194      * We are OK today because it is only called from the listener thread.
195      * cas-based pushes do not have the same limitation - any number can
196      * happen concurrently with a single cas-based pop.
197      */
198
199     for (;;) {
200         struct recycled_pool *first_pool = queue_info->recycled_pools;
201         if (first_pool == NULL) {
202             break;
203         }
204         if (apr_atomic_casptr((volatile void**)&(queue_info->recycled_pools), first_pool->next,
205                               first_pool) == first_pool) {
206             *recycled_pool = first_pool->pool;
207             break;
208         }
209     }
210
211     if (queue_info->terminated) {
212         return APR_EOF;
213     }
214     else {
215         return APR_SUCCESS;
216     }
217 }
218
219 apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info)
220 {
221     apr_status_t rv;
222     rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
223     if (rv != APR_SUCCESS) {
224         return rv;
225     }
226     queue_info->terminated = 1;
227     apr_thread_cond_broadcast(queue_info->wait_for_idler);
228     return apr_thread_mutex_unlock(queue_info->idlers_mutex);
229 }
230
231 /**
232  * Detects when the fd_queue_t is full. This utility function is expected
233  * to be called from within critical sections, and is not threadsafe.
234  */
235 #define ap_queue_full(queue) ((queue)->nelts == (queue)->bounds)
236
237 /**
238  * Detects when the fd_queue_t is empty. This utility function is expected
239  * to be called from within critical sections, and is not threadsafe.
240  */
241 #define ap_queue_empty(queue) ((queue)->nelts == 0)
242
243 /**
244  * Callback routine that is called to destroy this
245  * fd_queue_t when its pool is destroyed.
246  */
247 static apr_status_t ap_queue_destroy(void *data)
248 {
249     fd_queue_t *queue = data;
250
251     /* Ignore errors here, we can't do anything about them anyway.
252      * XXX: We should at least try to signal an error here, it is
253      * indicative of a programmer error. -aaron */
254     apr_thread_cond_destroy(queue->not_empty);
255     apr_thread_mutex_destroy(queue->one_big_mutex);
256
257     return APR_SUCCESS;
258 }
259
260 /**
261  * Initialize the fd_queue_t.
262  */
263 apr_status_t ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a)
264 {
265     int i;
266     apr_status_t rv;
267
268     if ((rv = apr_thread_mutex_create(&queue->one_big_mutex,
269                                       APR_THREAD_MUTEX_DEFAULT, a)) != APR_SUCCESS) {
270         return rv;
271     }
272     if ((rv = apr_thread_cond_create(&queue->not_empty, a)) != APR_SUCCESS) {
273         return rv;
274     }
275
276     queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
277     queue->bounds = queue_capacity;
278     queue->nelts = 0;
279
280     /* Set all the sockets in the queue to NULL */
281     for (i = 0; i < queue_capacity; ++i)
282         queue->data[i].sd = NULL;
283
284     apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
285
286     return APR_SUCCESS;
287 }
288
289 /**
290  * Push a new socket onto the queue.
291  *
292  * precondition: ap_queue_info_wait_for_idler has already been called
293  *               to reserve an idle worker thread
294  */
295 apr_status_t ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p)
296 {
297     fd_queue_elem_t *elem;
298     apr_status_t rv;
299
300     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
301         return rv;
302     }
303
304     AP_DEBUG_ASSERT(!queue->terminated);
305     AP_DEBUG_ASSERT(!ap_queue_full(queue));
306
307     elem = &queue->data[queue->nelts];
308     elem->sd = sd;
309     elem->p = p;
310     queue->nelts++;
311
312     apr_thread_cond_signal(queue->not_empty);
313
314     if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
315         return rv;
316     }
317
318     return APR_SUCCESS;
319 }
320
321 /**
322  * Retrieves the next available socket from the queue. If there are no
323  * sockets available, it will block until one becomes available.
324  * Once retrieved, the socket is placed into the address specified by
325  * 'sd'.
326  */
327 apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p)
328 {
329     fd_queue_elem_t *elem;
330     apr_status_t rv;
331
332     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
333         return rv;
334     }
335
336     /* Keep waiting until we wake up and find that the queue is not empty. */
337     if (ap_queue_empty(queue)) {
338         if (!queue->terminated) {
339             apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
340         }
341         /* If we wake up and it's still empty, then we were interrupted */
342         if (ap_queue_empty(queue)) {
343             rv = apr_thread_mutex_unlock(queue->one_big_mutex);
344             if (rv != APR_SUCCESS) {
345                 return rv;
346             }
347             if (queue->terminated) {
348                 return APR_EOF; /* no more elements ever again */
349             }
350             else {
351                 return APR_EINTR;
352             }
353         }
354     }
355
356     elem = &queue->data[--queue->nelts];
357     *sd = elem->sd;
358     *p = elem->p;
359 #ifdef AP_DEBUG
360     elem->sd = NULL;
361     elem->p = NULL;
362 #endif /* AP_DEBUG */
363
364     rv = apr_thread_mutex_unlock(queue->one_big_mutex);
365     return rv;
366 }
367
368 apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
369 {
370     apr_status_t rv;
371
372     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
373         return rv;
374     }
375     apr_thread_cond_broadcast(queue->not_empty);
376     return apr_thread_mutex_unlock(queue->one_big_mutex);
377 }
378
379 apr_status_t ap_queue_term(fd_queue_t *queue)
380 {
381     apr_status_t rv;
382
383     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
384         return rv;
385     }
386     /* we must hold one_big_mutex when setting this... otherwise,
387      * we could end up setting it and waking everybody up just after a
388      * would-be popper checks it but right before they block
389      */
390     queue->terminated = 1;
391     if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
392         return rv;
393     }
394     return ap_queue_interrupt_all(queue);
395 }