static int server_limit = 0; /* ServerLimit */
static int thread_limit = 0; /* ThreadLimit */
static int had_healthy_child = 0;
-static volatile int dying = 0;
-static volatile int workers_may_exit = 0;
-static volatile int start_thread_may_exit = 0;
-static volatile int listener_may_exit = 0;
+static int dying = 0;
+static int workers_may_exit = 0;
+static int start_thread_may_exit = 0;
+static int listener_may_exit = 0;
static int listener_is_wakeable = 0; /* Pollset supports APR_POLLSET_WAKEABLE */
static int num_listensocks = 0;
static apr_int32_t conns_this_child; /* MaxConnectionsPerChild, only access
                                        in listener thread */
apr_time_t next_expiry;
APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
- ++*q->total;
+ apr_atomic_inc32(q->total);
++q->count;
/* Cheaply update the overall queues' next expiry according to the
 * first entry of this queue (oldest), if it expires sooner. */
{
APR_RING_REMOVE(el, timeout_list);
APR_RING_ELEM_INIT(el, timeout_list);
- --*q->total;
+ apr_atomic_dec32(q->total);
--q->count;
}
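
The split treatment of the two counters above is the point of this hunk: q->total is a grand total shared across queues and read by the listener outside timeout_mutex (see the scoreboard updates and keep-alive checks further down), so it moves to apr_atomic_*32, while q->count is only ever touched with the mutex held and can stay a plain increment. A minimal sketch of the pattern, with hypothetical names:

    #include <apr_atomic.h>

    struct q_sketch {
        apr_uint32_t *total;   /* shared grand total, read lock-free elsewhere */
        int count;             /* per-queue, only accessed under the mutex */
    };

    static void q_sketch_push(struct q_sketch *q)
    {
        /* caller holds the queue mutex */
        apr_atomic_inc32(q->total);  /* safe against lock-free readers */
        ++q->count;                  /* the mutex already serializes this */
    }
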
static pid_t parent_pid;
static apr_os_thread_t *listener_os_thread;
-static int ap_child_slot; /* Current child process slot in scoreboard */
-
/* The LISTENER_SIGNAL signal will be sent from the main thread to the
* listener thread to wake it up for graceful termination (what a child
* process from an old generation does when the admin does "apachectl
* graceful").
*/
static apr_socket_t **worker_sockets;
-static volatile apr_uint32_t listensocks_disabled;
-
-static void disable_listensocks(void)
+static void disable_listensocks(int process_slot)
{
int i;
- if (apr_atomic_cas32(&listensocks_disabled, 1, 0) != 0) {
- return;
+ for (i = 0; i < num_listensocks; i++) {
+ apr_pollset_remove(event_pollset, &listener_pollfd[i]);
}
- if (event_pollset) {
- for (i = 0; i < num_listensocks; i++) {
- apr_pollset_remove(event_pollset, &listener_pollfd[i]);
- }
- }
- ap_scoreboard_image->parent[ap_child_slot].not_accepting = 1;
+ ap_scoreboard_image->parent[process_slot].not_accepting = 1;
}
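
The removed ('-') variant gates the whole function behind apr_atomic_cas32 so that, with several threads racing to disable accept, exactly one flips listensocks_disabled from 0 to 1 and performs the pollset removals; enable_listensocks below uses the mirror-image CAS. A minimal sketch of that one-shot latch, with assumed names:

    #include <apr_atomic.h>

    static apr_uint32_t latch = 0;    /* 0 = enabled, 1 = disabled */

    static void disable_once(void)
    {
        /* apr_atomic_cas32(mem, with, cmp) swaps in 'with' only if the
         * current value equals 'cmp', and returns the old value. Only
         * the caller that observes 0 here wins and does the work. */
        if (apr_atomic_cas32(&latch, 1, 0) != 0)
            return;                   /* already disabled by another thread */
        /* ... remove the listening sockets from the pollset ... */
    }
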
-static void enable_listensocks(void)
+static void enable_listensocks(int process_slot)
{
int i;
- if (listener_may_exit
- || apr_atomic_cas32(&listensocks_disabled, 0, 1) != 1) {
+ if (listener_may_exit) {
return;
}
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(00457)
* XXX: This is not yet optimal. If many workers suddenly become available,
* XXX: the parent may kill some processes off too soon.
*/
- ap_scoreboard_image->parent[ap_child_slot].not_accepting = 0;
-}
-
-static APR_INLINE apr_uint32_t listeners_disabled(void)
-{
- return apr_atomic_read32(&listensocks_disabled);
-}
-
-static APR_INLINE int connections_above_limit(void)
-{
- apr_uint32_t i_count = ap_queue_info_get_idlers(worker_queue_info);
- if (i_count > 0) {
- apr_uint32_t c_count = apr_atomic_read32(&connection_count);
- apr_uint32_t l_count = apr_atomic_read32(&lingering_count);
- if (c_count <= l_count
- /* Off by 'listeners_disabled()' to avoid flip flop */
- || c_count - l_count < (apr_uint32_t)threads_per_child +
- (i_count - listeners_disabled()) *
- (worker_factor / WORKER_FACTOR_SCALE)) {
- return 0;
- }
- }
- return 1;
+ ap_scoreboard_image->parent[process_slot].not_accepting = 0;
}
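
The dropped connections_above_limit() helper encodes the same admission rule the listener loop re-inlines further down: keep accepting only while non-lingering connections stay below threads_per_child plus idle workers times the AsyncRequestWorkerFactor. A sketch of the check with the factor already divided out, plus worked numbers (all assumed):

    /* above_limit(50, 5, 10, 25, 2) == 0: 45 active connections is exactly
     * 25 + 10 * 2, i.e. right at the cap, so accepting may continue. */
    static int above_limit(apr_uint32_t conns, apr_uint32_t lingering,
                           apr_uint32_t idlers,
                           apr_uint32_t threads_per_child,
                           apr_uint32_t factor)
    {
        return conns - lingering > threads_per_child + idlers * factor;
    }
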
static void abort_socket_nonblocking(apr_socket_t *csd)
static void wakeup_listener(void)
{
listener_may_exit = 1;
- disable_listensocks();
-
- /* Unblock the listener if it's poll()ing */
- if (event_pollset && listener_is_wakeable) {
- apr_pollset_wakeup(event_pollset);
- }
-
- /* unblock the listener if it's waiting for a worker */
- if (worker_queue_info) {
- ap_queue_info_term(worker_queue_info);
- }
-
if (!listener_os_thread) {
/* XXX there is an obscure path that this doesn't handle perfectly:
* right after listener thread is created but before listener_os_thread
* is set, the first worker thread hits an error and starts graceful
* termination
*/
return;
}
+
+ /* Unblock the listener if it's poll()ing */
+ if (listener_is_wakeable) {
+ apr_pollset_wakeup(event_pollset);
+ }
+
+ /* unblock the listener if it's waiting for a worker */
+ ap_queue_info_term(worker_queue_info);
+
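
apr_pollset_wakeup() only works if the pollset was created with APR_POLLSET_WAKEABLE, which is exactly what the listener_is_wakeable flag records. A minimal sketch of that wakeup channel, with assumed names and sizes:

    #include <apr_poll.h>

    static apr_pollset_t *pollset;

    static apr_status_t make_wakeable(apr_pool_t *p)
    {
        /* Fails on platforms/methods without wakeup support; the MPM
         * only sets listener_is_wakeable when creation succeeds. */
        return apr_pollset_create_ex(&pollset, 64, p, APR_POLLSET_WAKEABLE,
                                     APR_POLLSET_DEFAULT);
    }

    /* Called from another thread: a blocking apr_pollset_poll() returns
     * APR_EINTR, which the listener's poll loop below checks for. */
    static void poke(void)
    {
        apr_pollset_wakeup(pollset);
    }
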
/*
* we should just be able to "kill(ap_my_pid, LISTENER_SIGNAL)" on all
* platforms and wake up the listener thread since it is the only thread
* with SIGHUP unblocked, but that doesn't work on Linux
*/
static void signal_threads(int mode)
{
- if (terminate_mode >= mode) {
+ if (terminate_mode == mode) {
return;
}
terminate_mode = mode;
static apr_status_t decrement_connection_count(void *cs_)
{
- int is_last_connection;
event_conn_state_t *cs = cs_;
switch (cs->pub.state) {
case CONN_STATE_LINGER_NORMAL:
default:
break;
}
- /* Unblock the listener if it's waiting for connection_count = 0,
- * or if the listening sockets were disabled due to limits and can
- * now accept new connections.
- */
- is_last_connection = !apr_atomic_dec32(&connection_count);
- if (listener_is_wakeable
- && ((is_last_connection && listener_may_exit)
- || (listeners_disabled() && !connections_above_limit()))) {
+ /* Unblock the listener if it's waiting for connection_count = 0 */
+ if (!apr_atomic_dec32(&connection_count)
+ && listener_is_wakeable && listener_may_exit) {
apr_pollset_wakeup(event_pollset);
}
return APR_SUCCESS;
}
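
The test reads the way it does because apr_atomic_dec32() returns zero only when the counter reaches zero, so !apr_atomic_dec32(&connection_count) means "this was the last connection" and only that caller pays for the wakeup. It is the usual refcount-release shape, sketched with hypothetical names:

    #include <apr_atomic.h>

    static apr_uint32_t refs;

    static void release(void)
    {
        if (!apr_atomic_dec32(&refs)) {
            /* last holder: wake whoever is waiting for the count to
             * drain, e.g. apr_pollset_wakeup(event_pollset) here */
        }
    }
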
}
-static void close_listeners(int *closed)
+static void close_listeners(int process_slot, int *closed)
{
if (!*closed) {
int i;
+ disable_listensocks(process_slot);
ap_close_listeners_ex(my_bucket->listeners);
*closed = 1;
dying = 1;
- ap_scoreboard_image->parent[ap_child_slot].quiescing = 1;
+ ap_scoreboard_image->parent[process_slot].quiescing = 1;
for (i = 0; i < threads_per_child; ++i) {
- ap_update_child_status_from_indexes(ap_child_slot, i,
+ ap_update_child_status_from_indexes(process_slot, i,
SERVER_GRACEFUL, NULL);
}
/* wake up the main thread */
struct timeout_queue *qp;
apr_status_t rv;
- if (!*q->total) {
+ if (!apr_atomic_read32(q->total)) {
return;
}
APR_RING_UNSPLICE(first, last, timeout_list);
APR_RING_SPLICE_TAIL(&trash, first, last, event_conn_state_t,
timeout_list);
- AP_DEBUG_ASSERT(*q->total >= count && qp->count >= count);
- *q->total -= count;
+ AP_DEBUG_ASSERT(apr_atomic_read32(q->total) >= count);
+ apr_atomic_sub32(q->total, count);
qp->count -= count;
total += count;
}
if (!timeout_time) {
ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
"All workers are busy or dying, will close %u "
- "keep-alive connections", *keepalive_q->total);
+ "keep-alive connections",
+ apr_atomic_read32(keepalive_q->total));
}
process_timeout_queue(keepalive_q, timeout_time,
start_lingering_close_nonblocking);
static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
{
+ timer_event_t *te;
apr_status_t rc;
proc_info *ti = dummy;
int process_slot = ti->pslot;
struct process_score *ps = ap_get_scoreboard_process(process_slot);
apr_pool_t *tpool = apr_thread_pool_get(thd);
- int closed = 0;
+ void *csd = NULL;
+ apr_pool_t *ptrans; /* Pool for per-transaction stuff */
+ ap_listen_rec *lr;
int have_idle_worker = 0;
- apr_time_t last_log;
+ const apr_pollfd_t *out_pfd;
+ apr_int32_t num = 0;
+ apr_interval_time_t timeout_interval;
+ apr_time_t timeout_time = 0, now, last_log;
+ listener_poll_type *pt;
+ int closed = 0, listeners_disabled = 0;
last_log = apr_time_now();
free(ti);
if (rc != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
"failed to initialize pollset, "
- "shutdown process now");
- resource_shortage = 1;
- signal_threads(ST_UNGRACEFUL);
+ "attempting to shutdown process gracefully");
+ signal_threads(ST_GRACEFUL);
return NULL;
}
apr_signal(LISTENER_SIGNAL, dummy_signal_handler);
for (;;) {
- timer_event_t *te;
- const apr_pollfd_t *out_pfd;
- apr_int32_t num = 0;
- apr_interval_time_t timeout_interval;
- apr_time_t now, timeout_time;
int workers_were_busy = 0;
- if (conns_this_child <= 0)
- check_infinite_requests();
-
if (listener_may_exit) {
- close_listeners(&closed);
+ close_listeners(process_slot, &closed);
if (terminate_mode == ST_UNGRACEFUL
|| apr_atomic_read32(&connection_count) == 0)
break;
}
+ if (conns_this_child <= 0)
+ check_infinite_requests();
+
now = apr_time_now();
if (APLOGtrace6(ap_server_conf)) {
/* trace log status every second */
"keep-alive: %d lingering: %d suspended: %u)",
apr_atomic_read32(&connection_count),
apr_atomic_read32(&clogged_count),
- *(volatile apr_uint32_t*)write_completion_q->total,
- *(volatile apr_uint32_t*)keepalive_q->total,
+ apr_atomic_read32(write_completion_q->total),
+ apr_atomic_read32(keepalive_q->total),
apr_atomic_read32(&lingering_count),
apr_atomic_read32(&suspended_count));
if (dying) {
rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
if (rc != APR_SUCCESS) {
if (APR_STATUS_IS_EINTR(rc)) {
- /* Woken up, if we are exiting or listeners are disabled we
- * must fall through to kill kept-alive connections or test
- * whether listeners should be re-enabled. Otherwise we only
- * need to update timeouts (logic is above, so simply restart
- * the loop).
+ /* Woken up, if we are exiting we must fall through to kill
+ * kept-alive connections, otherwise we only need to update
+ * timeouts (logic is above, so restart the loop).
*/
- if (!listener_may_exit && !listeners_disabled()) {
+ if (!listener_may_exit) {
continue;
}
timeout_time = 0;
}
if (listener_may_exit) {
- close_listeners(&closed);
+ close_listeners(process_slot, &closed);
if (terminate_mode == ST_UNGRACEFUL
|| apr_atomic_read32(&connection_count) == 0)
break;
}
- for (; num; --num, ++out_pfd) {
- listener_poll_type *pt = (listener_poll_type *) out_pfd->client_data;
+ while (num) {
+ pt = (listener_poll_type *) out_pfd->client_data;
if (pt->type == PT_CSD) {
/* one of the sockets is readable */
event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
ap_assert(0);
}
}
- else if (pt->type == PT_ACCEPT && !listeners_disabled()) {
+ else if (pt->type == PT_ACCEPT) {
/* A Listener Socket is ready for an accept() */
if (workers_were_busy) {
- disable_listensocks();
+ if (!listeners_disabled)
+ disable_listensocks(process_slot);
+ listeners_disabled = 1;
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
"All workers busy, not accepting new conns "
"in this process");
}
- else if (connections_above_limit()) {
- disable_listensocks();
+ else if ( (int)apr_atomic_read32(&connection_count)
+ - (int)apr_atomic_read32(&lingering_count)
+ > threads_per_child
+ + ap_queue_info_get_idlers(worker_queue_info) *
+ worker_factor / WORKER_FACTOR_SCALE)
+ {
+ if (!listeners_disabled)
+ disable_listensocks(process_slot);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
"Too many open connections (%u), "
"not accepting new conns in this process",
ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
"Idle workers: %u",
ap_queue_info_get_idlers(worker_queue_info));
- workers_were_busy = 1;
+ listeners_disabled = 1;
}
- else if (!listener_may_exit) {
- void *csd = NULL;
- ap_listen_rec *lr = (ap_listen_rec *) pt->baton;
- apr_pool_t *ptrans; /* Pool for per-transaction stuff */
+ else if (listeners_disabled) {
+ listeners_disabled = 0;
+ enable_listensocks(process_slot);
+ }
+ if (!listeners_disabled) {
+ lr = (ap_listen_rec *) pt->baton;
ap_pop_pool(&ptrans, worker_queue_info);
if (ptrans == NULL) {
/* create a new transaction pool for each accepted socket */
- apr_allocator_t *allocator = NULL;
-
- rc = apr_allocator_create(&allocator);
- if (rc == APR_SUCCESS) {
- apr_allocator_max_free_set(allocator,
- ap_max_mem_free);
- rc = apr_pool_create_ex(&ptrans, pconf, NULL,
- allocator);
- if (rc == APR_SUCCESS) {
- apr_pool_tag(ptrans, "transaction");
- apr_allocator_owner_set(allocator, ptrans);
- }
- }
- if (rc != APR_SUCCESS) {
+ apr_allocator_t *allocator;
+
+ apr_allocator_create(&allocator);
+ apr_allocator_max_free_set(allocator,
+ ap_max_mem_free);
+ apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
+ apr_allocator_owner_set(allocator, ptrans);
+ if (ptrans == NULL) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
ap_server_conf, APLOGNO(03097)
"Failed to create transaction pool");
- if (allocator) {
- apr_allocator_destroy(allocator);
- }
- resource_shortage = 1;
signal_threads(ST_GRACEFUL);
- continue;
+ return NULL;
}
}
+ apr_pool_tag(ptrans, "transaction");
get_worker(&have_idle_worker, 1, &workers_were_busy);
rc = lr->accept_func(&csd, lr, ptrans);
}
}
} /* if:else on pt->type */
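
The accept path above gives every transaction pool its own allocator so MaxMemFree (ap_max_mem_free) is enforced per connection rather than through pconf's allocator, and apr_allocator_owner_set ties the allocator's lifetime to the pool. A sketch of the idiom with the error handling the removed '-' variant performed (the restored '+' code ignores the intermediate return codes, as the historical version did):

    #include <apr_pools.h>

    static apr_status_t make_transaction_pool(apr_pool_t *parent,
                                              apr_size_t max_free,
                                              apr_pool_t **out)
    {
        apr_allocator_t *allocator = NULL;
        apr_status_t rv = apr_allocator_create(&allocator);
        if (rv != APR_SUCCESS)
            return rv;
        apr_allocator_max_free_set(allocator, max_free);
        rv = apr_pool_create_ex(out, parent, NULL, allocator);
        if (rv != APR_SUCCESS) {
            apr_allocator_destroy(allocator);     /* no pool owns it yet */
            return rv;
        }
        apr_pool_tag(*out, "transaction");
        apr_allocator_owner_set(allocator, *out); /* destroyed with the pool */
        return APR_SUCCESS;
    }
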
- } /* for processing poll */
+ out_pfd++;
+ num--;
+ } /* while for processing poll */
/* XXX possible optimization: stash the current time for use as
* r->request_time for new requests
apr_thread_mutex_unlock(timeout_mutex);
- ps->keep_alive = *(volatile apr_uint32_t*)keepalive_q->total;
- ps->write_completion = *(volatile apr_uint32_t*)write_completion_q->total;
+ ps->keep_alive = apr_atomic_read32(keepalive_q->total);
+ ps->write_completion = apr_atomic_read32(write_completion_q->total);
ps->connections = apr_atomic_read32(&connection_count);
ps->suspended = apr_atomic_read32(&suspended_count);
ps->lingering_close = apr_atomic_read32(&lingering_count);
}
else if ((workers_were_busy || dying)
- && *(volatile apr_uint32_t*)keepalive_q->total) {
+ && apr_atomic_read32(keepalive_q->total)) {
apr_thread_mutex_lock(timeout_mutex);
process_keepalive_queue(0); /* kill'em all \m/ */
apr_thread_mutex_unlock(timeout_mutex);
}
}
- if (listeners_disabled()
- && !workers_were_busy
- && !connections_above_limit()) {
- enable_listensocks();
+ if (listeners_disabled && !workers_were_busy
+ && (int)apr_atomic_read32(&connection_count)
+ - (int)apr_atomic_read32(&lingering_count)
+ < ((int)ap_queue_info_get_idlers(worker_queue_info) - 1)
+ * worker_factor / WORKER_FACTOR_SCALE + threads_per_child)
+ {
+ listeners_disabled = 0;
+ enable_listensocks(process_slot);
}
- } /* listener main loop */
+ /*
+ * XXX: do we need to set some timeout that re-enables the listensocks
+ * XXX: in case no other event occurs?
+ */
+ } /* listener main loop */
- close_listeners(&closed);
+ close_listeners(process_slot, &closed);
ap_queue_term(worker_queue);
apr_thread_exit(thd, APR_SUCCESS);
proc_info *ti = dummy;
int process_slot = ti->pslot;
int thread_slot = ti->tslot;
+ apr_socket_t *csd = NULL;
+ event_conn_state_t *cs;
+ apr_pool_t *ptrans; /* Pool for per-transaction stuff */
apr_status_t rv;
int is_idle = 0;
+ timer_event_t *te = NULL;
free(ti);
SERVER_STARTING, NULL);
while (!workers_may_exit) {
- apr_socket_t *csd = NULL;
- event_conn_state_t *cs;
- timer_event_t *te = NULL;
- apr_pool_t *ptrans; /* Pool for per-transaction stuff */
-
if (!is_idle) {
rv = ap_queue_info_set_idle(worker_queue_info, NULL);
if (rv != APR_SUCCESS) {
break;
}
+ te = NULL;
rv = ap_queue_pop_something(worker_queue, &csd, &cs, &ptrans, &te);
if (rv != APR_SUCCESS) {
int loops;
int prev_threads_created;
int max_recycled_pools = -1;
- const int good_methods[] = { APR_POLLSET_KQUEUE,
- APR_POLLSET_PORT,
- APR_POLLSET_EPOLL };
- /* XXX: K-A or lingering close connection included in the async factor */
- const apr_uint32_t async_factor = worker_factor / WORKER_FACTOR_SCALE;
- const apr_uint32_t pollset_size = (apr_uint32_t)num_listensocks +
- (apr_uint32_t)threads_per_child *
- (async_factor > 2 ? async_factor : 2);
+ int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL};
+ /* XXX don't we need more to handle K-A or lingering close? */
+ const apr_uint32_t pollset_size = threads_per_child * 2;
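
For scale (all numbers assumed): with 3 listeners, threads_per_child = 25 and an async factor of 4, the removed formula sizes the pollset at 3 + 25 * max(4, 2) = 103 descriptors, while the flat rule restored here allows only 25 * 2 = 50, which is what the XXX comment above is worried about once keep-alive and lingering-close connections pile up.
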
/* We must create the fd queues before we start up the listener
* and worker threads. */
retained->mpm->mpm_state = AP_MPMQ_STARTING;
ap_my_pid = getpid();
- ap_child_slot = child_num_arg;
ap_fatal_signal_child_setup(ap_server_conf);
apr_pool_create(&pchild, pconf);
had_healthy_child = 0;
ap_extended_status = 0;
- event_pollset = NULL;
- worker_queue_info = NULL;
- listener_os_thread = NULL;
- listensocks_disabled = 0;
-
return OK;
}
return "AsyncRequestWorkerFactor argument must be a positive number";
worker_factor = val * WORKER_FACTOR_SCALE;
- if (worker_factor < WORKER_FACTOR_SCALE) {
- worker_factor = WORKER_FACTOR_SCALE;
- }
+ if (worker_factor == 0)
+ worker_factor = 1;
return NULL;
}
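
WORKER_FACTOR_SCALE exists so the directive can accept fractional values using integer (fixed-point) arithmetic; note the hunk also changes the clamp, since the removed code floors the stored value at one full scale unit (a factor of 1.0) while the restored code only guards against zero. A worked sketch, assuming the scale is 16 as in event.c:

    #define SCALE 16    /* stands in for WORKER_FACTOR_SCALE (assumed 16) */

    /* "AsyncRequestWorkerFactor 1.5" is stored as 1.5 * 16 = 24 ... */
    static int store(double val) { return (int)(val * SCALE); }

    /* ... and consumers divide it back out: 10 idlers * 24 / 16 = 15 */
    static int budget(int factor, int idlers) { return idlers * factor / SCALE; }
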
struct fd_queue_info_t
{
- apr_uint32_t volatile idlers; /**
- * >= zero_pt: number of idle worker threads
- * < zero_pt: number of threads blocked,
- * waiting for an idle worker
- */
+ apr_uint32_t idlers; /**
+ * >= zero_pt: number of idle worker threads
+ * < zero_pt: number of threads blocked waiting
+ * for an idle worker
+ */
apr_thread_mutex_t *idlers_mutex;
apr_thread_cond_t *wait_for_idler;
int terminated;
int max_idlers;
int max_recycled_pools;
apr_uint32_t recycled_pools_count;
- struct recycled_pool *volatile recycled_pools;
+ struct recycled_pool *recycled_pools;
};
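
The idlers field stores a signed quantity in an unsigned atomic by biasing it around zero_pt, which is what the ">= zero_pt / < zero_pt" split in the struct comment describes. A minimal sketch of the encoding (midpoint value assumed):

    #include <apr_atomic.h>

    static const apr_uint32_t zero_pt = APR_UINT32_MAX / 2; /* assumed midpoint */
    static apr_uint32_t idlers;     /* initialized to zero_pt = logical zero */

    /* logical value: positive = idle workers, negative = blocked waiters */
    static apr_int32_t logical_idlers(void)
    {
        return (apr_int32_t)(apr_atomic_read32(&idlers) - zero_pt);
    }
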
static apr_status_t queue_info_cleanup(void *data_)
apr_pool_t * pool_to_recycle)
{
apr_status_t rv;
+ apr_int32_t prev_idlers;
ap_push_pool(queue_info, pool_to_recycle);
+ /* Atomically increment the count of idle workers */
+ prev_idlers = apr_atomic_inc32(&(queue_info->idlers)) - zero_pt;
+
/* If other threads are waiting on a worker, wake one up */
- if (apr_atomic_inc32(&queue_info->idlers) < zero_pt) {
+ if (prev_idlers < 0) {
rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
if (rv != APR_SUCCESS) {
AP_DEBUG_ASSERT(0);
apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t * queue_info)
{
- /* Don't block if there isn't any idle worker. */
- for (;;) {
- apr_uint32_t idlers = queue_info->idlers;
- if (idlers <= zero_pt) {
- return APR_EAGAIN;
- }
- if (apr_atomic_cas32(&queue_info->idlers, idlers - 1,
- idlers) == idlers) {
- return APR_SUCCESS;
- }
+ apr_int32_t new_idlers;
+ new_idlers = apr_atomic_add32(&(queue_info->idlers), -1) - zero_pt;
+ if (--new_idlers <= 0) {
+ apr_atomic_inc32(&(queue_info->idlers)); /* back out dec */
+ return APR_EAGAIN;
}
+ return APR_SUCCESS;
}
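
The restored body swaps the removed CAS retry loop for an optimistic "decrement first, back out on failure" scheme: apr_atomic_add32(mem, -1) returns the old value, and if that shows there was nothing to take, an inc32 undoes the change. Both are lock-free, but the optimistic form momentarily pushes the counter below the logical zero, the "< zero_pt: threads blocked" range from the struct comment. A sketch, simplified to a strict "none available" test (the restored code's pre-decrement is slightly more conservative):

    #include <apr_atomic.h>

    static apr_uint32_t idlers;     /* biased by zero_pt as above */

    static int try_take(apr_uint32_t zero_pt)
    {
        apr_uint32_t prev = apr_atomic_add32(&idlers, -1); /* old value */
        if (prev <= zero_pt) {          /* logical count was <= 0 */
            apr_atomic_inc32(&idlers);  /* back out the decrement */
            return 0;
        }
        return 1;
    }
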
apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info,
int *had_to_block)
{
apr_status_t rv;
+ apr_int32_t prev_idlers;
- /* Block if there isn't any idle worker.
- * apr_atomic_add32(x, -1) does the same as dec32(x), except
- * that it returns the previous value (unlike dec32's bool).
- */
- if (apr_atomic_add32(&queue_info->idlers, -1) <= zero_pt) {
+ /* Atomically decrement the idle worker count, saving the old value */
+ /* See TODO in ap_queue_info_set_idle() */
+ prev_idlers = apr_atomic_add32(&(queue_info->idlers), -1) - zero_pt;
+
+ /* Block if there weren't any idle workers */
+ if (prev_idlers <= 0) {
rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
if (rv != APR_SUCCESS) {
AP_DEBUG_ASSERT(0);
+ /* See TODO in ap_queue_info_set_idle() */
apr_atomic_inc32(&(queue_info->idlers)); /* back out dec */
return rv;
}
apr_uint32_t ap_queue_info_get_idlers(fd_queue_info_t * queue_info)
{
- apr_uint32_t val;
- val = apr_atomic_read32(&queue_info->idlers);
- if (val <= zero_pt)
+ apr_int32_t val;
+ val = (apr_int32_t)apr_atomic_read32(&queue_info->idlers) - zero_pt;
+ if (val < 0)
return 0;
- return val - zero_pt;
+ return val;
}
void ap_push_pool(fd_queue_info_t * queue_info,
return rv;
}
-static apr_status_t queue_interrupt(fd_queue_t * queue, int all, int term)
+static apr_status_t queue_interrupt(fd_queue_t * queue, int all)
{
apr_status_t rv;
if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
return rv;
}
- /* we must hold one_big_mutex when setting this... otherwise,
- * we could end up setting it and waking everybody up just after a
- * would-be popper checks it but right before they block
- */
- if (term) {
- queue->terminated = 1;
- }
if (all)
apr_thread_cond_broadcast(queue->not_empty);
else
apr_status_t ap_queue_interrupt_all(fd_queue_t * queue)
{
- return queue_interrupt(queue, 1, 0);
+ return queue_interrupt(queue, 1);
}
apr_status_t ap_queue_interrupt_one(fd_queue_t * queue)
{
- return queue_interrupt(queue, 0, 0);
+ return queue_interrupt(queue, 0);
}
apr_status_t ap_queue_term(fd_queue_t * queue)
{
- return queue_interrupt(queue, 1, 1);
+ apr_status_t rv;
+
+ if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
+ return rv;
+ }
+ /* we must hold one_big_mutex when setting this... otherwise,
+ * we could end up setting it and waking everybody up just after a
+ * would-be popper checks it but right before they block
+ */
+ queue->terminated = 1;
+ if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
+ return rv;
+ }
+ return ap_queue_interrupt_all(queue);
}
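
Both versions of ap_queue_term obey the rule the comment states: flip the terminated predicate while holding the same mutex the poppers take, then broadcast, so a popper can never check the flag, lose the race, and sleep through the wakeup. A minimal sketch of the safe ordering, with hypothetical names:

    #include <apr_thread_mutex.h>
    #include <apr_thread_cond.h>

    static apr_thread_mutex_t *mtx;
    static apr_thread_cond_t *not_empty;
    static int terminated;

    static void term(void)
    {
        apr_thread_mutex_lock(mtx);
        terminated = 1;                       /* predicate set under the mutex... */
        apr_thread_cond_broadcast(not_empty); /* ...so no would-be popper can
                                               * test it and then miss this */
        apr_thread_mutex_unlock(mtx);
    }

    static void pop_wait(void)
    {
        apr_thread_mutex_lock(mtx);
        while (!terminated /* && queue is empty */)
            apr_thread_cond_wait(not_empty, mtx);
        apr_thread_mutex_unlock(mtx);
    }
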