From bbcd9b0737d423744aa04e0b7ec938cefa144c4e Mon Sep 17 00:00:00 2001 From: Yann Ylavic Date: Fri, 9 Feb 2018 11:12:42 +0000 Subject: [PATCH] Merge r1643279, r1703241, r1802535, r1819847, r1819848, r1819852, r1819853 from trunk: mpm_event(opt): avoid casts/comparisons from unsigned to signed (atomics). mpm_event/worker: make ap_queue_term() atomic (acquire/release the mutex once). mpm_event: ap_queue_info_try_get_idler() may atomically decrement and then re-increment the number idlers if it went under or to zero. We can avoid this by switching to a compare-and-swap scheme. mpm_event: avoid unexpected compiler optimizations. Make sure the compiler doesn't play games with our synchronization variables by marking them volatile. mpm_event: make sure wakeup_listener() does its minimal job. Even if the listener thread is not created yet (i.e. about to be), we must still tell it to leave, and terminate the worker queue in any case. mpm_event: worker factor vs pollset. Make sure the worker factor is at least one (w.r.t. WORKER_FACTOR_SCALE), and use it to size the pollset appropriately (including K-A and lingering close connections), in addition to the listening sockets. mpm_event: remove atomics for timeout_queue's total counter. It's always updated under the timeout_mutex lock, or read for logging and scoreboard updates (not critical). For the read cases a volatile access is enough, while removing the atomic ops for the already protected write cases saves cycles and context switches. Submitted by: ylavic Reviewed by: ylavic, jim, icing git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1823642 13f79535-47bb-0310-9956-ffa450edef68 --- server/mpm/event/event.c | 95 +++++++++++++++++++++---------------- server/mpm/event/fdqueue.c | 83 +++++++++++++++----------------- server/mpm/worker/fdqueue.c | 29 ++++++----- 3 files changed, 106 insertions(+), 101 deletions(-) diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c index d35efc1289..a9e79b7a98 100644 --- a/server/mpm/event/event.c +++ b/server/mpm/event/event.c @@ -173,10 +173,10 @@ static int max_workers = 0; /* MaxRequestWorkers */ static int server_limit = 0; /* ServerLimit */ static int thread_limit = 0; /* ThreadLimit */ static int had_healthy_child = 0; -static int dying = 0; -static int workers_may_exit = 0; -static int start_thread_may_exit = 0; -static int listener_may_exit = 0; +static volatile int dying = 0; +static volatile int workers_may_exit = 0; +static volatile int start_thread_may_exit = 0; +static volatile int listener_may_exit = 0; static int listener_is_wakeable = 0; /* Pollset supports APR_POLLSET_WAKEABLE */ static int num_listensocks = 0; static apr_int32_t conns_this_child; /* MaxConnectionsPerChild, only access @@ -287,7 +287,7 @@ static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el) apr_time_t next_expiry; APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list); - apr_atomic_inc32(q->total); + ++*q->total; ++q->count; /* Cheaply update the overall queues' next expiry according to the @@ -309,7 +309,7 @@ static void TO_QUEUE_REMOVE(struct timeout_queue *q, event_conn_state_t *el) { APR_RING_REMOVE(el, timeout_list); APR_RING_ELEM_INIT(el, timeout_list); - apr_atomic_dec32(q->total); + --*q->total; --q->count; } @@ -541,6 +541,17 @@ static void close_worker_sockets(void) static void wakeup_listener(void) { listener_may_exit = 1; + + /* Unblock the listener if it's poll()ing */ + if (event_pollset && listener_is_wakeable) { + apr_pollset_wakeup(event_pollset); + } + + /* unblock the listener if it's waiting for a worker */ + if (worker_queue_info) { + ap_queue_info_term(worker_queue_info); + } + if (!listener_os_thread) { /* XXX there is an obscure path that this doesn't handle perfectly: * right after listener thread is created but before @@ -549,15 +560,6 @@ static void wakeup_listener(void) */ return; } - - /* Unblock the listener if it's poll()ing */ - if (listener_is_wakeable) { - apr_pollset_wakeup(event_pollset); - } - - /* unblock the listener if it's waiting for a worker */ - ap_queue_info_term(worker_queue_info); - /* * we should just be able to "kill(ap_my_pid, LISTENER_SIGNAL)" on all * platforms and wake up the listener thread since it is the only thread @@ -578,7 +580,7 @@ static int terminate_mode = ST_INIT; static void signal_threads(int mode) { - if (terminate_mode == mode) { + if (terminate_mode >= mode) { return; } terminate_mode = mode; @@ -1461,7 +1463,7 @@ static void process_timeout_queue(struct timeout_queue *q, struct timeout_queue *qp; apr_status_t rv; - if (!apr_atomic_read32(q->total)) { + if (!*q->total) { return; } @@ -1511,8 +1513,8 @@ static void process_timeout_queue(struct timeout_queue *q, APR_RING_UNSPLICE(first, last, timeout_list); APR_RING_SPLICE_TAIL(&trash, first, last, event_conn_state_t, timeout_list); - AP_DEBUG_ASSERT(apr_atomic_read32(q->total) >= count); - apr_atomic_sub32(q->total, count); + AP_DEBUG_ASSERT(*q->total >= count && qp->count >= count); + *q->total -= count; qp->count -= count; total += count; } @@ -1538,8 +1540,7 @@ static void process_keepalive_queue(apr_time_t timeout_time) if (!timeout_time) { ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf, "All workers are busy or dying, will close %u " - "keep-alive connections", - apr_atomic_read32(keepalive_q->total)); + "keep-alive connections", *keepalive_q->total); } process_timeout_queue(keepalive_q, timeout_time, start_lingering_close_nonblocking); @@ -1578,6 +1579,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) timer_event_t *te; const apr_pollfd_t *out_pfd; apr_int32_t num = 0; + apr_uint32_t c_count, l_count, i_count; apr_interval_time_t timeout_interval; apr_time_t now, timeout_time; int workers_were_busy = 0; @@ -1603,8 +1605,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) "keep-alive: %d lingering: %d suspended: %u)", apr_atomic_read32(&connection_count), apr_atomic_read32(&clogged_count), - apr_atomic_read32(write_completion_q->total), - apr_atomic_read32(keepalive_q->total), + *(volatile apr_uint32_t*)write_completion_q->total, + *(volatile apr_uint32_t*)keepalive_q->total, apr_atomic_read32(&lingering_count), apr_atomic_read32(&suspended_count)); if (dying) { @@ -1762,11 +1764,12 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) "All workers busy, not accepting new conns " "in this process"); } - else if ( (int)apr_atomic_read32(&connection_count) - - (int)apr_atomic_read32(&lingering_count) - > threads_per_child - + ap_queue_info_get_idlers(worker_queue_info) * - worker_factor / WORKER_FACTOR_SCALE) + else if ((c_count = apr_atomic_read32(&connection_count)) + > (l_count = apr_atomic_read32(&lingering_count)) + && (c_count - l_count + > ap_queue_info_get_idlers(worker_queue_info) + * worker_factor / WORKER_FACTOR_SCALE + + threads_per_child)) { if (!listeners_disabled) disable_listensocks(process_slot); @@ -1875,14 +1878,14 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) apr_thread_mutex_unlock(timeout_mutex); - ps->keep_alive = apr_atomic_read32(keepalive_q->total); - ps->write_completion = apr_atomic_read32(write_completion_q->total); + ps->keep_alive = *(volatile apr_uint32_t*)keepalive_q->total; + ps->write_completion = *(volatile apr_uint32_t*)write_completion_q->total; ps->connections = apr_atomic_read32(&connection_count); ps->suspended = apr_atomic_read32(&suspended_count); ps->lingering_close = apr_atomic_read32(&lingering_count); } else if ((workers_were_busy || dying) - && apr_atomic_read32(keepalive_q->total)) { + && *(volatile apr_uint32_t*)keepalive_q->total) { apr_thread_mutex_lock(timeout_mutex); process_keepalive_queue(0); /* kill'em all \m/ */ apr_thread_mutex_unlock(timeout_mutex); @@ -1908,10 +1911,12 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) } if (listeners_disabled && !workers_were_busy - && (int)apr_atomic_read32(&connection_count) - - (int)apr_atomic_read32(&lingering_count) - < ((int)ap_queue_info_get_idlers(worker_queue_info) - 1) - * worker_factor / WORKER_FACTOR_SCALE + threads_per_child) + && ((c_count = apr_atomic_read32(&connection_count)) + >= (l_count = apr_atomic_read32(&lingering_count)) + && (i_count = ap_queue_info_get_idlers(worker_queue_info)) > 0 + && (c_count - l_count + < (i_count - 1) * worker_factor / WORKER_FACTOR_SCALE + + threads_per_child))) { listeners_disabled = 0; enable_listensocks(process_slot); @@ -2143,9 +2148,14 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy) int loops; int prev_threads_created; int max_recycled_pools = -1; - int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL}; - /* XXX don't we need more to handle K-A or lingering close? */ - const apr_uint32_t pollset_size = threads_per_child * 2; + const int good_methods[] = { APR_POLLSET_KQUEUE, + APR_POLLSET_PORT, + APR_POLLSET_EPOLL }; + /* XXX: K-A or lingering close connection included in the async factor */ + const apr_uint32_t async_factor = worker_factor / WORKER_FACTOR_SCALE; + const apr_uint32_t pollset_size = (apr_uint32_t)num_listensocks + + (apr_uint32_t)threads_per_child * + (async_factor > 2 ? async_factor : 2); /* We must create the fd queues before we start up the listener * and worker threads. */ @@ -3339,6 +3349,10 @@ static int event_pre_config(apr_pool_t * pconf, apr_pool_t * plog, had_healthy_child = 0; ap_extended_status = 0; + event_pollset = NULL; + worker_queue_info = NULL; + listener_os_thread = NULL; + return OK; } @@ -3752,8 +3766,9 @@ static const char *set_worker_factor(cmd_parms * cmd, void *dummy, return "AsyncRequestWorkerFactor argument must be a positive number"; worker_factor = val * WORKER_FACTOR_SCALE; - if (worker_factor == 0) - worker_factor = 1; + if (worker_factor < WORKER_FACTOR_SCALE) { + worker_factor = WORKER_FACTOR_SCALE; + } return NULL; } diff --git a/server/mpm/event/fdqueue.c b/server/mpm/event/fdqueue.c index 64b318d0e0..175d86a720 100644 --- a/server/mpm/event/fdqueue.c +++ b/server/mpm/event/fdqueue.c @@ -27,18 +27,18 @@ struct recycled_pool struct fd_queue_info_t { - apr_uint32_t idlers; /** - * >= zero_pt: number of idle worker threads - * < zero_pt: number of threads blocked waiting - * for an idle worker - */ + apr_uint32_t volatile idlers; /** + * >= zero_pt: number of idle worker threads + * < zero_pt: number of threads blocked, + * waiting for an idle worker + */ apr_thread_mutex_t *idlers_mutex; apr_thread_cond_t *wait_for_idler; int terminated; int max_idlers; int max_recycled_pools; apr_uint32_t recycled_pools_count; - struct recycled_pool *recycled_pools; + struct recycled_pool *volatile recycled_pools; }; static apr_status_t queue_info_cleanup(void *data_) @@ -97,15 +97,11 @@ apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info, apr_pool_t * pool_to_recycle) { apr_status_t rv; - apr_int32_t prev_idlers; ap_push_pool(queue_info, pool_to_recycle); - /* Atomically increment the count of idle workers */ - prev_idlers = apr_atomic_inc32(&(queue_info->idlers)) - zero_pt; - /* If other threads are waiting on a worker, wake one up */ - if (prev_idlers < 0) { + if (apr_atomic_inc32(&queue_info->idlers) < zero_pt) { rv = apr_thread_mutex_lock(queue_info->idlers_mutex); if (rv != APR_SUCCESS) { AP_DEBUG_ASSERT(0); @@ -127,31 +123,32 @@ apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info, apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t * queue_info) { - apr_int32_t new_idlers; - new_idlers = apr_atomic_add32(&(queue_info->idlers), -1) - zero_pt; - if (--new_idlers <= 0) { - apr_atomic_inc32(&(queue_info->idlers)); /* back out dec */ - return APR_EAGAIN; + /* Don't block if there isn't any idle worker. */ + for (;;) { + apr_uint32_t idlers = queue_info->idlers; + if (idlers <= zero_pt) { + return APR_EAGAIN; + } + if (apr_atomic_cas32(&queue_info->idlers, idlers - 1, + idlers) == idlers) { + return APR_SUCCESS; + } } - return APR_SUCCESS; } apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info, int *had_to_block) { apr_status_t rv; - apr_int32_t prev_idlers; - - /* Atomically decrement the idle worker count, saving the old value */ - /* See TODO in ap_queue_info_set_idle() */ - prev_idlers = apr_atomic_add32(&(queue_info->idlers), -1) - zero_pt; - /* Block if there weren't any idle workers */ - if (prev_idlers <= 0) { + /* Block if there isn't any idle worker. + * apr_atomic_add32(x, -1) does the same as dec32(x), except + * that it returns the previous value (unlike dec32's bool). + */ + if (apr_atomic_add32(&queue_info->idlers, -1) <= zero_pt) { rv = apr_thread_mutex_lock(queue_info->idlers_mutex); if (rv != APR_SUCCESS) { AP_DEBUG_ASSERT(0); - /* See TODO in ap_queue_info_set_idle() */ apr_atomic_inc32(&(queue_info->idlers)); /* back out dec */ return rv; } @@ -205,11 +202,11 @@ apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info, apr_uint32_t ap_queue_info_get_idlers(fd_queue_info_t * queue_info) { - apr_int32_t val; - val = (apr_int32_t)apr_atomic_read32(&queue_info->idlers) - zero_pt; - if (val < 0) + apr_uint32_t val; + val = apr_atomic_read32(&queue_info->idlers); + if (val <= zero_pt) return 0; - return val; + return val - zero_pt; } void ap_push_pool(fd_queue_info_t * queue_info, @@ -490,13 +487,20 @@ apr_status_t ap_queue_pop_something(fd_queue_t * queue, apr_socket_t ** sd, return rv; } -static apr_status_t queue_interrupt(fd_queue_t * queue, int all) +static apr_status_t queue_interrupt(fd_queue_t * queue, int all, int term) { apr_status_t rv; if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { return rv; } + /* we must hold one_big_mutex when setting this... otherwise, + * we could end up setting it and waking everybody up just after a + * would-be popper checks it but right before they block + */ + if (term) { + queue->terminated = 1; + } if (all) apr_thread_cond_broadcast(queue->not_empty); else @@ -506,28 +510,15 @@ static apr_status_t queue_interrupt(fd_queue_t * queue, int all) apr_status_t ap_queue_interrupt_all(fd_queue_t * queue) { - return queue_interrupt(queue, 1); + return queue_interrupt(queue, 1, 0); } apr_status_t ap_queue_interrupt_one(fd_queue_t * queue) { - return queue_interrupt(queue, 0); + return queue_interrupt(queue, 0, 0); } apr_status_t ap_queue_term(fd_queue_t * queue) { - apr_status_t rv; - - if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { - return rv; - } - /* we must hold one_big_mutex when setting this... otherwise, - * we could end up setting it and waking everybody up just after a - * would-be popper checks it but right before they block - */ - queue->terminated = 1; - if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { - return rv; - } - return ap_queue_interrupt_all(queue); + return queue_interrupt(queue, 1, 1); } diff --git a/server/mpm/worker/fdqueue.c b/server/mpm/worker/fdqueue.c index fe5881b4ce..803267afbd 100644 --- a/server/mpm/worker/fdqueue.c +++ b/server/mpm/worker/fdqueue.c @@ -382,31 +382,30 @@ apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p) return rv; } -apr_status_t ap_queue_interrupt_all(fd_queue_t *queue) +static apr_status_t queue_interrupt_all(fd_queue_t *queue, int term) { apr_status_t rv; if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { return rv; } + /* we must hold one_big_mutex when setting this... otherwise, + * we could end up setting it and waking everybody up just after a + * would-be popper checks it but right before they block + */ + if (term) { + queue->terminated = 1; + } apr_thread_cond_broadcast(queue->not_empty); return apr_thread_mutex_unlock(queue->one_big_mutex); } -apr_status_t ap_queue_term(fd_queue_t *queue) +apr_status_t ap_queue_interrupt_all(fd_queue_t *queue) { - apr_status_t rv; + return queue_interrupt_all(queue, 0); +} - if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { - return rv; - } - /* we must hold one_big_mutex when setting this... otherwise, - * we could end up setting it and waking everybody up just after a - * would-be popper checks it but right before they block - */ - queue->terminated = 1; - if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { - return rv; - } - return ap_queue_interrupt_all(queue); +apr_status_t ap_queue_term(fd_queue_t *queue) +{ + return queue_interrupt_all(queue, 1); } -- 2.40.0