From: Jim Jagielski Date: Mon, 17 Jul 2017 13:54:46 +0000 (+0000) Subject: Merge r1762580, r1762701, r1762702, r1762718, r1762723, r1762742, r1762743, r1774538... X-Git-Tag: 2.4.28~104 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=8929182de91779245bc367db5efff892bdeb3dfd;p=apache Merge r1762580, r1762701, r1762702, r1762718, r1762723, r1762742, r1762743, r1774538, r1779354 from trunk: event: use atomics for *timeout_queue->total since it's updated concurrently, and move TO_QUEUE_*() macros to functions. event: add/remove from/to the pollset outside of the critical sections. We don't need external locking since it's created with APR_POLLSET_THREADSAFE, hence reduce those sections to the lowest cycles possible. A spinlock may be interesting instead of the mutex now, we won't block and the TO_QUEUE_*() and process_timeout_queue() operations are fast... event: follow up to r1762701: update log tag. event: avoid unnecessary listener/polling wake ups (context switches) by using apr_pollset_wakeup(), when implemented, to signal the listener according to the next timers or timeout queues expiry (updated at insert and maintenance time). Follow up to r1762718: CHANGES entry. event: follow up to r1762718. We still need to kill kept-alive connections in normal/expiry processing if the workers are busy or dying. event: follow up to r1762718 and r1762742: put de condition where it belongs. event: follow up to r1762718. On graceful shutdown/restart, kill kept-alive connections before poll()ing again, avoiding to wait for their "normal" timers (before being woken up) when they remain the last handled connections. event: follow up to r1762701. Keep QUEUE_APPEND()+pollset_add() or QUEUE_REMOVE()+pollset_remove() atomic. Otherwise when a worker adds an entry in some queue (e.g. KA, lingering), it might race with the listener in the time between the mutex is released and the pollset is updated; meanwhile the listener might process the queue and find an entry no yet in its pollset. For the lingering queue, the entry could then have been used after its pool destroyed. Submitted by: ylavic Reviewed by: ylavic, icing, jim git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1802146 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/CHANGES b/CHANGES index f5709b520e..d80e762abe 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,9 @@ Changes with Apache 2.4.28 + *) event: Avoid listener periodic wake ups by using the pollset wake-ability + when available. PR 57399. [Yann Ylavic, Luca Toscano] + *) mod_proxy_wstunnel: Fix detection of unresponded request which could have led to spurious HTTP 502 error messages sent on upgrade connections. PR 61283. [Yann Ylavic] diff --git a/STATUS b/STATUS index 9d25049de8..7316b41a5a 100644 --- a/STATUS +++ b/STATUS @@ -116,19 +116,6 @@ RELEASE SHOWSTOPPERS: PATCHES ACCEPTED TO BACKPORT FROM TRUNK: [ start all new proposals below, under PATCHES PROPOSED. ] - *) event: Avoid listener periodic wake ups by using the pollset wake-ability - when available. PR 57399. - trunk patch: http://svn.apache.org/r1762580 - http://svn.apache.org/r1762701 - http://svn.apache.org/r1762702 - http://svn.apache.org/r1762718 - http://svn.apache.org/r1762723 - http://svn.apache.org/r1762742 - http://svn.apache.org/r1762743 - http://svn.apache.org/r1774538 - http://svn.apache.org/r1779354 - 2.4.x patch: http://home.apache.org/~ylavic/patches/httpd-2.4.x-mpm_event-wakeup-v7.1.patch - +1: ylavic, icing, jim PATCHES PROPOSED TO BACKPORT FROM TRUNK: [ New proposals should be added at the end of the list ] diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c index 6bbc8e5572..4742895334 100644 --- a/server/mpm/event/event.c +++ b/server/mpm/event/event.c @@ -177,6 +177,7 @@ static int dying = 0; static int workers_may_exit = 0; static int start_thread_may_exit = 0; static int listener_may_exit = 0; +static int listener_is_wakeable = 0; /* Pollset supports APR_POLLSET_WAKEABLE */ static int num_listensocks = 0; static apr_int32_t conns_this_child; /* MaxConnectionsPerChild, only access in listener thread */ @@ -198,6 +199,19 @@ module AP_MODULE_DECLARE_DATA mpm_event_module; struct event_srv_cfg_s; typedef struct event_srv_cfg_s event_srv_cfg; +static apr_pollfd_t *listener_pollfd; + +/* + * The pollset for sockets that are in any of the timeout queues. Currently + * we use the timeout_mutex to make sure that connections are added/removed + * atomically to/from both event_pollset and a timeout queue. Otherwise + * some confusion can happen under high load if timeout queues and pollset + * get out of sync. + * XXX: It should be possible to make the lock unnecessary in many or even all + * XXX: cases. + */ +static apr_pollset_t *event_pollset; + struct event_conn_state_t { /** APR_RING of expiration timeouts */ APR_RING_ENTRY(event_conn_state_t) timeout_list; @@ -227,9 +241,10 @@ APR_RING_HEAD(timeout_head_t, event_conn_state_t); struct timeout_queue { struct timeout_head_t head; - int count, *total; apr_interval_time_t timeout; - struct timeout_queue *next; + apr_uint32_t count; /* for this queue */ + apr_uint32_t *total; /* for all chained/related queues */ + struct timeout_queue *next; /* chaining */ }; /* * Several timeout queues that use different timeouts, so that we always can @@ -243,51 +258,64 @@ static struct timeout_queue *write_completion_q, *keepalive_q, *linger_q, *short_linger_q; +static volatile apr_time_t queues_next_expiry; -static apr_pollfd_t *listener_pollfd; +/* Prevent extra poll/wakeup calls for timeouts close in the future (queues + * have the granularity of a second anyway). + * XXX: Wouldn't 0.5s (instead of 0.1s) be "enough"? + */ +#define TIMEOUT_FUDGE_FACTOR apr_time_from_msec(100) /* * Macros for accessing struct timeout_queue. * For TO_QUEUE_APPEND and TO_QUEUE_REMOVE, timeout_mutex must be held. */ -#define TO_QUEUE_APPEND(q, el) \ - do { \ - APR_RING_INSERT_TAIL(&(q)->head, el, event_conn_state_t, \ - timeout_list); \ - ++*(q)->total; \ - ++(q)->count; \ - } while (0) - -#define TO_QUEUE_REMOVE(q, el) \ - do { \ - APR_RING_REMOVE(el, timeout_list); \ - --*(q)->total; \ - --(q)->count; \ - } while (0) - -#define TO_QUEUE_INIT(q, p, t, v) \ - do { \ - struct timeout_queue *b = (v); \ - (q) = apr_palloc((p), sizeof *(q)); \ - APR_RING_INIT(&(q)->head, event_conn_state_t, timeout_list); \ - (q)->total = (b) ? (b)->total : apr_pcalloc((p), sizeof *(q)->total); \ - (q)->count = 0; \ - (q)->timeout = (t); \ - (q)->next = NULL; \ - } while (0) - -#define TO_QUEUE_ELEM_INIT(el) APR_RING_ELEM_INIT(el, timeout_list) +static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el) +{ + apr_time_t q_expiry; + apr_time_t next_expiry; -/* - * The pollset for sockets that are in any of the timeout queues. Currently - * we use the timeout_mutex to make sure that connections are added/removed - * atomically to/from both event_pollset and a timeout queue. Otherwise - * some confusion can happen under high load if timeout queues and pollset - * get out of sync. - * XXX: It should be possible to make the lock unnecessary in many or even all - * XXX: cases. - */ -static apr_pollset_t *event_pollset; + APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list); + apr_atomic_inc32(q->total); + ++q->count; + + /* Cheaply update the overall queues' next expiry according to the + * first entry of this queue (oldest), if necessary. + */ + el = APR_RING_FIRST(&q->head); + q_expiry = el->queue_timestamp + q->timeout; + next_expiry = queues_next_expiry; + if (!next_expiry || next_expiry > q_expiry + TIMEOUT_FUDGE_FACTOR) { + queues_next_expiry = q_expiry; + /* Unblock the poll()ing listener for it to update its timeout. */ + if (listener_is_wakeable) { + apr_pollset_wakeup(event_pollset); + } + } +} + +static void TO_QUEUE_REMOVE(struct timeout_queue *q, event_conn_state_t *el) +{ + APR_RING_REMOVE(el, timeout_list); + apr_atomic_dec32(q->total); + --q->count; +} + +static struct timeout_queue *TO_QUEUE_MAKE(apr_pool_t *p, apr_time_t t, + struct timeout_queue *ref) +{ + struct timeout_queue *q; + + q = apr_pcalloc(p, sizeof *q); + APR_RING_INIT(&q->head, event_conn_state_t, timeout_list); + q->total = (ref) ? ref->total : apr_pcalloc(p, sizeof *q->total); + q->timeout = t; + + return q; +} + +#define TO_QUEUE_ELEM_INIT(el) \ + APR_RING_ELEM_INIT((el), timeout_list) /* The structure used to pass unique initialization info to each thread */ typedef struct @@ -469,6 +497,11 @@ static void wakeup_listener(void) return; } + /* Unblock the listener if it's poll()ing */ + if (listener_is_wakeable) { + apr_pollset_wakeup(event_pollset); + } + /* unblock the listener if it's waiting for a worker */ ap_queue_info_term(worker_queue_info); @@ -638,7 +671,11 @@ static apr_status_t decrement_connection_count(void *cs_) default: break; } - apr_atomic_dec32(&connection_count); + /* Unblock the listener if it's waiting for connection_count = 0 */ + if (!apr_atomic_dec32(&connection_count) + && listener_is_wakeable && listener_may_exit) { + apr_pollset_wakeup(event_pollset); + } return APR_SUCCESS; } @@ -690,12 +727,12 @@ static int start_lingering_close_common(event_conn_state_t *cs, int in_worker) else { cs->c->sbh = NULL; } - apr_thread_mutex_lock(timeout_mutex); - TO_QUEUE_APPEND(q, cs); cs->pfd.reqevents = ( cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT : APR_POLLIN) | APR_POLLHUP | APR_POLLERR; cs->pub.sense = CONN_SENSE_DEFAULT; + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_APPEND(q, cs); rv = apr_pollset_add(event_pollset, &cs->pfd); apr_thread_mutex_unlock(timeout_mutex); if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) { @@ -974,14 +1011,24 @@ read_request: */ cs->queue_timestamp = apr_time_now(); notify_suspend(cs); - apr_thread_mutex_lock(timeout_mutex); - TO_QUEUE_APPEND(cs->sc->wc_q, cs); cs->pfd.reqevents = ( cs->pub.sense == CONN_SENSE_WANT_READ ? APR_POLLIN : APR_POLLOUT) | APR_POLLHUP | APR_POLLERR; cs->pub.sense = CONN_SENSE_DEFAULT; + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_APPEND(cs->sc->wc_q, cs); rc = apr_pollset_add(event_pollset, &cs->pfd); apr_thread_mutex_unlock(timeout_mutex); + if (rc != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rc)) { + ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03465) + "process_socket: apr_pollset_add failure for " + "write completion"); + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_REMOVE(cs->sc->wc_q, cs); + apr_thread_mutex_unlock(timeout_mutex); + apr_socket_close(cs->pfd.desc.s); + ap_push_pool(worker_queue_info, cs->p); + } return; } else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted || @@ -1011,18 +1058,23 @@ read_request: */ cs->queue_timestamp = apr_time_now(); notify_suspend(cs); - apr_thread_mutex_lock(timeout_mutex); - TO_QUEUE_APPEND(cs->sc->ka_q, cs); /* Add work to pollset. */ cs->pfd.reqevents = APR_POLLIN; + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_APPEND(cs->sc->ka_q, cs); rc = apr_pollset_add(event_pollset, &cs->pfd); apr_thread_mutex_unlock(timeout_mutex); - - if (rc != APR_SUCCESS) { + if (rc != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rc)) { ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03093) - "process_socket: apr_pollset_add failure"); - AP_DEBUG_ASSERT(rc == APR_SUCCESS); + "process_socket: apr_pollset_add failure for " + "keep alive"); + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_REMOVE(cs->sc->ka_q, cs); + apr_thread_mutex_unlock(timeout_mutex); + apr_socket_close(cs->pfd.desc.s); + ap_push_pool(worker_queue_info, cs->p); + return; } } else if (cs->pub.state == CONN_STATE_SUSPENDED) { @@ -1193,6 +1245,13 @@ static void get_worker(int *have_idle_worker_p, int blocking, int *all_busy) static APR_RING_HEAD(timer_free_ring_t, timer_event_t) timer_free_ring; static apr_skiplist *timer_skiplist; +static volatile apr_time_t timers_next_expiry; + +/* Same goal as for TIMEOUT_FUDGE_FACTOR (avoid extra poll calls), but applied + * to timers. Since their timeouts are custom (user defined), we can't be too + * approximative here (hence using 0.01s). + */ +#define EVENT_FUDGE_FACTOR apr_time_from_msec(10) /* The following compare function is used by apr_skiplist_insert() to keep the * elements (timers) sorted and provide O(log n) complexity (this is also true @@ -1239,8 +1298,24 @@ static apr_status_t event_register_timed_callback(apr_time_t t, /* XXXXX: optimize */ te->when = t + apr_time_now(); - /* Okay, add sorted by when.. */ - apr_skiplist_insert(timer_skiplist, te); + { + apr_time_t next_expiry; + + /* Okay, add sorted by when.. */ + apr_skiplist_insert(timer_skiplist, te); + + /* Cheaply update the overall timers' next expiry according to + * this event, if necessary. + */ + next_expiry = timers_next_expiry; + if (!next_expiry || next_expiry > te->when + EVENT_FUDGE_FACTOR) { + timers_next_expiry = te->when; + /* Unblock the poll()ing listener for it to update its timeout. */ + if (listener_is_wakeable) { + apr_pollset_wakeup(event_pollset); + } + } + } apr_thread_mutex_unlock(g_timer_skiplist_mtx); @@ -1274,16 +1349,15 @@ static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t * } apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_REMOVE(q, cs); rv = apr_pollset_remove(event_pollset, pfd); - AP_DEBUG_ASSERT(rv == APR_SUCCESS); + apr_thread_mutex_unlock(timeout_mutex); + AP_DEBUG_ASSERT(rv == APR_SUCCESS || APR_STATUS_IS_NOTFOUND(rv)); + TO_QUEUE_ELEM_INIT(cs); rv = apr_socket_close(csd); AP_DEBUG_ASSERT(rv == APR_SUCCESS); - TO_QUEUE_REMOVE(q, cs); - apr_thread_mutex_unlock(timeout_mutex); - TO_QUEUE_ELEM_INIT(cs); - ap_push_pool(worker_queue_info, cs->p); if (dying) ap_queue_interrupt_one(worker_queue); @@ -1297,13 +1371,13 @@ static void process_timeout_queue(struct timeout_queue *q, apr_time_t timeout_time, int (*func)(event_conn_state_t *)) { - int total = 0, count; + apr_uint32_t total = 0, count; event_conn_state_t *first, *cs, *last; struct timeout_head_t trash; struct timeout_queue *qp; apr_status_t rv; - if (!*q->total) { + if (!apr_atomic_read32(q->total)) { return; } @@ -1312,20 +1386,32 @@ static void process_timeout_queue(struct timeout_queue *q, count = 0; cs = first = last = APR_RING_FIRST(&qp->head); while (cs != APR_RING_SENTINEL(&qp->head, event_conn_state_t, - timeout_list) - /* Trash the entry if: - * - no timeout_time was given (asked for all), or - * - it expired (according to the queue timeout), or - * - the system clock skewed in the past: no entry should be - * registered above the given timeout_time (~now) + the queue - * timeout, we won't keep any here (eg. for centuries). - * Stop otherwise, no following entry will match thanks to the - * single timeout per queue (entries are added to the end!). - * This allows maintenance in O(1). - */ - && (!timeout_time - || cs->queue_timestamp + qp->timeout < timeout_time - || cs->queue_timestamp > timeout_time + qp->timeout)) { + timeout_list)) { + /* Trash the entry if: + * - no timeout_time was given (asked for all), or + * - it expired (according to the queue timeout), or + * - the system clock skewed in the past: no entry should be + * registered above the given timeout_time (~now) + the queue + * timeout, we won't keep any here (eg. for centuries). + * + * Otherwise stop, no following entry will match thanks to the + * single timeout per queue (entries are added to the end!). + * This allows maintenance in O(1). + */ + if (timeout_time + && cs->queue_timestamp + qp->timeout > timeout_time + && cs->queue_timestamp < timeout_time + qp->timeout) { + /* Since this is the next expiring of this queue, update the + * overall queues' next expiry if it's later than this one. + */ + apr_time_t q_expiry = cs->queue_timestamp + qp->timeout; + apr_time_t next_expiry = queues_next_expiry; + if (!next_expiry || next_expiry > q_expiry) { + queues_next_expiry = q_expiry; + } + break; + } + last = cs; rv = apr_pollset_remove(event_pollset, &cs->pfd); if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) { @@ -1341,14 +1427,14 @@ static void process_timeout_queue(struct timeout_queue *q, APR_RING_UNSPLICE(first, last, timeout_list); APR_RING_SPLICE_TAIL(&trash, first, last, event_conn_state_t, timeout_list); + AP_DEBUG_ASSERT(apr_atomic_read32(q->total) >= count); + apr_atomic_sub32(q->total, count); qp->count -= count; total += count; } if (!total) return; - AP_DEBUG_ASSERT(*q->total >= total); - *q->total -= total; apr_thread_mutex_unlock(timeout_mutex); first = APR_RING_FIRST(&trash); do { @@ -1360,13 +1446,28 @@ static void process_timeout_queue(struct timeout_queue *q, apr_thread_mutex_lock(timeout_mutex); } +static void process_keepalive_queue(apr_time_t timeout_time) +{ + /* If all workers are busy, we kill older keep-alive connections so + * that they may connect to another process. + */ + if (!timeout_time) { + ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf, + "All workers are busy or dying, will close %u " + "keep-alive connections", + apr_atomic_read32(keepalive_q->total)); + } + process_timeout_queue(keepalive_q, timeout_time, + start_lingering_close_nonblocking); +} + static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) { - timer_event_t *ep; timer_event_t *te; apr_status_t rc; proc_info *ti = dummy; int process_slot = ti->pslot; + struct process_score *ps = ap_get_scoreboard_process(process_slot); apr_pool_t *tpool = apr_thread_pool_get(thd); void *csd = NULL; apr_pool_t *ptrans; /* Pool for per-transaction stuff */ @@ -1382,14 +1483,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) last_log = apr_time_now(); free(ti); - /* the following times out events that are really close in the future - * to prevent extra poll calls - * - * current value is .1 second - */ -#define TIMEOUT_FUDGE_FACTOR 100000 -#define EVENT_FUDGE_FACTOR 10000 - rc = init_pollset(tpool); if (rc != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, @@ -1407,6 +1500,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) for (;;) { int workers_were_busy = 0; + if (listener_may_exit) { close_listeners(process_slot, &closed); if (terminate_mode == ST_UNGRACEFUL @@ -1420,7 +1514,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) now = apr_time_now(); if (APLOGtrace6(ap_server_conf)) { /* trace log status every second */ - if (now - last_log > apr_time_from_msec(1000)) { + if (now - last_log > apr_time_from_sec(1)) { last_log = now; apr_thread_mutex_lock(timeout_mutex); ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf, @@ -1428,8 +1522,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) "keep-alive: %d lingering: %d suspended: %u)", apr_atomic_read32(&connection_count), apr_atomic_read32(&clogged_count), - *write_completion_q->total, - *keepalive_q->total, + apr_atomic_read32(write_completion_q->total), + apr_atomic_read32(keepalive_q->total), apr_atomic_read32(&lingering_count), apr_atomic_read32(&suspended_count)); if (dying) { @@ -1442,32 +1536,71 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) } } - apr_thread_mutex_lock(g_timer_skiplist_mtx); - te = apr_skiplist_peek(timer_skiplist); - if (te) { - if (te->when > now) { - timeout_interval = te->when - now; + /* Start with an infinite poll() timeout and update it according to + * the next expiring timer or queue entry. If there are none, either + * the listener is wakeable and it can poll() indefinitely until a wake + * up occurs, otherwise periodic checks (maintenance, shutdown, ...) + * must be performed. + */ + timeout_interval = -1; + + /* Push expired timers to a worker, the first remaining one determines + * the maximum time to poll() below, if any. + */ + timeout_time = timers_next_expiry; + if (timeout_time && timeout_time < now + EVENT_FUDGE_FACTOR) { + apr_thread_mutex_lock(g_timer_skiplist_mtx); + while ((te = apr_skiplist_peek(timer_skiplist))) { + if (te->when > now + EVENT_FUDGE_FACTOR) { + timers_next_expiry = te->when; + timeout_interval = te->when - now; + break; + } + apr_skiplist_pop(timer_skiplist, NULL); + push_timer2worker(te); } - else { - timeout_interval = 1; + if (!te) { + timers_next_expiry = 0; } + apr_thread_mutex_unlock(g_timer_skiplist_mtx); } - else { - timeout_interval = apr_time_from_msec(100); + + /* Same for queues, use their next expiry, if any. */ + timeout_time = queues_next_expiry; + if (timeout_time + && (timeout_interval < 0 + || timeout_time <= now + || timeout_interval > timeout_time - now)) { + timeout_interval = timeout_time > now ? timeout_time - now : 1; + } + + /* When non-wakeable, don't wait more than 100 ms, in any case. */ +#define NON_WAKEABLE_POLL_TIMEOUT apr_time_from_msec(100) + if (!listener_is_wakeable + && (timeout_interval < 0 + || timeout_interval > NON_WAKEABLE_POLL_TIMEOUT)) { + timeout_interval = NON_WAKEABLE_POLL_TIMEOUT; } - apr_thread_mutex_unlock(g_timer_skiplist_mtx); rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd); if (rc != APR_SUCCESS) { if (APR_STATUS_IS_EINTR(rc)) { - continue; + /* Woken up, if we are exiting we must fall through to kill + * kept-alive connections, otherwise we only need to update + * timeouts (logic is above, so restart the loop). + */ + if (!listener_may_exit) { + continue; + } + timeout_time = 0; } - if (!APR_STATUS_IS_TIMEUP(rc)) { + else if (!APR_STATUS_IS_TIMEUP(rc)) { ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, "apr_pollset_poll failed. Attempting to " "shutdown process gracefully"); signal_threads(ST_GRACEFUL); } + num = 0; } if (listener_may_exit) { @@ -1477,21 +1610,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) break; } - now = apr_time_now(); - apr_thread_mutex_lock(g_timer_skiplist_mtx); - ep = apr_skiplist_peek(timer_skiplist); - while (ep) { - if (ep->when < now + EVENT_FUDGE_FACTOR) { - apr_skiplist_pop(timer_skiplist, NULL); - push_timer2worker(ep); - } - else { - break; - } - ep = apr_skiplist_peek(timer_skiplist); - } - apr_thread_mutex_unlock(g_timer_skiplist_mtx); - while (num) { pt = (listener_poll_type *) out_pfd->client_data; if (pt->type == PT_CSD) { @@ -1514,7 +1632,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) TO_QUEUE_REMOVE(remove_from_q, cs); rc = apr_pollset_remove(event_pollset, &cs->pfd); apr_thread_mutex_unlock(timeout_mutex); - /* * Some of the pollset backends, like KQueue or Epoll * automagically remove the FD if the socket is closed, @@ -1656,52 +1773,55 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) /* XXX possible optimization: stash the current time for use as * r->request_time for new requests */ - now = apr_time_now(); - /* We only do this once per 0.1s (TIMEOUT_FUDGE_FACTOR), or on a clock - * skew (if the system time is set back in the meantime, timeout_time - * will exceed now + TIMEOUT_FUDGE_FACTOR, can't happen otherwise). + /* We process the timeout queues here only when their overall next + * expiry (read once above) is over. This happens accurately since + * adding to the queues (in workers) can only decrease this expiry, + * while latest ones are only taken into account here (in listener) + * during queues' processing, with the lock held. This works both + * with and without wake-ability. */ - if (now > timeout_time || now + TIMEOUT_FUDGE_FACTOR < timeout_time ) { - struct process_score *ps; + if (timeout_time && timeout_time < (now = apr_time_now())) { timeout_time = now + TIMEOUT_FUDGE_FACTOR; /* handle timed out sockets */ apr_thread_mutex_lock(timeout_mutex); + /* Processing all the queues below will recompute this. */ + queues_next_expiry = 0; + /* Step 1: keepalive timeouts */ - /* If all workers are busy, we kill older keep-alive connections so that they - * may connect to another process. - */ - if ((workers_were_busy || dying) && *keepalive_q->total) { - if (!dying) - ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf, - "All workers are busy, will close %d keep-alive " - "connections", - *keepalive_q->total); - process_timeout_queue(keepalive_q, 0, - start_lingering_close_nonblocking); + if (workers_were_busy || dying) { + process_keepalive_queue(0); /* kill'em all \m/ */ } else { - process_timeout_queue(keepalive_q, timeout_time, - start_lingering_close_nonblocking); + process_keepalive_queue(timeout_time); } /* Step 2: write completion timeouts */ process_timeout_queue(write_completion_q, timeout_time, start_lingering_close_nonblocking); /* Step 3: (normal) lingering close completion timeouts */ - process_timeout_queue(linger_q, timeout_time, stop_lingering_close); + process_timeout_queue(linger_q, timeout_time, + stop_lingering_close); /* Step 4: (short) lingering close completion timeouts */ - process_timeout_queue(short_linger_q, timeout_time, stop_lingering_close); + process_timeout_queue(short_linger_q, timeout_time, + stop_lingering_close); - ps = ap_get_scoreboard_process(process_slot); - ps->write_completion = *write_completion_q->total; - ps->keep_alive = *keepalive_q->total; apr_thread_mutex_unlock(timeout_mutex); + ps->keep_alive = apr_atomic_read32(keepalive_q->total); + ps->write_completion = apr_atomic_read32(write_completion_q->total); ps->connections = apr_atomic_read32(&connection_count); ps->suspended = apr_atomic_read32(&suspended_count); ps->lingering_close = apr_atomic_read32(&lingering_count); } + else if ((workers_were_busy || dying) + && apr_atomic_read32(keepalive_q->total)) { + apr_thread_mutex_lock(timeout_mutex); + process_keepalive_queue(0); /* kill'em all \m/ */ + apr_thread_mutex_unlock(timeout_mutex); + ps->keep_alive = 0; + } + if (listeners_disabled && !workers_were_busy && (int)apr_atomic_read32(&connection_count) - (int)apr_atomic_read32(&lingering_count) @@ -1912,6 +2032,8 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy) int prev_threads_created; int max_recycled_pools = -1; int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL}; + /* XXX don't we need more to handle K-A or lingering close? */ + const apr_uint32_t pollset_size = threads_per_child * 2; /* We must create the fd queues before we start up the listener * and worker threads. */ @@ -1951,24 +2073,24 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy) /* Create the main pollset */ for (i = 0; i < sizeof(good_methods) / sizeof(good_methods[0]); i++) { - rv = apr_pollset_create_ex(&event_pollset, - threads_per_child*2, /* XXX don't we need more, to handle - * connections in K-A or lingering - * close? - */ - pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY | APR_POLLSET_NODEFAULT, - good_methods[i]); + apr_uint32_t flags = APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY | + APR_POLLSET_NODEFAULT | APR_POLLSET_WAKEABLE; + rv = apr_pollset_create_ex(&event_pollset, pollset_size, pchild, flags, + good_methods[i]); + if (rv == APR_SUCCESS) { + listener_is_wakeable = 1; + break; + } + flags &= ~APR_POLLSET_WAKEABLE; + rv = apr_pollset_create_ex(&event_pollset, pollset_size, pchild, flags, + good_methods[i]); if (rv == APR_SUCCESS) { break; } } if (rv != APR_SUCCESS) { - rv = apr_pollset_create(&event_pollset, - threads_per_child*2, /* XXX don't we need more, to handle - * connections in K-A or lingering - * close? - */ - pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY); + rv = apr_pollset_create(&event_pollset, pollset_size, pchild, + APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY); } if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03103) @@ -1977,7 +2099,9 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy) } ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(02471) - "start_threads: Using %s", apr_pollset_method_name(event_pollset)); + "start_threads: Using %s (%swakeable)", + apr_pollset_method_name(event_pollset), + listener_is_wakeable ? "" : "not "); worker_sockets = apr_pcalloc(pchild, threads_per_child * sizeof(apr_socket_t *)); @@ -3123,10 +3247,10 @@ static int event_post_config(apr_pool_t *pconf, apr_pool_t *plog, wc.hash = apr_hash_make(ptemp); ka.hash = apr_hash_make(ptemp); - TO_QUEUE_INIT(linger_q, pconf, - apr_time_from_sec(MAX_SECS_TO_LINGER), NULL); - TO_QUEUE_INIT(short_linger_q, pconf, - apr_time_from_sec(SECONDS_TO_LINGER), NULL); + linger_q = TO_QUEUE_MAKE(pconf, apr_time_from_sec(MAX_SECS_TO_LINGER), + NULL); + short_linger_q = TO_QUEUE_MAKE(pconf, apr_time_from_sec(SECONDS_TO_LINGER), + NULL); for (; s; s = s->next) { event_srv_cfg *sc = apr_pcalloc(pconf, sizeof *sc); @@ -3134,11 +3258,11 @@ static int event_post_config(apr_pool_t *pconf, apr_pool_t *plog, ap_set_module_config(s->module_config, &mpm_event_module, sc); if (!wc.tail) { /* The main server uses the global queues */ - TO_QUEUE_INIT(wc.q, pconf, s->timeout, NULL); + wc.q = TO_QUEUE_MAKE(pconf, s->timeout, NULL); apr_hash_set(wc.hash, &s->timeout, sizeof s->timeout, wc.q); wc.tail = write_completion_q = wc.q; - TO_QUEUE_INIT(ka.q, pconf, s->keep_alive_timeout, NULL); + ka.q = TO_QUEUE_MAKE(pconf, s->keep_alive_timeout, NULL); apr_hash_set(ka.hash, &s->keep_alive_timeout, sizeof s->keep_alive_timeout, ka.q); ka.tail = keepalive_q = ka.q; @@ -3148,7 +3272,7 @@ static int event_post_config(apr_pool_t *pconf, apr_pool_t *plog, * or their own queue(s) if there isn't */ wc.q = apr_hash_get(wc.hash, &s->timeout, sizeof s->timeout); if (!wc.q) { - TO_QUEUE_INIT(wc.q, pconf, s->timeout, wc.tail); + wc.q = TO_QUEUE_MAKE(pconf, s->timeout, wc.tail); apr_hash_set(wc.hash, &s->timeout, sizeof s->timeout, wc.q); wc.tail = wc.tail->next = wc.q; } @@ -3156,7 +3280,7 @@ static int event_post_config(apr_pool_t *pconf, apr_pool_t *plog, ka.q = apr_hash_get(ka.hash, &s->keep_alive_timeout, sizeof s->keep_alive_timeout); if (!ka.q) { - TO_QUEUE_INIT(ka.q, pconf, s->keep_alive_timeout, ka.tail); + ka.q = TO_QUEUE_MAKE(pconf, s->keep_alive_timeout, ka.tail); apr_hash_set(ka.hash, &s->keep_alive_timeout, sizeof s->keep_alive_timeout, ka.q); ka.tail = ka.tail->next = ka.q;