From: Stefan Eissing
Date: Fri, 9 Feb 2018 10:28:07 +0000 (+0000)
Subject: On the 2.4.x branch:
X-Git-Tag: 2.4.30~104
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=222e7b07f5385383650c9b879382aced95bdeaf2;p=apache

On the 2.4.x branch:

Merge of r1605328,r1629576,r1643279,r1703241,r1802535,r1819847,r1819848,r1819852,r1819853,r1819855,r1821562,r1821558,r1821561,r1821595 from trunk

 *) event: staging changes (incremental patches) to sync 2.4.x with trunk.
    [Reverted by r1823636]

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1823629 13f79535-47bb-0310-9956-ffa450edef68
---

diff --git a/CHANGES b/CHANGES
index e706b53061..c7c5939049 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,12 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.4.30
 
+  *) mpm_event: Let the listener thread do its maintenance job on resource
+     shortage. PR 61979. [Yann Ylavic]
+
+  *) mpm_event: Wake up the listener to re-enable listening sockets.
+     [Yann Ylavic]
+
   *) mod_ssl: The SSLCompression directive will now give an error if used
      with an OpenSSL build which does not support any compression methods.
      [Joe Orton]
diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c
index 8e0f72dedc..5e3186c44c 100644
--- a/server/mpm/event/event.c
+++ b/server/mpm/event/event.c
@@ -173,10 +173,10 @@ static int max_workers = 0;         /* MaxRequestWorkers */
 static int server_limit = 0;        /* ServerLimit */
 static int thread_limit = 0;        /* ThreadLimit */
 static int had_healthy_child = 0;
-static int dying = 0;
-static int workers_may_exit = 0;
-static int start_thread_may_exit = 0;
-static int listener_may_exit = 0;
+static volatile int dying = 0;
+static volatile int workers_may_exit = 0;
+static volatile int start_thread_may_exit = 0;
+static volatile int listener_may_exit = 0;
 static int listener_is_wakeable = 0;        /* Pollset supports APR_POLLSET_WAKEABLE */
 static int num_listensocks = 0;
 static apr_int32_t conns_this_child;        /* MaxConnectionsPerChild, only access
@@ -287,7 +287,7 @@ static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el)
     apr_time_t next_expiry;
 
     APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
-    apr_atomic_inc32(q->total);
+    ++*q->total;
     ++q->count;
 
     /* Cheaply update the overall queues' next expiry according to the
@@ -309,7 +309,7 @@ static void TO_QUEUE_REMOVE(struct timeout_queue *q, event_conn_state_t *el)
 {
     APR_RING_REMOVE(el, timeout_list);
     APR_RING_ELEM_INIT(el, timeout_list);
-    apr_atomic_dec32(q->total);
+    --*q->total;
     --q->count;
 }
 
@@ -440,6 +440,8 @@ static pid_t ap_my_pid; /* Linux getpid() doesn't work except in main
 static pid_t parent_pid;
 static apr_os_thread_t *listener_os_thread;
 
+static int ap_child_slot;       /* Current child process slot in scoreboard */
+
 /* The LISTENER_SIGNAL signal will be sent from the main thread to the
  * listener thread to wake it up for graceful termination (what a child
  * process from an old generation does when the admin does "apachectl
@@ -453,19 +455,27 @@ static apr_os_thread_t *listener_os_thread;
  */
 static apr_socket_t **worker_sockets;
 
-static void disable_listensocks(int process_slot)
+static volatile apr_uint32_t listensocks_disabled;
+
+static void disable_listensocks(void)
 {
     int i;
-    for (i = 0; i < num_listensocks; i++) {
-        apr_pollset_remove(event_pollset, &listener_pollfd[i]);
+    if (apr_atomic_cas32(&listensocks_disabled, 1, 0) != 0) {
+        return;
     }
-    ap_scoreboard_image->parent[process_slot].not_accepting = 1;
+    if (event_pollset) {
+        for (i = 0; i < num_listensocks; i++) {
+            apr_pollset_remove(event_pollset, &listener_pollfd[i]);
+        }
+    }
+    ap_scoreboard_image->parent[ap_child_slot].not_accepting = 1;
 }
 
-static void enable_listensocks(int process_slot)
+static void enable_listensocks(void)
 {
     int i;
-    if (listener_may_exit) {
+    if (listener_may_exit
+        || apr_atomic_cas32(&listensocks_disabled, 0, 1) != 1) {
         return;
     }
     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(00457)
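As an aside on the hunk above: the new enable/disable pair is built on a compare-and-swap toggle. The following standalone sketch, which is not part of the patch and uses illustrative names, shows the pattern; apr_atomic_cas32() returns the previous value, so exactly one racing caller wins each transition and the pollset (un)registration runs at most once per flip.

    #include <apr_atomic.h>

    static volatile apr_uint32_t disabled; /* 0 = accepting, 1 = disabled */

    static int try_disable(void)
    {
        /* cas32(mem, with, cmp) returns the old value: 0 means we won
         * the 0 -> 1 transition and should deregister the listeners. */
        return apr_atomic_cas32(&disabled, 1, 0) == 0;
    }

    static int try_enable(void)
    {
        /* Symmetric 1 -> 0 transition; losers simply return. */
        return apr_atomic_cas32(&disabled, 0, 1) == 1;
    }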
@@ -483,7 +493,29 @@
      * XXX: This is not yet optimal. If many workers suddenly become available,
      * XXX: the parent may kill some processes off too soon.
      */
-    ap_scoreboard_image->parent[process_slot].not_accepting = 0;
+    ap_scoreboard_image->parent[ap_child_slot].not_accepting = 0;
+}
+
+static APR_INLINE apr_uint32_t listeners_disabled(void)
+{
+    return apr_atomic_read32(&listensocks_disabled);
+}
+
+static APR_INLINE int connections_above_limit(void)
+{
+    apr_uint32_t i_count = ap_queue_info_get_idlers(worker_queue_info);
+    if (i_count > 0) {
+        apr_uint32_t c_count = apr_atomic_read32(&connection_count);
+        apr_uint32_t l_count = apr_atomic_read32(&lingering_count);
+        if (c_count <= l_count
+                /* Off by 'listeners_disabled()' to avoid flip flop */
+            || c_count - l_count < (apr_uint32_t)threads_per_child +
+                                   (i_count - listeners_disabled()) *
+                                   (worker_factor / WORKER_FACTOR_SCALE)) {
+            return 0;
+        }
+    }
+    return 1;
 }
 
 static void abort_socket_nonblocking(apr_socket_t *csd)
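A worked example, with made-up but typical numbers, of the admission test that connections_above_limit() implements above: with ThreadsPerChild 25, AsyncRequestWorkerFactor 2 and 10 idle workers, a child keeps accepting while its non-lingering connections stay below 25 + 10 * 2 = 45. The "- listeners_disabled()" term shaves one idler's worth off the budget while listeners are disabled, so the re-enable threshold is slightly stricter and the toggle cannot flip-flop on the boundary.

    #include <stdio.h>

    int main(void)
    {
        unsigned threads_per_child = 25;    /* ThreadsPerChild */
        unsigned async_factor = 2;          /* AsyncRequestWorkerFactor */
        unsigned i_count = 10;              /* idle workers right now */
        unsigned disabled = 0;              /* listeners_disabled() */
        unsigned c_count = 60, l_count = 5; /* total vs lingering conns */

        unsigned limit = threads_per_child + (i_count - disabled) * async_factor;
        printf("active=%u limit=%u -> %s\n", c_count - l_count, limit,
               c_count - l_count < limit ? "accept" : "defer");
        return 0; /* prints: active=55 limit=45 -> defer */
    }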
@@ -541,6 +573,18 @@ static void close_worker_sockets(void)
 static void wakeup_listener(void)
 {
     listener_may_exit = 1;
+    disable_listensocks();
+
+    /* Unblock the listener if it's poll()ing */
+    if (event_pollset && listener_is_wakeable) {
+        apr_pollset_wakeup(event_pollset);
+    }
+
+    /* unblock the listener if it's waiting for a worker */
+    if (worker_queue_info) {
+        ap_queue_info_term(worker_queue_info);
+    }
+
     if (!listener_os_thread) {
         /* XXX there is an obscure path that this doesn't handle perfectly:
          *     right after listener thread is created but before
@@ -549,15 +593,6 @@ static void wakeup_listener(void)
          */
         return;
     }
-
-    /* Unblock the listener if it's poll()ing */
-    if (listener_is_wakeable) {
-        apr_pollset_wakeup(event_pollset);
-    }
-
-    /* unblock the listener if it's waiting for a worker */
-    ap_queue_info_term(worker_queue_info);
-
     /*
      * we should just be able to "kill(ap_my_pid, LISTENER_SIGNAL)" on all
      * platforms and wake up the listener thread since it is the only thread
@@ -578,7 +613,7 @@ static int terminate_mode = ST_INIT;
 
 static void signal_threads(int mode)
 {
-    if (terminate_mode == mode) {
+    if (terminate_mode >= mode) {
         return;
     }
     terminate_mode = mode;
@@ -712,6 +747,7 @@ static int child_fatal;
 
 static apr_status_t decrement_connection_count(void *cs_)
 {
+    int is_last_connection;
     event_conn_state_t *cs = cs_;
     switch (cs->pub.state) {
         case CONN_STATE_LINGER_NORMAL:
@@ -724,9 +760,14 @@ static apr_status_t decrement_connection_count(void *cs_)
         default:
             break;
     }
-    /* Unblock the listener if it's waiting for connection_count = 0 */
-    if (!apr_atomic_dec32(&connection_count)
-             && listener_is_wakeable && listener_may_exit) {
+    /* Unblock the listener if it's waiting for connection_count = 0,
+     * or if the listening sockets were disabled due to limits and can
+     * now accept new connections.
+     */
+    is_last_connection = !apr_atomic_dec32(&connection_count);
+    if (listener_is_wakeable
+            && ((is_last_connection && listener_may_exit)
+                || (listeners_disabled() && !connections_above_limit()))) {
         apr_pollset_wakeup(event_pollset);
     }
     return APR_SUCCESS;
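Both wakeup_listener() and decrement_connection_count() above now nudge the listener with apr_pollset_wakeup(). A minimal sketch of that APR pattern, separate from the patch: the pollset must be created with APR_POLLSET_WAKEABLE, and a woken apr_pollset_poll() reports an EINTR-style status, which the loop then treats as "re-check exit and re-enable conditions" rather than as an error.

    #include <apr_poll.h>

    static apr_pollset_t *pollset;

    static void setup(apr_pool_t *pool)
    {
        /* 16 is an arbitrary size for this sketch */
        apr_pollset_create(&pollset, 16, pool, APR_POLLSET_WAKEABLE);
    }

    static void poll_once(void)
    {
        const apr_pollfd_t *pfds;
        apr_int32_t num;
        apr_status_t rc = apr_pollset_poll(pollset, -1, &num, &pfds);
        if (rc != APR_SUCCESS && APR_STATUS_IS_EINTR(rc)) {
            /* Woken by apr_pollset_wakeup(): not an error, just re-check
             * listener_may_exit / listeners_disabled() as above. */
        }
    }

    /* Callable from any other thread. */
    static void nudge(void)
    {
        apr_pollset_wakeup(pollset);
    }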
@@ -1178,17 +1219,16 @@ static void check_infinite_requests(void)
     }
 }
 
-static void close_listeners(int process_slot, int *closed)
+static void close_listeners(int *closed)
 {
     if (!*closed) {
         int i;
-        disable_listensocks(process_slot);
         ap_close_listeners_ex(my_bucket->listeners);
         *closed = 1;
         dying = 1;
-        ap_scoreboard_image->parent[process_slot].quiescing = 1;
+        ap_scoreboard_image->parent[ap_child_slot].quiescing = 1;
         for (i = 0; i < threads_per_child; ++i) {
-            ap_update_child_status_from_indexes(process_slot, i,
+            ap_update_child_status_from_indexes(ap_child_slot, i,
                                                 SERVER_GRACEFUL, NULL);
         }
         /* wake up the main thread */
@@ -1461,7 +1501,7 @@ static void process_timeout_queue(struct timeout_queue *q,
     struct timeout_queue *qp;
     apr_status_t rv;
 
-    if (!apr_atomic_read32(q->total)) {
+    if (!*q->total) {
         return;
     }
 
@@ -1511,8 +1551,8 @@ static void process_timeout_queue(struct timeout_queue *q,
             APR_RING_UNSPLICE(first, last, timeout_list);
             APR_RING_SPLICE_TAIL(&trash, first, last, event_conn_state_t,
                                  timeout_list);
-            AP_DEBUG_ASSERT(apr_atomic_read32(q->total) >= count);
-            apr_atomic_sub32(q->total, count);
+            AP_DEBUG_ASSERT(*q->total >= count && qp->count >= count);
+            *q->total -= count;
             qp->count -= count;
             total += count;
         }
@@ -1538,8 +1578,7 @@ static void process_keepalive_queue(apr_time_t timeout_time)
     if (!timeout_time) {
         ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
                      "All workers are busy or dying, will close %u "
-                     "keep-alive connections",
-                     apr_atomic_read32(keepalive_q->total));
+                     "keep-alive connections", *keepalive_q->total);
     }
     process_timeout_queue(keepalive_q, timeout_time,
                           start_lingering_close_nonblocking);
@@ -1547,22 +1586,14 @@ static void process_keepalive_queue(apr_time_t timeout_time)
 
 static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 {
-    timer_event_t *te;
     apr_status_t rc;
     proc_info *ti = dummy;
     int process_slot = ti->pslot;
     struct process_score *ps = ap_get_scoreboard_process(process_slot);
     apr_pool_t *tpool = apr_thread_pool_get(thd);
-    void *csd = NULL;
-    apr_pool_t *ptrans;         /* Pool for per-transaction stuff */
-    ap_listen_rec *lr;
+    int closed = 0;
     int have_idle_worker = 0;
-    const apr_pollfd_t *out_pfd;
-    apr_int32_t num = 0;
-    apr_interval_time_t timeout_interval;
-    apr_time_t timeout_time = 0, now, last_log;
-    listener_poll_type *pt;
-    int closed = 0, listeners_disabled = 0;
+    apr_time_t last_log;
 
     last_log = apr_time_now();
     free(ti);
@@ -1571,8 +1602,9 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
     if (rc != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
                      "failed to initialize pollset, "
-                     "attempting to shutdown process gracefully");
-        signal_threads(ST_GRACEFUL);
+                     "shutdown process now");
+        resource_shortage = 1;
+        signal_threads(ST_UNGRACEFUL);
         return NULL;
     }
@@ -1583,18 +1615,23 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
     apr_signal(LISTENER_SIGNAL, dummy_signal_handler);
 
     for (;;) {
+        timer_event_t *te;
+        const apr_pollfd_t *out_pfd;
+        apr_int32_t num = 0;
+        apr_interval_time_t timeout_interval;
+        apr_time_t now, timeout_time;
         int workers_were_busy = 0;
 
+        if (conns_this_child <= 0)
+            check_infinite_requests();
+
         if (listener_may_exit) {
-            close_listeners(process_slot, &closed);
+            close_listeners(&closed);
             if (terminate_mode == ST_UNGRACEFUL
                 || apr_atomic_read32(&connection_count) == 0)
                 break;
         }
 
-        if (conns_this_child <= 0)
-            check_infinite_requests();
-
         now = apr_time_now();
         if (APLOGtrace6(ap_server_conf)) {
             /* trace log status every second */
@@ -1606,8 +1643,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                          "keep-alive: %d lingering: %d suspended: %u)",
                          apr_atomic_read32(&connection_count),
                          apr_atomic_read32(&clogged_count),
-                         apr_atomic_read32(write_completion_q->total),
-                         apr_atomic_read32(keepalive_q->total),
+                         *(volatile apr_uint32_t*)write_completion_q->total,
+                         *(volatile apr_uint32_t*)keepalive_q->total,
                          apr_atomic_read32(&lingering_count),
                          apr_atomic_read32(&suspended_count));
             if (dying) {
@@ -1669,11 +1706,13 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
         rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
         if (rc != APR_SUCCESS) {
             if (APR_STATUS_IS_EINTR(rc)) {
-                /* Woken up, if we are exiting we must fall through to kill
-                 * kept-alive connections, otherwise we only need to update
-                 * timeouts (logic is above, so restart the loop).
+                /* Woken up, if we are exiting or listeners are disabled we
+                 * must fall through to kill kept-alive connections or test
+                 * whether listeners should be re-enabled. Otherwise we only
+                 * need to update timeouts (logic is above, so simply restart
+                 * the loop).
                  */
-                if (!listener_may_exit) {
+                if (!listener_may_exit && !listeners_disabled()) {
                     continue;
                 }
                 timeout_time = 0;
@@ -1688,14 +1727,14 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
         }
 
         if (listener_may_exit) {
-            close_listeners(process_slot, &closed);
+            close_listeners(&closed);
             if (terminate_mode == ST_UNGRACEFUL
                 || apr_atomic_read32(&connection_count) == 0)
                 break;
         }
 
-        while (num) {
-            pt = (listener_poll_type *) out_pfd->client_data;
+        for (; num; --num, ++out_pfd) {
+            listener_poll_type *pt = (listener_poll_type *) out_pfd->client_data;
             if (pt->type == PT_CSD) {
                 /* one of the sockets is readable */
                 event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
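Context for the while -> for rewrite above: apr_pollset_poll() returns its results as a count plus a pointer to a contiguous array of signalled descriptors, so walking them is plain pointer arithmetic. An illustrative, self-contained helper, not from the patch:

    #include <apr_poll.h>

    static void drain_results(apr_pollset_t *ps, apr_interval_time_t timeout)
    {
        const apr_pollfd_t *out_pfd;
        apr_int32_t num;

        if (apr_pollset_poll(ps, timeout, &num, &out_pfd) != APR_SUCCESS)
            return;

        /* Count down the array, advancing the descriptor pointer. */
        for (; num; --num, ++out_pfd) {
            void *baton = out_pfd->client_data; /* per-descriptor context */
            (void)baton; /* dispatch on its type here, as the listener does */
        }
    }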
@@ -1755,24 +1794,16 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                     ap_assert(0);
                 }
             }
-            else if (pt->type == PT_ACCEPT) {
+            else if (pt->type == PT_ACCEPT && !listeners_disabled()) {
                 /* A Listener Socket is ready for an accept() */
                 if (workers_were_busy) {
-                    if (!listeners_disabled)
-                        disable_listensocks(process_slot);
-                    listeners_disabled = 1;
+                    disable_listensocks();
                     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
                                  "All workers busy, not accepting new conns "
                                  "in this process");
                 }
-                else if (  (int)apr_atomic_read32(&connection_count)
-                           - (int)apr_atomic_read32(&lingering_count)
-                         > threads_per_child
-                           + ap_queue_info_get_idlers(worker_queue_info) *
-                             worker_factor / WORKER_FACTOR_SCALE)
-                {
-                    if (!listeners_disabled)
-                        disable_listensocks(process_slot);
+                else if (connections_above_limit()) {
+                    disable_listensocks();
                     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
                                  "Too many open connections (%u), "
                                  "not accepting new conns in this process",
@@ -1780,34 +1811,41 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                     ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
                                  "Idle workers: %u",
                                  ap_queue_info_get_idlers(worker_queue_info));
-                    listeners_disabled = 1;
+                    workers_were_busy = 1;
                 }
-                else if (listeners_disabled) {
-                    listeners_disabled = 0;
-                    enable_listensocks(process_slot);
-                }
-                if (!listeners_disabled) {
-                    lr = (ap_listen_rec *) pt->baton;
+                else if (!listener_may_exit) {
+                    void *csd = NULL;
+                    ap_listen_rec *lr = (ap_listen_rec *) pt->baton;
+                    apr_pool_t *ptrans;         /* Pool for per-transaction stuff */
                     ap_pop_pool(&ptrans, worker_queue_info);
 
                     if (ptrans == NULL) {
                         /* create a new transaction pool for each accepted socket */
-                        apr_allocator_t *allocator;
-
-                        apr_allocator_create(&allocator);
-                        apr_allocator_max_free_set(allocator,
-                                                   ap_max_mem_free);
-                        apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
-                        apr_allocator_owner_set(allocator, ptrans);
-                        if (ptrans == NULL) {
+                        apr_allocator_t *allocator = NULL;
+
+                        rc = apr_allocator_create(&allocator);
+                        if (rc == APR_SUCCESS) {
+                            apr_allocator_max_free_set(allocator,
+                                                       ap_max_mem_free);
+                            rc = apr_pool_create_ex(&ptrans, pconf, NULL,
+                                                    allocator);
+                            if (rc == APR_SUCCESS) {
+                                apr_pool_tag(ptrans, "transaction");
+                                apr_allocator_owner_set(allocator, ptrans);
+                            }
+                        }
+                        if (rc != APR_SUCCESS) {
                             ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
                                          ap_server_conf, APLOGNO(03097)
                                          "Failed to create transaction pool");
+                            if (allocator) {
+                                apr_allocator_destroy(allocator);
+                            }
+                            resource_shortage = 1;
                             signal_threads(ST_GRACEFUL);
-                            return NULL;
+                            continue;
                         }
                     }
-                    apr_pool_tag(ptrans, "transaction");
 
                     get_worker(&have_idle_worker, 1, &workers_were_busy);
                     rc = lr->accept_func(&csd, lr, ptrans);
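The hunk above turns the previously unchecked pool setup into a fully checked sequence. Factored out as a sketch, with a hypothetical helper name, it reads as follows; note that the allocator is destroyed on failure, and on success ownership passes to the pool so both are released together.

    #include <apr_allocator.h>
    #include <apr_pools.h>

    static apr_status_t make_transaction_pool(apr_pool_t *parent,
                                              apr_size_t max_free,
                                              apr_pool_t **out)
    {
        apr_allocator_t *allocator = NULL;
        apr_status_t rv = apr_allocator_create(&allocator);

        if (rv == APR_SUCCESS) {
            apr_allocator_max_free_set(allocator, max_free);
            rv = apr_pool_create_ex(out, parent, NULL, allocator);
            if (rv == APR_SUCCESS) {
                apr_pool_tag(*out, "transaction");
                /* The pool owns the allocator from here on. */
                apr_allocator_owner_set(allocator, *out);
                return APR_SUCCESS;
            }
        }
        if (allocator) {
            apr_allocator_destroy(allocator);
        }
        return rv;
    }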
@@ -1834,9 +1872,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                     }
                 }
             }               /* if:else on pt->type */
-            out_pfd++;
-            num--;
-        }                   /* while for processing poll */
+        }                   /* for processing poll */
 
         /* XXX possible optimization: stash the current time for use as
          * r->request_time for new requests
@@ -1876,14 +1912,14 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 
             apr_thread_mutex_unlock(timeout_mutex);
 
-            ps->keep_alive = apr_atomic_read32(keepalive_q->total);
-            ps->write_completion = apr_atomic_read32(write_completion_q->total);
+            ps->keep_alive = *(volatile apr_uint32_t*)keepalive_q->total;
+            ps->write_completion = *(volatile apr_uint32_t*)write_completion_q->total;
             ps->connections = apr_atomic_read32(&connection_count);
             ps->suspended = apr_atomic_read32(&suspended_count);
             ps->lingering_close = apr_atomic_read32(&lingering_count);
         }
         else if ((workers_were_busy || dying)
-                 && apr_atomic_read32(keepalive_q->total)) {
+                 && *(volatile apr_uint32_t*)keepalive_q->total) {
             apr_thread_mutex_lock(timeout_mutex);
             process_keepalive_queue(0); /* kill'em all \m/ */
             apr_thread_mutex_unlock(timeout_mutex);
@@ -1908,22 +1944,14 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             }
         }
 
-        if (listeners_disabled && !workers_were_busy
-            && (int)apr_atomic_read32(&connection_count)
-               - (int)apr_atomic_read32(&lingering_count)
-               < ((int)ap_queue_info_get_idlers(worker_queue_info) - 1)
-                 * worker_factor / WORKER_FACTOR_SCALE + threads_per_child)
-        {
-            listeners_disabled = 0;
-            enable_listensocks(process_slot);
+        if (listeners_disabled()
+            && !workers_were_busy
+            && !connections_above_limit()) {
+            enable_listensocks();
         }
-        /*
-         * XXX: do we need to set some timeout that re-enables the listensocks
-         * XXX: in case no other event occurs?
-         */
-    }     /* listener main loop */
+    } /* listener main loop */
 
-    close_listeners(process_slot, &closed);
+    close_listeners(&closed);
     ap_queue_term(worker_queue);
 
     apr_thread_exit(thd, APR_SUCCESS);
@@ -1970,12 +1998,8 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
     proc_info *ti = dummy;
     int process_slot = ti->pslot;
     int thread_slot = ti->tslot;
-    apr_socket_t *csd = NULL;
-    event_conn_state_t *cs;
-    apr_pool_t *ptrans;         /* Pool for per-transaction stuff */
     apr_status_t rv;
     int is_idle = 0;
-    timer_event_t *te = NULL;
 
     free(ti);
@@ -1986,6 +2010,11 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
                                         SERVER_STARTING, NULL);
 
     while (!workers_may_exit) {
+        apr_socket_t *csd = NULL;
+        event_conn_state_t *cs;
+        timer_event_t *te = NULL;
+        apr_pool_t *ptrans;         /* Pool for per-transaction stuff */
+
         if (!is_idle) {
             rv = ap_queue_info_set_idle(worker_queue_info, NULL);
             if (rv != APR_SUCCESS) {
@@ -2009,7 +2038,6 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
             break;
         }
 
-        te = NULL;
         rv = ap_queue_pop_something(worker_queue, &csd, &cs, &ptrans, &te);
 
         if (rv != APR_SUCCESS) {
@@ -2144,9 +2172,14 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy)
     int loops;
     int prev_threads_created;
     int max_recycled_pools = -1;
-    int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL};
-    /* XXX don't we need more to handle K-A or lingering close? */
-    const apr_uint32_t pollset_size = threads_per_child * 2;
+    const int good_methods[] = { APR_POLLSET_KQUEUE,
+                                 APR_POLLSET_PORT,
+                                 APR_POLLSET_EPOLL };
+    /* XXX: K-A or lingering close connection included in the async factor */
+    const apr_uint32_t async_factor = worker_factor / WORKER_FACTOR_SCALE;
+    const apr_uint32_t pollset_size = (apr_uint32_t)num_listensocks +
+                                      (apr_uint32_t)threads_per_child *
+                                      (async_factor > 2 ? async_factor : 2);
 
     /* We must create the fd queues before we start up the listener
      * and worker threads. */
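Worked numbers, illustrative only, for the new pollset_size above: with two listening sockets, ThreadsPerChild 25 and AsyncRequestWorkerFactor 2, the old sizing was 25 * 2 = 50 with no slots reserved for the listeners themselves, while the new one is 2 + 25 * max(2, 2) = 52.

    #include <stdio.h>

    int main(void)
    {
        unsigned num_listensocks = 2;    /* e.g. ports 80 and 443 */
        unsigned threads_per_child = 25; /* ThreadsPerChild */
        unsigned async_factor = 2;       /* AsyncRequestWorkerFactor */
        unsigned factor = async_factor > 2 ? async_factor : 2;

        printf("old pollset_size = %u\n", threads_per_child * 2);
        printf("new pollset_size = %u\n",
               num_listensocks + threads_per_child * factor);
        return 0; /* prints 50, then 52 */
    }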
@@ -2366,6 +2399,7 @@ static void child_main(int child_num_arg, int child_bucket)
     retained->mpm->mpm_state = AP_MPMQ_STARTING;
 
     ap_my_pid = getpid();
+    ap_child_slot = child_num_arg;
     ap_fatal_signal_child_setup(ap_server_conf);
     apr_pool_create(&pchild, pconf);
@@ -3340,6 +3374,11 @@ static int event_pre_config(apr_pool_t * pconf, apr_pool_t * plog,
     had_healthy_child = 0;
     ap_extended_status = 0;
 
+    event_pollset = NULL;
+    worker_queue_info = NULL;
+    listener_os_thread = NULL;
+    listensocks_disabled = 0;
+
     return OK;
 }
@@ -3753,8 +3792,9 @@ static const char *set_worker_factor(cmd_parms * cmd, void *dummy,
         return "AsyncRequestWorkerFactor argument must be a positive number";
 
     worker_factor = val * WORKER_FACTOR_SCALE;
-    if (worker_factor == 0)
-        worker_factor = 1;
+    if (worker_factor < WORKER_FACTOR_SCALE) {
+        worker_factor = WORKER_FACTOR_SCALE;
+    }
     return NULL;
 }
diff --git a/server/mpm/event/fdqueue.c b/server/mpm/event/fdqueue.c
index 64b318d0e0..175d86a720 100644
--- a/server/mpm/event/fdqueue.c
+++ b/server/mpm/event/fdqueue.c
@@ -27,18 +27,18 @@ struct recycled_pool
 
 struct fd_queue_info_t
 {
-    apr_uint32_t idlers;          /**
-                                   * >= zero_pt: number of idle worker threads
-                                   * < zero_pt:  number of threads blocked waiting
-                                   *             for an idle worker
-                                   */
+    apr_uint32_t volatile idlers; /**
+                                   * >= zero_pt: number of idle worker threads
+                                   * < zero_pt:  number of threads blocked,
+                                   *             waiting for an idle worker
+                                   */
     apr_thread_mutex_t *idlers_mutex;
     apr_thread_cond_t *wait_for_idler;
     int terminated;
     int max_idlers;
     int max_recycled_pools;
     apr_uint32_t recycled_pools_count;
-    struct recycled_pool *recycled_pools;
+    struct recycled_pool *volatile recycled_pools;
 };
 
@@ -97,15 +97,11 @@ apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info,
                                     apr_pool_t * pool_to_recycle)
 {
     apr_status_t rv;
-    apr_int32_t prev_idlers;
 
     ap_push_pool(queue_info, pool_to_recycle);
 
-    /* Atomically increment the count of idle workers */
-    prev_idlers = apr_atomic_inc32(&(queue_info->idlers)) - zero_pt;
-
     /* If other threads are waiting on a worker, wake one up */
-    if (prev_idlers < 0) {
+    if (apr_atomic_inc32(&queue_info->idlers) < zero_pt) {
         rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
         if (rv != APR_SUCCESS) {
             AP_DEBUG_ASSERT(0);
@@ -127,31 +123,32 @@ apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info,
 
 apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t * queue_info)
 {
-    apr_int32_t new_idlers;
-    new_idlers = apr_atomic_add32(&(queue_info->idlers), -1) - zero_pt;
-    if (--new_idlers <= 0) {
-        apr_atomic_inc32(&(queue_info->idlers));    /* back out dec */
-        return APR_EAGAIN;
+    /* Don't block if there isn't any idle worker. */
+    for (;;) {
+        apr_uint32_t idlers = queue_info->idlers;
+        if (idlers <= zero_pt) {
+            return APR_EAGAIN;
+        }
+        if (apr_atomic_cas32(&queue_info->idlers, idlers - 1,
+                             idlers) == idlers) {
+            return APR_SUCCESS;
+        }
     }
-    return APR_SUCCESS;
 }
 
 apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info,
                                           int *had_to_block)
 {
     apr_status_t rv;
-    apr_int32_t prev_idlers;
-
-    /* Atomically decrement the idle worker count, saving the old value */
-    /* See TODO in ap_queue_info_set_idle() */
-    prev_idlers = apr_atomic_add32(&(queue_info->idlers), -1) - zero_pt;
 
-    /* Block if there weren't any idle workers */
-    if (prev_idlers <= 0) {
+    /* Block if there isn't any idle worker.
+     * apr_atomic_add32(x, -1) does the same as dec32(x), except
+     * that it returns the previous value (unlike dec32's bool).
+     */
+    if (apr_atomic_add32(&queue_info->idlers, -1) <= zero_pt) {
         rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
         if (rv != APR_SUCCESS) {
             AP_DEBUG_ASSERT(0);
-            /* See TODO in ap_queue_info_set_idle() */
             apr_atomic_inc32(&(queue_info->idlers));    /* back out dec */
             return rv;
         }
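The fdqueue changes above keep 'idlers' as an unsigned counter biased by zero_pt, so values below the bias encode "threads waiting" without needing signed atomics. A standalone sketch of the lock-free taker follows; the bias value here is assumed for illustration, the real zero_pt is defined in fdqueue.c.

    #include <apr_atomic.h>

    #define ZERO_PT ((apr_uint32_t)1 << 31)        /* assumed midpoint bias */

    static volatile apr_uint32_t idlers = ZERO_PT; /* ZERO_PT == none idle */

    /* Mirrors ap_queue_info_try_get_idler(): take one idler, never block. */
    static int try_get_idler(void)
    {
        for (;;) {
            apr_uint32_t val = apr_atomic_read32(&idlers);
            if (val <= ZERO_PT) {
                return 0;  /* nothing idle; caller may block elsewhere */
            }
            if (apr_atomic_cas32(&idlers, val - 1, val) == val) {
                return 1;  /* we won the decrement */
            }
            /* lost the race; reload and retry */
        }
    }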
@@ -205,11 +202,11 @@ apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info,
 
 apr_uint32_t ap_queue_info_get_idlers(fd_queue_info_t * queue_info)
 {
-    apr_int32_t val;
-    val = (apr_int32_t)apr_atomic_read32(&queue_info->idlers) - zero_pt;
-    if (val < 0)
+    apr_uint32_t val;
+    val = apr_atomic_read32(&queue_info->idlers);
+    if (val <= zero_pt)
         return 0;
-    return val;
+    return val - zero_pt;
 }
 
 void ap_push_pool(fd_queue_info_t * queue_info,
@@ -490,13 +487,20 @@ apr_status_t ap_queue_pop_something(fd_queue_t * queue, apr_socket_t ** sd,
     return rv;
 }
 
-static apr_status_t queue_interrupt(fd_queue_t * queue, int all)
+static apr_status_t queue_interrupt(fd_queue_t * queue, int all, int term)
 {
     apr_status_t rv;
 
     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
         return rv;
     }
+    /* we must hold one_big_mutex when setting this... otherwise,
+     * we could end up setting it and waking everybody up just after a
+     * would-be popper checks it but right before they block
+     */
+    if (term) {
+        queue->terminated = 1;
+    }
     if (all)
         apr_thread_cond_broadcast(queue->not_empty);
     else
@@ -506,28 +510,15 @@ static apr_status_t queue_interrupt(fd_queue_t * queue, int all)
 
 apr_status_t ap_queue_interrupt_all(fd_queue_t * queue)
 {
-    return queue_interrupt(queue, 1);
+    return queue_interrupt(queue, 1, 0);
 }
 
 apr_status_t ap_queue_interrupt_one(fd_queue_t * queue)
 {
-    return queue_interrupt(queue, 0);
+    return queue_interrupt(queue, 0, 0);
 }
 
 apr_status_t ap_queue_term(fd_queue_t * queue)
 {
-    apr_status_t rv;
-
-    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
-        return rv;
-    }
-    /* we must hold one_big_mutex when setting this... otherwise,
-     * we could end up setting it and waking everybody up just after a
-     * would-be popper checks it but right before they block
-     */
-    queue->terminated = 1;
-    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
-        return rv;
-    }
-    return ap_queue_interrupt_all(queue);
+    return queue_interrupt(queue, 1, 1);
 }
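The event fdqueue above, and the worker MPM's copy below, both funnel termination through one interrupt helper so the terminated flag is always set under the queue mutex before the broadcast; otherwise a popper could check the flag, miss the wakeup, and block forever. A generic sketch of the pattern, with stand-in types:

    #include <apr_thread_mutex.h>
    #include <apr_thread_cond.h>

    struct queue {
        apr_thread_mutex_t *mutex;
        apr_thread_cond_t *not_empty;
        int terminated;
    };

    static apr_status_t queue_interrupt(struct queue *q, int all, int term)
    {
        apr_status_t rv = apr_thread_mutex_lock(q->mutex);
        if (rv != APR_SUCCESS) {
            return rv;
        }
        if (term) {
            q->terminated = 1; /* only ever set while holding the mutex */
        }
        if (all)
            apr_thread_cond_broadcast(q->not_empty);
        else
            apr_thread_cond_signal(q->not_empty);
        return apr_thread_mutex_unlock(q->mutex);
    }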
diff --git a/server/mpm/worker/fdqueue.c b/server/mpm/worker/fdqueue.c
index fe5881b4ce..803267afbd 100644
--- a/server/mpm/worker/fdqueue.c
+++ b/server/mpm/worker/fdqueue.c
@@ -382,31 +382,30 @@ apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p)
     return rv;
 }
 
-apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
+static apr_status_t queue_interrupt_all(fd_queue_t *queue, int term)
 {
     apr_status_t rv;
 
     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
         return rv;
     }
+    /* we must hold one_big_mutex when setting this... otherwise,
+     * we could end up setting it and waking everybody up just after a
+     * would-be popper checks it but right before they block
+     */
+    if (term) {
+        queue->terminated = 1;
+    }
     apr_thread_cond_broadcast(queue->not_empty);
     return apr_thread_mutex_unlock(queue->one_big_mutex);
 }
 
-apr_status_t ap_queue_term(fd_queue_t *queue)
+apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
 {
-    apr_status_t rv;
+    return queue_interrupt_all(queue, 0);
+}
 
-    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
-        return rv;
-    }
-    /* we must hold one_big_mutex when setting this... otherwise,
-     * we could end up setting it and waking everybody up just after a
-     * would-be popper checks it but right before they block
-     */
-    queue->terminated = 1;
-    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
-        return rv;
-    }
-    return ap_queue_interrupt_all(queue);
+apr_status_t ap_queue_term(fd_queue_t *queue)
+{
+    return queue_interrupt_all(queue, 1);
 }