/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdqueue.h"
#include "apr_atomic.h"
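
/* Node in the CAS-maintained stack of recycled connection pools. Each
 * node is allocated out of the very pool it tracks, so destroying the
 * pool also frees its node. */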
typedef struct recycled_pool {
    apr_pool_t *pool;
    struct recycled_pool *next;
} recycled_pool;

struct fd_queue_info_t {
    volatile apr_uint32_t idlers;
    apr_thread_mutex_t *idlers_mutex;
    apr_thread_cond_t *wait_for_idler;
    int terminated;
    int max_idlers;
    recycled_pool *recycled_pools;
};
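
/* Pool cleanup callback for a queue_info: destroys its synchronization
 * primitives and any pools still sitting in the recycled list. */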
static apr_status_t queue_info_cleanup(void *data_)
{
    fd_queue_info_t *qi = data_;
    apr_thread_cond_destroy(qi->wait_for_idler);
    apr_thread_mutex_destroy(qi->idlers_mutex);

    /* Clean up any pools in the recycled list */
    for (;;) {
        struct recycled_pool *first_pool = qi->recycled_pools;
        if (first_pool == NULL) {
            break;
        }
        if (apr_atomic_casptr((void*)&(qi->recycled_pools), first_pool->next,
                              first_pool) == first_pool) {
            apr_pool_destroy(first_pool->pool);
        }
    }

    return APR_SUCCESS;
}

apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info,
                                  apr_pool_t *pool, int max_idlers)
{
    apr_status_t rv;
    fd_queue_info_t *qi;

    qi = apr_pcalloc(pool, sizeof(*qi));

    rv = apr_thread_mutex_create(&qi->idlers_mutex, APR_THREAD_MUTEX_DEFAULT,
                                 pool);
    if (rv != APR_SUCCESS) {
        return rv;
    }
    rv = apr_thread_cond_create(&qi->wait_for_idler, pool);
    if (rv != APR_SUCCESS) {
        return rv;
    }
    qi->recycled_pools = NULL;
    qi->max_idlers = max_idlers;
    apr_pool_cleanup_register(pool, qi, queue_info_cleanup,
                              apr_pool_cleanup_null);

    *queue_info = qi;
    return APR_SUCCESS;
}
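
/* Called by a worker thread when it becomes available: optionally donate
 * a pool to the recycled list, then raise the idle count, waking the
 * listener on the zero-to-nonzero transition. */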
apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info,
                                    apr_pool_t *pool_to_recycle)
{
    apr_status_t rv;

    /* If we have been given a pool to recycle, atomically link
     * it into the queue_info's list of recycled pools
     */
    if (pool_to_recycle) {
        struct recycled_pool *new_recycle;
        new_recycle = (struct recycled_pool *)apr_palloc(pool_to_recycle,
                                                         sizeof(*new_recycle));
        new_recycle->pool = pool_to_recycle;
        for (;;) {
            /* Save queue_info->recycled_pools in the local variable next,
             * because new_recycle->next can change after the
             * apr_atomic_casptr call. For gory details see PR 44402.
             */
            struct recycled_pool *next = queue_info->recycled_pools;
            new_recycle->next = next;
            if (apr_atomic_casptr((void*)&(queue_info->recycled_pools),
                                  new_recycle, next) == next) {
                break;
            }
        }
    }

    /* apr_atomic_inc32 returns the old value: zero means this thread
     * just made the idle worker count nonzero, so wake up the listener. */
    if (apr_atomic_inc32(&queue_info->idlers) == 0) {
        rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            return rv;
        }
        rv = apr_thread_cond_signal(queue_info->wait_for_idler);
        if (rv != APR_SUCCESS) {
            apr_thread_mutex_unlock(queue_info->idlers_mutex);
            return rv;
        }
        rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            return rv;
        }
    }

    return APR_SUCCESS;
}
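
/* Called by the listener before accepting a connection: block until at
 * least one worker is idle. On return, *recycled_pool holds a pool
 * donated by a worker via ap_queue_info_set_idle, or NULL if none was
 * available. */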
apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info,
                                          apr_pool_t **recycled_pool)
{
    apr_status_t rv;

    *recycled_pool = NULL;

    /* Block if the count of idle workers is zero */
    if (queue_info->idlers == 0) {
        rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            return rv;
        }
        /* Re-check the idle worker count to guard against a
         * race condition. Now that we're in the mutex-protected
         * region, one of two things may have happened:
         *   - If the idle worker count is still zero, the
         *     workers are all still busy, so it's safe to
         *     block on the condition variable, BUT we must
         *     re-check the idle worker count when signaled:
         *     a worker thread may have gone idle and then been
         *     context-switched before it could tell us. By the
         *     time it gets back on the CPU and signals us, there
         *     may be no idle worker left. See
         *     https://issues.apache.org/bugzilla/show_bug.cgi?id=45605#c4
         *   - If the idle worker count is nonzero, then a
         *     worker has become idle since the first check
         *     of queue_info->idlers above. It's possible
         *     that the worker has also signaled the condition
         *     variable--and if so, the listener missed it
         *     because it wasn't yet blocked on the condition
         *     variable. But if the idle worker count is
         *     now nonzero, it's safe for this function to
         *     return immediately.
         */
        while (queue_info->idlers == 0) {
            rv = apr_thread_cond_wait(queue_info->wait_for_idler,
                                      queue_info->idlers_mutex);
            if (rv != APR_SUCCESS) {
                apr_status_t rv2;
                rv2 = apr_thread_mutex_unlock(queue_info->idlers_mutex);
                if (rv2 != APR_SUCCESS) {
                    return rv2;
                }
                return rv;
            }
        }
        rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            return rv;
        }
    }

    /* Atomically decrement the idle worker count */
    apr_atomic_dec32(&(queue_info->idlers));

    /* Atomically pop a pool from the recycled list.
     *
     * This pop is safe only as long as it is single-threaded, because
     * it reaches into the list and reads first_pool->next, which a
     * concurrent pop could change. We are OK today because it is only
     * called from the listener thread. CAS-based pushes do not have the
     * same limitation: any number of pushes can happen concurrently
     * with a single CAS-based pop.
     */
    for (;;) {
        struct recycled_pool *first_pool = queue_info->recycled_pools;
        if (first_pool == NULL) {
            break;
        }
        if (apr_atomic_casptr((void*)&(queue_info->recycled_pools),
                              first_pool->next, first_pool) == first_pool) {
            *recycled_pool = first_pool->pool;
            break;
        }
    }

    if (queue_info->terminated) {
        return APR_EOF;
    }
    else {
        return APR_SUCCESS;
    }
}
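
/* Mark the queue_info as terminated and wake up the listener if it is
 * blocked waiting for an idle worker. */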
apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info)
{
    apr_status_t rv;

    rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
    if (rv != APR_SUCCESS) {
        return rv;
    }
    queue_info->terminated = 1;
    apr_thread_cond_broadcast(queue_info->wait_for_idler);
    return apr_thread_mutex_unlock(queue_info->idlers_mutex);
}

/**
 * Detects when the fd_queue_t is full. This utility macro is expected
 * to be used from within critical sections, and is not threadsafe.
 */
#define ap_queue_full(queue) ((queue)->nelts == (queue)->bounds)

/**
 * Detects when the fd_queue_t is empty. This utility macro is expected
 * to be used from within critical sections, and is not threadsafe.
 */
#define ap_queue_empty(queue) ((queue)->nelts == 0)
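
/* For reference, fd_queue_t (declared in fdqueue.h) is a fixed-capacity
 * ring of fd_queue_elem_t {sd, p} pairs: "in" and "out" are the insert
 * and remove cursors and "nelts" the current element count, all guarded
 * by one_big_mutex, with not_empty signaling waiting poppers. */
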
/**
 * Callback routine that is called to destroy this
 * fd_queue_t when its pool is destroyed.
 */
static apr_status_t ap_queue_destroy(void *data)
{
    fd_queue_t *queue = data;

    /* Ignore errors here; we can't do anything about them anyway.
     * XXX: We should at least try to signal an error here; it is
     * indicative of a programmer error. -aaron */
    apr_thread_cond_destroy(queue->not_empty);
    apr_thread_mutex_destroy(queue->one_big_mutex);
    return APR_SUCCESS;
}

/**
 * Initialize the fd_queue_t.
 */
apr_status_t ap_queue_init(fd_queue_t *queue, int queue_capacity,
                           apr_pool_t *a)
{
    int i;
    apr_status_t rv;

    if ((rv = apr_thread_mutex_create(&queue->one_big_mutex,
                                      APR_THREAD_MUTEX_DEFAULT,
                                      a)) != APR_SUCCESS) {
        return rv;
    }
    if ((rv = apr_thread_cond_create(&queue->not_empty, a)) != APR_SUCCESS) {
        return rv;
    }

    queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
    queue->bounds = queue_capacity;
    queue->nelts = 0;
    queue->in = 0;
    queue->out = 0;

    /* Set all the sockets in the queue to NULL */
    for (i = 0; i < queue_capacity; ++i)
        queue->data[i].sd = NULL;

    apr_pool_cleanup_register(a, queue, ap_queue_destroy,
                              apr_pool_cleanup_null);

    return APR_SUCCESS;
}
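
/* Usage sketch (illustrative, not part of this file): the worker MPM
 * sizes the queue to the number of worker threads in the process, e.g.
 *
 *     rv = ap_queue_init(worker_queue, threads_per_child, pchild);
 *
 * so that, combined with the idler accounting above, a push can never
 * find the queue full. The variable names here are hypothetical. */
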
/**
 * Push a new socket onto the queue.
 *
 * precondition: ap_queue_info_wait_for_idler has already been called
 *               to reserve an idle worker thread
 */
apr_status_t ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p)
{
    fd_queue_elem_t *elem;
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }

    AP_DEBUG_ASSERT(!queue->terminated);
    AP_DEBUG_ASSERT(!ap_queue_full(queue));

    elem = &queue->data[queue->in];
    queue->in++;
    if (queue->in >= queue->bounds)
        queue->in -= queue->bounds;
    elem->sd = sd;
    elem->p = p;
    queue->nelts++;

    apr_thread_cond_signal(queue->not_empty);

    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }

    return APR_SUCCESS;
}
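
/* A full queue is only an assertion above, not a runtime error, because
 * of the precondition: the listener reserves an idle worker before each
 * push, so with a capacity of at least the worker count the ring cannot
 * overflow. */
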
/**
 * Retrieves the next available socket from the queue. If there are no
 * sockets available, it will block until one becomes available.
 * Once retrieved, the socket is placed into the address specified by
 * 'sd'.
 */
apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p)
{
    fd_queue_elem_t *elem;
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }

    /* Keep waiting until we wake up and find that the queue is not empty. */
    if (ap_queue_empty(queue)) {
        if (!queue->terminated) {
            apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
        }
        /* If we wake up and it's still empty, then we were interrupted */
        if (ap_queue_empty(queue)) {
            rv = apr_thread_mutex_unlock(queue->one_big_mutex);
            if (rv != APR_SUCCESS) {
                return rv;
            }
            if (queue->terminated) {
                return APR_EOF; /* no more elements ever again */
            }
            else {
                return APR_EINTR;
            }
        }
    }

    elem = &queue->data[queue->out];
    queue->out++;
    if (queue->out >= queue->bounds)
        queue->out -= queue->bounds;
    queue->nelts--;
    *sd = elem->sd;
    *p = elem->p;
#ifdef AP_DEBUG
    elem->sd = NULL;
    elem->p = NULL;
#endif /* AP_DEBUG */

    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
    return rv;
}
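
/* Wake up every thread blocked in ap_queue_pop; if term is set, also
 * mark the queue terminated so poppers get APR_EOF rather than
 * APR_EINTR. */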
static apr_status_t queue_interrupt_all(fd_queue_t *queue, int term)
{
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    /* we must hold one_big_mutex when setting this... otherwise,
     * we could end up setting it and waking everybody up just after a
     * would-be popper checks it but right before it blocks
     */
    if (term) {
        queue->terminated = 1;
    }
    apr_thread_cond_broadcast(queue->not_empty);
    return apr_thread_mutex_unlock(queue->one_big_mutex);
}

apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
{
    return queue_interrupt_all(queue, 0);
}

apr_status_t ap_queue_term(fd_queue_t *queue)
{
    return queue_interrupt_all(queue, 1);
}
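
/* End-to-end usage sketch (illustrative, simplified from the worker MPM;
 * the variable names are hypothetical):
 *
 *   listener thread:
 *       ap_queue_info_wait_for_idler(queue_info, &ptrans);
 *       if (ptrans == NULL)
 *           ... create a fresh transaction pool ...
 *       apr_socket_accept(&csd, listen_sock, ptrans);
 *       ap_queue_push(queue, csd, ptrans);
 *
 *   worker thread loop:
 *       ap_queue_info_set_idle(queue_info, last_ptrans);
 *       rv = ap_queue_pop(queue, &csd, &ptrans);
 *       ... process csd, then recycle ptrans via the next set_idle ...
 *
 * On shutdown, ap_queue_info_term() and ap_queue_term() unblock both
 * sides with APR_EOF. */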