static int server_limit = 0; /* ServerLimit */
static int thread_limit = 0; /* ThreadLimit */
static int had_healthy_child = 0;
-static int dying = 0;
-static int workers_may_exit = 0;
-static int start_thread_may_exit = 0;
-static int listener_may_exit = 0;
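+/* Set (once) from one thread and read lock-less from the others; volatile
+ * keeps the compiler from caching the values across the polling loops.
+ */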
+static volatile int dying = 0;
+static volatile int workers_may_exit = 0;
+static volatile int start_thread_may_exit = 0;
+static volatile int listener_may_exit = 0;
static int listener_is_wakeable = 0; /* Pollset supports APR_POLLSET_WAKEABLE */
static int num_listensocks = 0;
static apr_int32_t conns_this_child; /* MaxConnectionsPerChild, only access in listener thread */
apr_time_t next_expiry;
APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
- apr_atomic_inc32(q->total);
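+ /* Plain (non-atomic) update: the timeout queues are expected to be
+ * manipulated under timeout_mutex only, so ->total needs no atomics;
+ * lock-less status readers use volatile reads instead.
+ */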
+ ++*q->total;
++q->count;
/* Cheaply update the overall queues' next expiry according to the
{
APR_RING_REMOVE(el, timeout_list);
APR_RING_ELEM_INIT(el, timeout_list);
- apr_atomic_dec32(q->total);
+ --*q->total;
--q->count;
}
static void wakeup_listener(void)
{
listener_may_exit = 1;
+
+ /* Unblock the listener if it's poll()ing */
+ if (event_pollset && listener_is_wakeable) {
+ apr_pollset_wakeup(event_pollset);
+ }
+
+ /* Unblock the listener if it's waiting for a worker */
+ if (worker_queue_info) {
+ ap_queue_info_term(worker_queue_info);
+ }
+
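+ /* The wakeups above run before the listener_os_thread check below (with
+ * NULL guards), so they are not skipped when the listener thread and its
+ * structures don't exist yet.
+ */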
if (!listener_os_thread) {
/* XXX there is an obscure path that this doesn't handle perfectly:
* right after listener thread is created but before
* listener_os_thread is set, the first worker thread hits an
* error and starts graceful termination
*/
return;
}
-
- /* Unblock the listener if it's poll()ing */
- if (listener_is_wakeable) {
- apr_pollset_wakeup(event_pollset);
- }
-
- /* unblock the listener if it's waiting for a worker */
- ap_queue_info_term(worker_queue_info);
-
/*
* we should just be able to "kill(ap_my_pid, LISTENER_SIGNAL)" on all
* platforms and wake up the listener thread since it is the only thread
static void signal_threads(int mode)
{
- if (terminate_mode == mode) {
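+ /* >= so that a graceful signal can't downgrade an ungraceful termination
+ * already in progress.
+ */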
+ if (terminate_mode >= mode) {
return;
}
terminate_mode = mode;
struct timeout_queue *qp;
apr_status_t rv;
- if (!apr_atomic_read32(q->total)) {
+ if (!*q->total) {
return;
}
APR_RING_UNSPLICE(first, last, timeout_list);
APR_RING_SPLICE_TAIL(&trash, first, last, event_conn_state_t,
timeout_list);
- AP_DEBUG_ASSERT(apr_atomic_read32(q->total) >= count);
- apr_atomic_sub32(q->total, count);
+ AP_DEBUG_ASSERT(*q->total >= count && qp->count >= count);
+ *q->total -= count;
qp->count -= count;
total += count;
}
if (!timeout_time) {
ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
"All workers are busy or dying, will close %u "
- "keep-alive connections",
- apr_atomic_read32(keepalive_q->total));
+ "keep-alive connections", *keepalive_q->total);
}
process_timeout_queue(keepalive_q, timeout_time,
start_lingering_close_nonblocking);
timer_event_t *te;
const apr_pollfd_t *out_pfd;
apr_int32_t num = 0;
+ apr_uint32_t c_count, l_count, i_count;
apr_interval_time_t timeout_interval;
apr_time_t now, timeout_time;
int workers_were_busy = 0;
"keep-alive: %d lingering: %d suspended: %u)",
apr_atomic_read32(&connection_count),
apr_atomic_read32(&clogged_count),
- apr_atomic_read32(write_completion_q->total),
- apr_atomic_read32(keepalive_q->total),
+ *(volatile apr_uint32_t*)write_completion_q->total,
+ *(volatile apr_uint32_t*)keepalive_q->total,
apr_atomic_read32(&lingering_count),
apr_atomic_read32(&suspended_count));
if (dying) {
"All workers busy, not accepting new conns "
"in this process");
}
- else if ( (int)apr_atomic_read32(&connection_count)
- - (int)apr_atomic_read32(&lingering_count)
- > threads_per_child
- + ap_queue_info_get_idlers(worker_queue_info) *
- worker_factor / WORKER_FACTOR_SCALE)
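+ /* The counters are unsigned: make sure c_count > l_count before
+ * subtracting, to avoid wrap-around.
+ */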
+ else if ((c_count = apr_atomic_read32(&connection_count))
+ > (l_count = apr_atomic_read32(&lingering_count))
+ && (c_count - l_count
+ > ap_queue_info_get_idlers(worker_queue_info)
+ * worker_factor / WORKER_FACTOR_SCALE
+ + threads_per_child))
{
if (!listeners_disabled)
disable_listensocks(process_slot);
apr_thread_mutex_unlock(timeout_mutex);
- ps->keep_alive = apr_atomic_read32(keepalive_q->total);
- ps->write_completion = apr_atomic_read32(write_completion_q->total);
+ ps->keep_alive = *(volatile apr_uint32_t*)keepalive_q->total;
+ ps->write_completion = *(volatile apr_uint32_t*)write_completion_q->total;
ps->connections = apr_atomic_read32(&connection_count);
ps->suspended = apr_atomic_read32(&suspended_count);
ps->lingering_close = apr_atomic_read32(&lingering_count);
}
else if ((workers_were_busy || dying)
- && apr_atomic_read32(keepalive_q->total)) {
+ && *(volatile apr_uint32_t*)keepalive_q->total) {
apr_thread_mutex_lock(timeout_mutex);
process_keepalive_queue(0); /* kill'em all \m/ */
apr_thread_mutex_unlock(timeout_mutex);
}
if (listeners_disabled && !workers_were_busy
- && (int)apr_atomic_read32(&connection_count)
- - (int)apr_atomic_read32(&lingering_count)
- < ((int)ap_queue_info_get_idlers(worker_queue_info) - 1)
- * worker_factor / WORKER_FACTOR_SCALE + threads_per_child)
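+ /* Unsigned-safe form: require c_count >= l_count before subtracting,
+ * and i_count > 0 before computing (i_count - 1).
+ */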
+ && ((c_count = apr_atomic_read32(&connection_count))
+ >= (l_count = apr_atomic_read32(&lingering_count))
+ && (i_count = ap_queue_info_get_idlers(worker_queue_info)) > 0
+ && (c_count - l_count
+ < (i_count - 1) * worker_factor / WORKER_FACTOR_SCALE
+ + threads_per_child)))
{
listeners_disabled = 0;
enable_listensocks(process_slot);
int loops;
int prev_threads_created;
int max_recycled_pools = -1;
- int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL};
- /* XXX don't we need more to handle K-A or lingering close? */
- const apr_uint32_t pollset_size = threads_per_child * 2;
+ const int good_methods[] = { APR_POLLSET_KQUEUE,
+ APR_POLLSET_PORT,
+ APR_POLLSET_EPOLL };
+ /* XXX: K-A or lingering close connections are included in the async factor */
+ const apr_uint32_t async_factor = worker_factor / WORKER_FACTOR_SCALE;
+ const apr_uint32_t pollset_size = (apr_uint32_t)num_listensocks +
+ (apr_uint32_t)threads_per_child *
+ (async_factor > 2 ? async_factor : 2);
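+ /* i.e. one slot per listening socket, plus at least two (or async_factor
+ * if larger) per worker thread.
+ */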
/* We must create the fd queues before we start up the listener
* and worker threads. */
had_healthy_child = 0;
ap_extended_status = 0;
+ event_pollset = NULL;
+ worker_queue_info = NULL;
+ listener_os_thread = NULL;
+
return OK;
}
return "AsyncRequestWorkerFactor argument must be a positive number";
worker_factor = val * WORKER_FACTOR_SCALE;
- if (worker_factor == 0)
- worker_factor = 1;
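+ /* Clamp to an effective factor of at least 1 (WORKER_FACTOR_SCALE is the
+ * fixed-point unit), rather than merely a non-zero scaled value.
+ */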
+ if (worker_factor < WORKER_FACTOR_SCALE) {
+ worker_factor = WORKER_FACTOR_SCALE;
+ }
return NULL;
}
struct fd_queue_info_t
{
- apr_uint32_t idlers; /**
- * >= zero_pt: number of idle worker threads
- * < zero_pt: number of threads blocked waiting
- * for an idle worker
- */
+ apr_uint32_t volatile idlers; /**
+ * >= zero_pt: number of idle worker threads
+ * < zero_pt: number of threads blocked,
+ * waiting for an idle worker
+ */
apr_thread_mutex_t *idlers_mutex;
apr_thread_cond_t *wait_for_idler;
int terminated;
int max_idlers;
int max_recycled_pools;
apr_uint32_t recycled_pools_count;
- struct recycled_pool *recycled_pools;
+ struct recycled_pool *volatile recycled_pools;
};
static apr_status_t queue_info_cleanup(void *data_)
apr_pool_t * pool_to_recycle)
{
apr_status_t rv;
- apr_int32_t prev_idlers;
ap_push_pool(queue_info, pool_to_recycle);
- /* Atomically increment the count of idle workers */
- prev_idlers = apr_atomic_inc32(&(queue_info->idlers)) - zero_pt;
-
/* If other threads are waiting on a worker, wake one up */
- if (prev_idlers < 0) {
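+ /* apr_atomic_inc32() returns the *old* value, so idlers below zero_pt
+ * here means at least one thread is blocked in
+ * ap_queue_info_wait_for_idler().
+ */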
+ if (apr_atomic_inc32(&queue_info->idlers) < zero_pt) {
rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
if (rv != APR_SUCCESS) {
AP_DEBUG_ASSERT(0);
apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t * queue_info)
{
- apr_int32_t new_idlers;
- new_idlers = apr_atomic_add32(&(queue_info->idlers), -1) - zero_pt;
- if (--new_idlers <= 0) {
- apr_atomic_inc32(&(queue_info->idlers)); /* back out dec */
- return APR_EAGAIN;
+ /* Don't block if there isn't any idle worker. */
+ for (;;) {
+ apr_uint32_t idlers = queue_info->idlers;
+ if (idlers <= zero_pt) {
+ return APR_EAGAIN;
+ }
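+ /* apr_atomic_cas32() returns the old value: if it still equals our
+ * snapshot we claimed an idler, otherwise another thread raced us
+ * and we retry.
+ */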
+ if (apr_atomic_cas32(&queue_info->idlers, idlers - 1,
+ idlers) == idlers) {
+ return APR_SUCCESS;
+ }
}
- return APR_SUCCESS;
}
apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info,
int *had_to_block)
{
apr_status_t rv;
- apr_int32_t prev_idlers;
-
- /* Atomically decrement the idle worker count, saving the old value */
- /* See TODO in ap_queue_info_set_idle() */
- prev_idlers = apr_atomic_add32(&(queue_info->idlers), -1) - zero_pt;
- /* Block if there weren't any idle workers */
- if (prev_idlers <= 0) {
+ /* Block if there isn't any idle worker.
+ * apr_atomic_add32(x, -1) does the same as dec32(x), except
+ * that it returns the previous value (unlike dec32's bool).
+ */
+ if (apr_atomic_add32(&queue_info->idlers, -1) <= zero_pt) {
rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
if (rv != APR_SUCCESS) {
AP_DEBUG_ASSERT(0);
- /* See TODO in ap_queue_info_set_idle() */
apr_atomic_inc32(&(queue_info->idlers)); /* back out dec */
return rv;
}
apr_uint32_t ap_queue_info_get_idlers(fd_queue_info_t * queue_info)
{
- apr_int32_t val;
- val = (apr_int32_t)apr_atomic_read32(&queue_info->idlers) - zero_pt;
- if (val < 0)
+ apr_uint32_t val;
+ val = apr_atomic_read32(&queue_info->idlers);
+ if (val <= zero_pt)
return 0;
- return val;
+ return val - zero_pt;
}
void ap_push_pool(fd_queue_info_t * queue_info,
return rv;
}
-static apr_status_t queue_interrupt(fd_queue_t * queue, int all)
+static apr_status_t queue_interrupt(fd_queue_t * queue, int all, int term)
{
apr_status_t rv;
if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
return rv;
}
+ /* we must hold one_big_mutex when setting this... otherwise,
+ * we could end up setting it and waking everybody up just after a
+ * would-be popper checks it but right before they block
+ */
+ if (term) {
+ queue->terminated = 1;
+ }
if (all)
apr_thread_cond_broadcast(queue->not_empty);
else
apr_status_t ap_queue_interrupt_all(fd_queue_t * queue)
{
- return queue_interrupt(queue, 1);
+ return queue_interrupt(queue, 1, 0);
}
apr_status_t ap_queue_interrupt_one(fd_queue_t * queue)
{
- return queue_interrupt(queue, 0);
+ return queue_interrupt(queue, 0, 0);
}
apr_status_t ap_queue_term(fd_queue_t * queue)
{
- apr_status_t rv;
-
- if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
- return rv;
- }
- /* we must hold one_big_mutex when setting this... otherwise,
- * we could end up setting it and waking everybody up just after a
- * would-be popper checks it but right before they block
- */
- queue->terminated = 1;
- if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
- return rv;
- }
- return ap_queue_interrupt_all(queue);
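+ /* Set ->terminated and broadcast within a single one_big_mutex critical
+ * section (see queue_interrupt).
+ */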
+ return queue_interrupt(queue, 1, 1);
}