From 0e24e6aebf52a2d61a24aa485443a1fae75cf9a6 Mon Sep 17 00:00:00 2001
From: Yann Ylavic
Date: Fri, 9 Feb 2018 11:04:55 +0000
Subject: [PATCH] Revert r1823629 to stage backport commits too.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1823636 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES                     |   6 -
 server/mpm/event/event.c    | 278 +++++++++++++++----------------------
 server/mpm/event/fdqueue.c  |  83 +++++++-----
 server/mpm/worker/fdqueue.c |  29 ++--
 4 files changed, 180 insertions(+), 216 deletions(-)

diff --git a/CHANGES b/CHANGES
index c7c5939049..e706b53061 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,12 +1,6 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.4.30
 
-  *) mpm_event: Let the listener thread do its maintenance job on resources
-     shortage. PR 61979. [Yann Ylavic]
-
-  *) mpm_event: Wakeup the listener to re-enable listening sockets.
-     [Yann Ylavic]
-
   *) mod_ssl: The SSLCompression directive will now give an error if used
      with an OpenSSL build which does not support any compression methods.
      [Joe Orton]
diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c
index 5e3186c44c..8e0f72dedc 100644
--- a/server/mpm/event/event.c
+++ b/server/mpm/event/event.c
@@ -173,10 +173,10 @@ static int max_workers = 0;         /* MaxRequestWorkers */
 static int server_limit = 0;        /* ServerLimit */
 static int thread_limit = 0;        /* ThreadLimit */
 static int had_healthy_child = 0;
-static volatile int dying = 0;
-static volatile int workers_may_exit = 0;
-static volatile int start_thread_may_exit = 0;
-static volatile int listener_may_exit = 0;
+static int dying = 0;
+static int workers_may_exit = 0;
+static int start_thread_may_exit = 0;
+static int listener_may_exit = 0;
 static int listener_is_wakeable = 0;        /* Pollset supports APR_POLLSET_WAKEABLE */
 static int num_listensocks = 0;
 static apr_int32_t conns_this_child;        /* MaxConnectionsPerChild, only access
@@ -287,7 +287,7 @@ static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el)
     apr_time_t next_expiry;
 
     APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
-    ++*q->total;
+    apr_atomic_inc32(q->total);
     ++q->count;
 
     /* Cheaply update the overall queues' next expiry according to the
@@ -309,7 +309,7 @@ static void TO_QUEUE_REMOVE(struct timeout_queue *q, event_conn_state_t *el)
 {
     APR_RING_REMOVE(el, timeout_list);
     APR_RING_ELEM_INIT(el, timeout_list);
-    --*q->total;
+    apr_atomic_dec32(q->total);
     --q->count;
 }
 
@@ -440,8 +440,6 @@ static pid_t ap_my_pid; /* Linux getpid() doesn't work except in main
 static pid_t parent_pid;
 static apr_os_thread_t *listener_os_thread;
 
-static int ap_child_slot;       /* Current child process slot in scoreboard */
-
 /* The LISTENER_SIGNAL signal will be sent from the main thread to the
  * listener thread to wake it up for graceful termination (what a child
  * process from an old generation does when the admin does "apachectl
@@ -455,27 +453,19 @@ static int ap_child_slot; /* Current child process slot in scoreboard */
  */
 static apr_socket_t **worker_sockets;
 
-static volatile apr_uint32_t listensocks_disabled;
-
-static void disable_listensocks(void)
+static void disable_listensocks(int process_slot)
 {
     int i;
-    if (apr_atomic_cas32(&listensocks_disabled, 1, 0) != 0) {
-        return;
+    for (i = 0; i < num_listensocks; i++) {
+        apr_pollset_remove(event_pollset, &listener_pollfd[i]);
     }
-    if (event_pollset) {
-        for (i = 0; i < num_listensocks; i++) {
-            apr_pollset_remove(event_pollset, &listener_pollfd[i]);
-        }
-    }
-    ap_scoreboard_image->parent[ap_child_slot].not_accepting = 1;
+    ap_scoreboard_image->parent[process_slot].not_accepting = 1;
 }
 
-static void enable_listensocks(void)
+static void enable_listensocks(int process_slot)
 {
     int i;
-    if (listener_may_exit
-        || apr_atomic_cas32(&listensocks_disabled, 0, 1) != 1) {
+    if (listener_may_exit) {
         return;
     }
     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(00457)
@@ -493,29 +483,7 @@ static void enable_listensocks(void)
      * XXX: This is not yet optimal. If many workers suddenly become available,
      * XXX: the parent may kill some processes off too soon.
      */
-    ap_scoreboard_image->parent[ap_child_slot].not_accepting = 0;
-}
-
-static APR_INLINE apr_uint32_t listeners_disabled(void)
-{
-    return apr_atomic_read32(&listensocks_disabled);
-}
-
-static APR_INLINE int connections_above_limit(void)
-{
-    apr_uint32_t i_count = ap_queue_info_get_idlers(worker_queue_info);
-    if (i_count > 0) {
-        apr_uint32_t c_count = apr_atomic_read32(&connection_count);
-        apr_uint32_t l_count = apr_atomic_read32(&lingering_count);
-        if (c_count <= l_count
-                /* Off by 'listeners_disabled()' to avoid flip flop */
-            || c_count - l_count < (apr_uint32_t)threads_per_child +
-                                   (i_count - listeners_disabled()) *
-                                   (worker_factor / WORKER_FACTOR_SCALE)) {
-            return 0;
-        }
-    }
-    return 1;
+    ap_scoreboard_image->parent[process_slot].not_accepting = 0;
 }
 
 static void abort_socket_nonblocking(apr_socket_t *csd)
@@ -573,18 +541,6 @@ static void close_worker_sockets(void)
 static void wakeup_listener(void)
 {
     listener_may_exit = 1;
-    disable_listensocks();
-
-    /* Unblock the listener if it's poll()ing */
-    if (event_pollset && listener_is_wakeable) {
-        apr_pollset_wakeup(event_pollset);
-    }
-
-    /* unblock the listener if it's waiting for a worker */
-    if (worker_queue_info) {
-        ap_queue_info_term(worker_queue_info);
-    }
-
     if (!listener_os_thread) {
         /* XXX there is an obscure path that this doesn't handle perfectly:
          * right after listener thread is created but before
@@ -593,6 +549,15 @@ static void wakeup_listener(void)
          */
         return;
     }
+
+    /* Unblock the listener if it's poll()ing */
+    if (listener_is_wakeable) {
+        apr_pollset_wakeup(event_pollset);
+    }
+
+    /* unblock the listener if it's waiting for a worker */
+    ap_queue_info_term(worker_queue_info);
+
     /*
      * we should just be able to "kill(ap_my_pid, LISTENER_SIGNAL)" on all
      * platforms and wake up the listener thread since it is the only thread
@@ -613,7 +578,7 @@ static int terminate_mode = ST_INIT;
 
 static void signal_threads(int mode)
 {
-    if (terminate_mode >= mode) {
+    if (terminate_mode == mode) {
         return;
     }
     terminate_mode = mode;
@@ -747,7 +712,6 @@ static int child_fatal;
 
 static apr_status_t decrement_connection_count(void *cs_)
 {
-    int is_last_connection;
     event_conn_state_t *cs = cs_;
     switch (cs->pub.state) {
     case CONN_STATE_LINGER_NORMAL:
@@ -760,14 +724,9 @@ static apr_status_t decrement_connection_count(void *cs_)
     default:
         break;
     }
-    /* Unblock the listener if it's waiting for connection_count = 0,
-     * or if the listening sockets were disabled due to limits and can
-     * now accept new connections.
-     */
-    is_last_connection = !apr_atomic_dec32(&connection_count);
-    if (listener_is_wakeable
-        && ((is_last_connection && listener_may_exit)
-            || (listeners_disabled() && !connections_above_limit()))) {
+    /* Unblock the listener if it's waiting for connection_count = 0 */
+    if (!apr_atomic_dec32(&connection_count)
+        && listener_is_wakeable && listener_may_exit) {
         apr_pollset_wakeup(event_pollset);
     }
     return APR_SUCCESS;
@@ -1219,16 +1178,17 @@ static void check_infinite_requests(void)
     }
 }
 
-static void close_listeners(int *closed)
+static void close_listeners(int process_slot, int *closed)
 {
     if (!*closed) {
         int i;
+        disable_listensocks(process_slot);
         ap_close_listeners_ex(my_bucket->listeners);
         *closed = 1;
         dying = 1;
-        ap_scoreboard_image->parent[ap_child_slot].quiescing = 1;
+        ap_scoreboard_image->parent[process_slot].quiescing = 1;
         for (i = 0; i < threads_per_child; ++i) {
-            ap_update_child_status_from_indexes(ap_child_slot, i,
+            ap_update_child_status_from_indexes(process_slot, i,
                                                 SERVER_GRACEFUL, NULL);
         }
         /* wake up the main thread */
@@ -1501,7 +1461,7 @@ static void process_timeout_queue(struct timeout_queue *q,
     struct timeout_queue *qp;
     apr_status_t rv;
 
-    if (!*q->total) {
+    if (!apr_atomic_read32(q->total)) {
         return;
     }
 
@@ -1551,8 +1511,8 @@ static void process_timeout_queue(struct timeout_queue *q,
         APR_RING_UNSPLICE(first, last, timeout_list);
         APR_RING_SPLICE_TAIL(&trash, first, last, event_conn_state_t,
                              timeout_list);
-        AP_DEBUG_ASSERT(*q->total >= count && qp->count >= count);
-        *q->total -= count;
+        AP_DEBUG_ASSERT(apr_atomic_read32(q->total) >= count);
+        apr_atomic_sub32(q->total, count);
         qp->count -= count;
         total += count;
     }
@@ -1578,7 +1538,8 @@ static void process_keepalive_queue(apr_time_t timeout_time)
     if (!timeout_time) {
         ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
                      "All workers are busy or dying, will close %u "
-                     "keep-alive connections", *keepalive_q->total);
+                     "keep-alive connections",
+                     apr_atomic_read32(keepalive_q->total));
     }
     process_timeout_queue(keepalive_q, timeout_time,
                           start_lingering_close_nonblocking);
 }
 
@@ -1586,14 +1547,22 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 {
+    timer_event_t *te;
     apr_status_t rc;
     proc_info *ti = dummy;
     int process_slot = ti->pslot;
     struct process_score *ps = ap_get_scoreboard_process(process_slot);
     apr_pool_t *tpool = apr_thread_pool_get(thd);
-    int closed = 0;
+    void *csd = NULL;
+    apr_pool_t *ptrans;                /* Pool for per-transaction stuff */
+    ap_listen_rec *lr;
     int have_idle_worker = 0;
-    apr_time_t last_log;
+    const apr_pollfd_t *out_pfd;
+    apr_int32_t num = 0;
+    apr_interval_time_t timeout_interval;
+    apr_time_t timeout_time = 0, now, last_log;
+    listener_poll_type *pt;
+    int closed = 0, listeners_disabled = 0;
 
     last_log = apr_time_now();
     free(ti);
@@ -1602,9 +1571,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
     if (rc != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
                      "failed to initialize pollset, "
-                     "shutdown process now");
-        resource_shortage = 1;
-        signal_threads(ST_UNGRACEFUL);
+                     "attempting to shutdown process gracefully");
+        signal_threads(ST_GRACEFUL);
         return NULL;
     }
 
@@ -1615,23 +1583,18 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
     apr_signal(LISTENER_SIGNAL, dummy_signal_handler);
 
     for (;;) {
-        timer_event_t *te;
-        const apr_pollfd_t *out_pfd;
-        apr_int32_t num = 0;
-        apr_interval_time_t timeout_interval;
-        apr_time_t now, timeout_time;
         int workers_were_busy = 0;
 
-        if (conns_this_child <= 0)
-            check_infinite_requests();
-
         if (listener_may_exit) {
-            close_listeners(&closed);
+            close_listeners(process_slot, &closed);
             if (terminate_mode == ST_UNGRACEFUL
                 || apr_atomic_read32(&connection_count) == 0)
                 break;
         }
 
+        if (conns_this_child <= 0)
+            check_infinite_requests();
+
         now = apr_time_now();
         if (APLOGtrace6(ap_server_conf)) {
             /* trace log status every second */
@@ -1643,8 +1606,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                              "keep-alive: %d lingering: %d suspended: %u)",
                              apr_atomic_read32(&connection_count),
                              apr_atomic_read32(&clogged_count),
-                             *(volatile apr_uint32_t*)write_completion_q->total,
-                             *(volatile apr_uint32_t*)keepalive_q->total,
+                             apr_atomic_read32(write_completion_q->total),
+                             apr_atomic_read32(keepalive_q->total),
                              apr_atomic_read32(&lingering_count),
                              apr_atomic_read32(&suspended_count));
             if (dying) {
@@ -1706,13 +1669,11 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
         rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
         if (rc != APR_SUCCESS) {
             if (APR_STATUS_IS_EINTR(rc)) {
-                /* Woken up, if we are exiting or listeners are disabled we
-                 * must fall through to kill kept-alive connections or test
-                 * whether listeners should be re-enabled. Otherwise we only
-                 * need to update timeouts (logic is above, so simply restart
-                 * the loop).
+                /* Woken up, if we are exiting we must fall through to kill
+                 * kept-alive connections, otherwise we only need to update
+                 * timeouts (logic is above, so restart the loop).
                  */
-                if (!listener_may_exit && !listeners_disabled()) {
+                if (!listener_may_exit) {
                     continue;
                 }
                 timeout_time = 0;
@@ -1727,14 +1688,14 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
         }
 
         if (listener_may_exit) {
-            close_listeners(&closed);
+            close_listeners(process_slot, &closed);
             if (terminate_mode == ST_UNGRACEFUL
                 || apr_atomic_read32(&connection_count) == 0)
                 break;
         }
 
-        for (; num; --num, ++out_pfd) {
-            listener_poll_type *pt = (listener_poll_type *) out_pfd->client_data;
+        while (num) {
+            pt = (listener_poll_type *) out_pfd->client_data;
             if (pt->type == PT_CSD) {
                 /* one of the sockets is readable */
                 event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
@@ -1794,16 +1755,24 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                         ap_assert(0);
                     }
                 }
-                else if (pt->type == PT_ACCEPT && !listeners_disabled()) {
+                else if (pt->type == PT_ACCEPT) {
                     /* A Listener Socket is ready for an accept() */
                     if (workers_were_busy) {
-                        disable_listensocks();
+                        if (!listeners_disabled)
+                            disable_listensocks(process_slot);
+                        listeners_disabled = 1;
                         ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
                                      "All workers busy, not accepting new conns "
                                      "in this process");
                     }
-                    else if (connections_above_limit()) {
-                        disable_listensocks();
+                    else if ( (int)apr_atomic_read32(&connection_count)
+                                  - (int)apr_atomic_read32(&lingering_count)
+                              > threads_per_child
+                                + ap_queue_info_get_idlers(worker_queue_info) *
+                                  worker_factor / WORKER_FACTOR_SCALE)
+                    {
+                        if (!listeners_disabled)
+                            disable_listensocks(process_slot);
                         ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
                                      "Too many open connections (%u), "
                                      "not accepting new conns in this process",
@@ -1811,41 +1780,34 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                                      apr_atomic_read32(&connection_count));
                         ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
                                      "Idle workers: %u",
                                      ap_queue_info_get_idlers(worker_queue_info));
-                    workers_were_busy = 1;
+                    listeners_disabled = 1;
                 }
-                else if (!listener_may_exit) {
-                    void *csd = NULL;
-                    ap_listen_rec *lr = (ap_listen_rec *) pt->baton;
-                    apr_pool_t *ptrans;         /* Pool for per-transaction stuff */
+                else if (listeners_disabled) {
+                    listeners_disabled = 0;
+                    enable_listensocks(process_slot);
+                }
+                if (!listeners_disabled) {
+                    lr = (ap_listen_rec *) pt->baton;
                     ap_pop_pool(&ptrans, worker_queue_info);
                     if (ptrans == NULL) {
                         /* create a new transaction pool for each accepted socket */
-                        apr_allocator_t *allocator = NULL;
-
-                        rc = apr_allocator_create(&allocator);
-                        if (rc == APR_SUCCESS) {
-                            apr_allocator_max_free_set(allocator,
-                                                       ap_max_mem_free);
-                            rc = apr_pool_create_ex(&ptrans, pconf, NULL,
-                                                    allocator);
-                            if (rc == APR_SUCCESS) {
-                                apr_pool_tag(ptrans, "transaction");
-                                apr_allocator_owner_set(allocator, ptrans);
-                            }
-                        }
-                        if (rc != APR_SUCCESS) {
+                        apr_allocator_t *allocator;
+
+                        apr_allocator_create(&allocator);
+                        apr_allocator_max_free_set(allocator,
+                                                   ap_max_mem_free);
+                        apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
+                        apr_allocator_owner_set(allocator, ptrans);
+                        if (ptrans == NULL) {
                             ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
                                          ap_server_conf, APLOGNO(03097)
                                          "Failed to create transaction pool");
-                            if (allocator) {
-                                apr_allocator_destroy(allocator);
-                            }
-                            resource_shortage = 1;
                             signal_threads(ST_GRACEFUL);
-                            continue;
+                            return NULL;
                         }
                     }
+                    apr_pool_tag(ptrans, "transaction");
 
                     get_worker(&have_idle_worker, 1, &workers_were_busy);
                     rc = lr->accept_func(&csd, lr, ptrans);
@@ -1872,7 +1834,9 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                     }
                 }
             }   /* if:else on pt->type */
-        } /* for processing poll */
+            out_pfd++;
+            num--;
+        }   /* while for processing poll */
 
         /* XXX possible optimization: stash the current time for use as
          * r->request_time for new requests
@@ -1912,14 +1876,14 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 
             apr_thread_mutex_unlock(timeout_mutex);
 
-            ps->keep_alive = *(volatile apr_uint32_t*)keepalive_q->total;
-            ps->write_completion = *(volatile apr_uint32_t*)write_completion_q->total;
+            ps->keep_alive = apr_atomic_read32(keepalive_q->total);
+            ps->write_completion = apr_atomic_read32(write_completion_q->total);
             ps->connections = apr_atomic_read32(&connection_count);
             ps->suspended = apr_atomic_read32(&suspended_count);
             ps->lingering_close = apr_atomic_read32(&lingering_count);
         }
         else if ((workers_were_busy || dying)
-                 && *(volatile apr_uint32_t*)keepalive_q->total) {
+                 && apr_atomic_read32(keepalive_q->total)) {
             apr_thread_mutex_lock(timeout_mutex);
             process_keepalive_queue(0); /* kill'em all \m/ */
             apr_thread_mutex_unlock(timeout_mutex);
@@ -1944,14 +1908,22 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             }
         }
 
-        if (listeners_disabled()
-            && !workers_were_busy
-            && !connections_above_limit()) {
-            enable_listensocks();
+        if (listeners_disabled && !workers_were_busy
+            && (int)apr_atomic_read32(&connection_count)
+               - (int)apr_atomic_read32(&lingering_count)
+               < ((int)ap_queue_info_get_idlers(worker_queue_info) - 1)
+                 * worker_factor / WORKER_FACTOR_SCALE + threads_per_child)
+        {
+            listeners_disabled = 0;
+            enable_listensocks(process_slot);
        }
-    } /* listener main loop */
+        /*
+         * XXX: do we need to set some timeout that re-enables the listensocks
+         * XXX: in case no other event occurs?
+         */
+    }   /* listener main loop */
 
-    close_listeners(&closed);
+    close_listeners(process_slot, &closed);
     ap_queue_term(worker_queue);
 
     apr_thread_exit(thd, APR_SUCCESS);
@@ -1998,8 +1970,12 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
     proc_info *ti = dummy;
     int process_slot = ti->pslot;
     int thread_slot = ti->tslot;
+    apr_socket_t *csd = NULL;
+    event_conn_state_t *cs;
+    apr_pool_t *ptrans;                /* Pool for per-transaction stuff */
     apr_status_t rv;
     int is_idle = 0;
+    timer_event_t *te = NULL;
 
     free(ti);
 
@@ -2010,11 +1986,6 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
                                         SERVER_STARTING, NULL);
 
     while (!workers_may_exit) {
-        apr_socket_t *csd = NULL;
-        event_conn_state_t *cs;
-        timer_event_t *te = NULL;
-        apr_pool_t *ptrans;         /* Pool for per-transaction stuff */
-
         if (!is_idle) {
             rv = ap_queue_info_set_idle(worker_queue_info, NULL);
             if (rv != APR_SUCCESS) {
@@ -2038,6 +2009,7 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
             break;
         }
 
+        te = NULL;
        rv = ap_queue_pop_something(worker_queue, &csd, &cs, &ptrans, &te);
 
        if (rv != APR_SUCCESS) {
@@ -2172,14 +2144,9 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy)
     int loops;
     int prev_threads_created;
     int max_recycled_pools = -1;
-    const int good_methods[] = { APR_POLLSET_KQUEUE,
-                                 APR_POLLSET_PORT,
-                                 APR_POLLSET_EPOLL };
-    /* XXX: K-A or lingering close connection included in the async factor */
-    const apr_uint32_t async_factor = worker_factor / WORKER_FACTOR_SCALE;
-    const apr_uint32_t pollset_size = (apr_uint32_t)num_listensocks +
-                                      (apr_uint32_t)threads_per_child *
-                                      (async_factor > 2 ? async_factor : 2);
+    int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL};
+    /* XXX don't we need more to handle K-A or lingering close? */
+    const apr_uint32_t pollset_size = threads_per_child * 2;
 
     /* We must create the fd queues before we start up the listener
      * and worker threads.
      */
 
@@ -2399,7 +2366,6 @@ static void child_main(int child_num_arg, int child_bucket)
 
     retained->mpm->mpm_state = AP_MPMQ_STARTING;
     ap_my_pid = getpid();
-    ap_child_slot = child_num_arg;
     ap_fatal_signal_child_setup(ap_server_conf);
 
     apr_pool_create(&pchild, pconf);
@@ -3374,11 +3340,6 @@ static int event_pre_config(apr_pool_t * pconf, apr_pool_t * plog,
     had_healthy_child = 0;
     ap_extended_status = 0;
 
-    event_pollset = NULL;
-    worker_queue_info = NULL;
-    listener_os_thread = NULL;
-    listensocks_disabled = 0;
-
     return OK;
 }
 
@@ -3792,9 +3753,8 @@ static const char *set_worker_factor(cmd_parms * cmd, void *dummy,
         return "AsyncRequestWorkerFactor argument must be a positive number";
 
     worker_factor = val * WORKER_FACTOR_SCALE;
-    if (worker_factor < WORKER_FACTOR_SCALE) {
-        worker_factor = WORKER_FACTOR_SCALE;
-    }
+    if (worker_factor == 0)
+        worker_factor = 1;
 
     return NULL;
 }
diff --git a/server/mpm/event/fdqueue.c b/server/mpm/event/fdqueue.c
index 175d86a720..64b318d0e0 100644
--- a/server/mpm/event/fdqueue.c
+++ b/server/mpm/event/fdqueue.c
@@ -27,18 +27,18 @@ struct recycled_pool
 
 struct fd_queue_info_t
 {
-    apr_uint32_t volatile idlers; /**
-                                   * >= zero_pt: number of idle worker threads
-                                   * < zero_pt:  number of threads blocked,
-                                   *             waiting for an idle worker
-                                   */
+    apr_uint32_t idlers;          /**
+                                   * >= zero_pt: number of idle worker threads
+                                   * < zero_pt:  number of threads blocked waiting
+                                   *             for an idle worker
+                                   */
     apr_thread_mutex_t *idlers_mutex;
     apr_thread_cond_t *wait_for_idler;
     int terminated;
     int max_idlers;
     int max_recycled_pools;
     apr_uint32_t recycled_pools_count;
-    struct recycled_pool *volatile recycled_pools;
+    struct recycled_pool *recycled_pools;
 };
 
 static apr_status_t queue_info_cleanup(void *data_)
@@ -97,11 +97,15 @@ apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info,
                                     apr_pool_t * pool_to_recycle)
 {
     apr_status_t rv;
+    apr_int32_t prev_idlers;
 
     ap_push_pool(queue_info, pool_to_recycle);
 
+    /* Atomically increment the count of idle workers */
+    prev_idlers = apr_atomic_inc32(&(queue_info->idlers)) - zero_pt;
+
     /* If other threads are waiting on a worker, wake one up */
-    if (apr_atomic_inc32(&queue_info->idlers) < zero_pt) {
+    if (prev_idlers < 0) {
         rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
         if (rv != APR_SUCCESS) {
             AP_DEBUG_ASSERT(0);
@@ -123,32 +127,31 @@ apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info,
 
 apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t * queue_info)
 {
-    /* Don't block if there isn't any idle worker. */
-    for (;;) {
-        apr_uint32_t idlers = queue_info->idlers;
-        if (idlers <= zero_pt) {
-            return APR_EAGAIN;
-        }
-        if (apr_atomic_cas32(&queue_info->idlers, idlers - 1,
-                             idlers) == idlers) {
-            return APR_SUCCESS;
-        }
+    apr_int32_t new_idlers;
+    new_idlers = apr_atomic_add32(&(queue_info->idlers), -1) - zero_pt;
+    if (--new_idlers <= 0) {
+        apr_atomic_inc32(&(queue_info->idlers));    /* back out dec */
+        return APR_EAGAIN;
     }
+    return APR_SUCCESS;
 }
 
 apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info,
                                           int *had_to_block)
 {
     apr_status_t rv;
+    apr_int32_t prev_idlers;
 
-    /* Block if there isn't any idle worker.
-     * apr_atomic_add32(x, -1) does the same as dec32(x), except
-     * that it returns the previous value (unlike dec32's bool).
-     */
-    if (apr_atomic_add32(&queue_info->idlers, -1) <= zero_pt) {
+    /* Atomically decrement the idle worker count, saving the old value */
+    /* See TODO in ap_queue_info_set_idle() */
+    prev_idlers = apr_atomic_add32(&(queue_info->idlers), -1) - zero_pt;
+
+    /* Block if there weren't any idle workers */
+    if (prev_idlers <= 0) {
         rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
         if (rv != APR_SUCCESS) {
             AP_DEBUG_ASSERT(0);
+            /* See TODO in ap_queue_info_set_idle() */
             apr_atomic_inc32(&(queue_info->idlers));    /* back out dec */
             return rv;
         }
@@ -202,11 +205,11 @@ apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info,
 
 apr_uint32_t ap_queue_info_get_idlers(fd_queue_info_t * queue_info)
 {
-    apr_uint32_t val;
-    val = apr_atomic_read32(&queue_info->idlers);
-    if (val <= zero_pt)
+    apr_int32_t val;
+    val = (apr_int32_t)apr_atomic_read32(&queue_info->idlers) - zero_pt;
+    if (val < 0)
         return 0;
-    return val - zero_pt;
+    return val;
 }
 
 void ap_push_pool(fd_queue_info_t * queue_info,
@@ -487,20 +490,13 @@ apr_status_t ap_queue_pop_something(fd_queue_t * queue, apr_socket_t ** sd,
     return rv;
 }
 
-static apr_status_t queue_interrupt(fd_queue_t * queue, int all, int term)
+static apr_status_t queue_interrupt(fd_queue_t * queue, int all)
 {
     apr_status_t rv;
 
     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
         return rv;
     }
-    /* we must hold one_big_mutex when setting this... otherwise,
-     * we could end up setting it and waking everybody up just after a
-     * would-be popper checks it but right before they block
-     */
-    if (term) {
-        queue->terminated = 1;
-    }
     if (all)
         apr_thread_cond_broadcast(queue->not_empty);
     else
@@ -510,15 +506,28 @@ static apr_status_t queue_interrupt(fd_queue_t * queue, int all, int term)
 
 apr_status_t ap_queue_interrupt_all(fd_queue_t * queue)
 {
-    return queue_interrupt(queue, 1, 0);
+    return queue_interrupt(queue, 1);
 }
 
 apr_status_t ap_queue_interrupt_one(fd_queue_t * queue)
 {
-    return queue_interrupt(queue, 0, 0);
+    return queue_interrupt(queue, 0);
 }
 
 apr_status_t ap_queue_term(fd_queue_t * queue)
 {
-    return queue_interrupt(queue, 1, 1);
+    apr_status_t rv;
+
+    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    /* we must hold one_big_mutex when setting this... otherwise,
+     * we could end up setting it and waking everybody up just after a
+     * would-be popper checks it but right before they block
+     */
+    queue->terminated = 1;
+    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    return ap_queue_interrupt_all(queue);
 }
diff --git a/server/mpm/worker/fdqueue.c b/server/mpm/worker/fdqueue.c
index 803267afbd..fe5881b4ce 100644
--- a/server/mpm/worker/fdqueue.c
+++ b/server/mpm/worker/fdqueue.c
@@ -382,30 +382,31 @@ apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p)
     return rv;
 }
 
-static apr_status_t queue_interrupt_all(fd_queue_t *queue, int term)
+apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
 {
     apr_status_t rv;
 
     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
         return rv;
     }
-    /* we must hold one_big_mutex when setting this... otherwise,
-     * we could end up setting it and waking everybody up just after a
-     * would-be popper checks it but right before they block
-     */
-    if (term) {
-        queue->terminated = 1;
-    }
     apr_thread_cond_broadcast(queue->not_empty);
     return apr_thread_mutex_unlock(queue->one_big_mutex);
 }
 
-apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
-{
-    return queue_interrupt_all(queue, 0);
-}
-
 apr_status_t ap_queue_term(fd_queue_t *queue)
 {
-    return queue_interrupt_all(queue, 1);
+    apr_status_t rv;
+
+    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    /* we must hold one_big_mutex when setting this... otherwise,
+     * we could end up setting it and waking everybody up just after a
+     * would-be popper checks it but right before they block
+     */
+    queue->terminated = 1;
+    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    return ap_queue_interrupt_all(queue);
 }
-- 
2.40.0
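
Note on the fdqueue.c hunks: fd_queue_info_t.idlers is a single unsigned counter
biased by zero_pt (the midpoint of the 32-bit range), so values at or above the
bias count idle workers while values below it count threads blocked waiting for
one; apr_atomic_add32(x, -1) is used rather than dec32 because it returns the
previous value. A minimal standalone model of that biased-counter technique,
using C11 atomics instead of apr_atomic_* (ZERO_PT, set_idle and try_get_idler
are illustrative names, not httpd's; waking blocked waiters is elided):

/* Standalone model of the zero_pt-biased counter (C11 atomics, not APR). */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define ZERO_PT ((uint32_t)1 << 31)        /* midpoint of the 32-bit range */

static _Atomic uint32_t idlers = ZERO_PT;  /* biased: ZERO_PT means "0 idle" */

static void set_idle(void)
{
    /* fetch_add returns the previous value, like apr_atomic_add32() */
    int32_t prev = (int32_t)(atomic_fetch_add(&idlers, 1) - ZERO_PT);
    if (prev < 0) {
        /* a consumer is blocked waiting for a worker: wake one up here */
    }
}

static int try_get_idler(void)
{
    int32_t prev = (int32_t)(atomic_fetch_sub(&idlers, 1) - ZERO_PT);
    if (prev <= 0) {
        atomic_fetch_add(&idlers, 1);      /* back out the decrement */
        return 0;                          /* no idler (APR_EAGAIN) */
    }
    return 1;
}

int main(void)
{
    printf("%d\n", try_get_idler());       /* 0: nothing idle yet */
    set_idle();
    printf("%d\n", try_get_idler());       /* 1: took the idle worker */
    return 0;
}

The back-out mirrors the patch: a failed non-blocking acquire must restore the
count, or the bias would drift and misreport blocked waiters.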
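
Note on listener_is_wakeable: wakeup_listener() can only call
apr_pollset_wakeup() if the pollset was created with the APR_POLLSET_WAKEABLE
flag, in which case a blocked apr_pollset_poll() returns APR_EINTR. A rough
sketch of that APR pattern (pollset size, sleep, and timeout values are
arbitrary here, and error handling is trimmed):

/* Sketch of APR_POLLSET_WAKEABLE + apr_pollset_wakeup(). */
#include <stdio.h>
#include <apr_general.h>
#include <apr_pools.h>
#include <apr_poll.h>
#include <apr_thread_proc.h>

static void *APR_THREAD_FUNC waker(apr_thread_t *thd, void *data)
{
    apr_pollset_t *pollset = data;
    apr_sleep(apr_time_from_sec(1));
    apr_pollset_wakeup(pollset);           /* interrupt the blocked poll */
    apr_thread_exit(thd, APR_SUCCESS);
    return NULL;
}

int main(void)
{
    apr_pool_t *pool;
    apr_pollset_t *pollset;
    apr_thread_t *thread;
    apr_status_t rc, trv;
    const apr_pollfd_t *out_pfd;
    apr_int32_t num;

    apr_initialize();
    apr_pool_create(&pool, NULL);

    /* As in event.c: try a wakeable pollset, fall back to a plain one */
    rc = apr_pollset_create(&pollset, 16, pool, APR_POLLSET_WAKEABLE);
    if (rc != APR_SUCCESS) {
        rc = apr_pollset_create(&pollset, 16, pool, 0);
    }

    apr_thread_create(&thread, NULL, waker, pollset, pool);

    /* Would block ~10s, but the wakeup makes it return APR_EINTR early */
    rc = apr_pollset_poll(pollset, apr_time_from_sec(10), &num, &out_pfd);
    if (APR_STATUS_IS_EINTR(rc)) {
        printf("woken up by apr_pollset_wakeup()\n");
    }

    apr_thread_join(&trv, thread);
    apr_terminate();
    return 0;
}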
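
Note on ap_queue_term(): both fdqueue.c files keep the same shutdown
discipline — queue->terminated must be set while one_big_mutex is held,
otherwise a popper could test the flag, miss the broadcast, and block forever.
A pthreads stand-in for that handshake (fd_queue_t here is a simplified
stand-in for the APR-based struct, not httpd's definition):

/* Pthreads stand-in for the ap_queue_term()/ap_queue_pop() handshake. */
#include <pthread.h>

typedef struct {
    pthread_mutex_t one_big_mutex;
    pthread_cond_t  not_empty;
    int             nelts;
    int             terminated;
} fd_queue_t;

/* Popper: the terminated check and the wait happen under the same lock,
 * so a termination flagged under that lock can never be missed. */
static int queue_pop(fd_queue_t *q)
{
    int got;
    pthread_mutex_lock(&q->one_big_mutex);
    while (q->nelts == 0 && !q->terminated) {
        pthread_cond_wait(&q->not_empty, &q->one_big_mutex);
    }
    got = !q->terminated;
    if (got) {
        q->nelts--;                  /* take an element */
    }
    pthread_mutex_unlock(&q->one_big_mutex);
    return got;                      /* 0 == shut down (APR_EOF in httpd) */
}

/* Terminator: set the flag under the lock, then wake every popper,
 * matching ap_queue_term() + ap_queue_interrupt_all() in the patch. */
static void queue_term(fd_queue_t *q)
{
    pthread_mutex_lock(&q->one_big_mutex);
    q->terminated = 1;
    pthread_cond_broadcast(&q->not_empty);
    pthread_mutex_unlock(&q->one_big_mutex);
}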