From 18fdb9d7119da5e2d521eb8307b73e1f482b15c6 Mon Sep 17 00:00:00 2001 From: Stefan Fritsch Date: Sun, 19 Jun 2011 12:23:42 +0000 Subject: [PATCH] Some improvements for handling of many connections for MPM event: - Process lingering close asynchronously instead of tying up worker threads (based on patch by Jeff Trawick). - If the number of connections of a process is above threads_per_child + WORKER_OVERCOMMIT * (idle_workers - 1) (WORKER_OVERCOMMIT is fixed at 2, at the moment), or if all workers are busy, don't accept new connections in that process. Such a dynamic connection limit is necessary because we may have both async and non-async (ssl) connections. WORKER_OVERCOMMIT should be a config option. - Don't count idle workers of not-accepting processes against MinSpareThreads, so that the parent will spawn new processes when necessary. - If we receive a keep-alive request while all workers are busy, don't block but close the connection immediately so that the client will re-connect to a different process. Related changes: - Log what is going on at trace loglevels. - Remove the bypass_push poll type flag, this code cannot be hit anymore (if it ever could?). - Add some macro helpers for dealing with timeout queues. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1137358 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES | 7 + include/ap_mmn.h | 7 +- include/http_connection.h | 4 +- include/httpd.h | 4 +- include/scoreboard.h | 10 +- server/connection.c | 26 +- server/mpm/event/event.c | 620 ++++++++++++++++++++++++++----------- server/mpm/event/fdqueue.c | 23 +- server/mpm/event/fdqueue.h | 5 +- 9 files changed, 506 insertions(+), 200 deletions(-) diff --git a/CHANGES b/CHANGES index 3465138844..03e72a387b 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,13 @@ Changes with Apache 2.3.13 + *) mpm_event: If the number of connections of a process is very high, or if + all workers are busy, don't accept new connections in that process. + [Stefan Fritsch] + + *) mpm_event: Process lingering close asynchronously instead of tying up + worker threads. [Jeff Trawick, Stefan Fritsch] + *) mpm_event: If MaxMemFree is set, limit the number of pools that is kept around. [Stefan Fritsch] diff --git a/include/ap_mmn.h b/include/ap_mmn.h index b7f927ca26..f5c9346459 100644 --- a/include/ap_mmn.h +++ b/include/ap_mmn.h @@ -333,14 +333,17 @@ * Add ap_context_*(), ap_set_context_info(), ap_set_document_root() * 20110605.1 (2.3.13-dev) add ap_(get|set)_core_module_config() * 20110605.2 (2.3.13-dev) add ap_get_conn_socket() + * 20110619.0 (2.3.13-dev) add async connection infos to process_score in scoreboard, + * add ap_start_lingering_close(), + * add conn_state_e:CONN_STATE_LINGER_NORMAL and CONN_STATE_LINGER_SHORT */ #define MODULE_MAGIC_COOKIE 0x41503234UL /* "AP24" */ #ifndef MODULE_MAGIC_NUMBER_MAJOR -#define MODULE_MAGIC_NUMBER_MAJOR 20110605 +#define MODULE_MAGIC_NUMBER_MAJOR 20110619 #endif -#define MODULE_MAGIC_NUMBER_MINOR 2 /* 0...n */ +#define MODULE_MAGIC_NUMBER_MINOR 0 /* 0...n */ /** * Determine if the server's current MODULE_MAGIC_NUMBER is at least a diff --git a/include/http_connection.h b/include/http_connection.h index 37437cfd5a..d5bca043bb 100644 --- a/include/http_connection.h +++ b/include/http_connection.h @@ -70,7 +70,9 @@ AP_CORE_DECLARE(void) ap_flush_conn(conn_rec *c); */ AP_DECLARE(void) ap_lingering_close(conn_rec *c); - /* Hooks */ +AP_DECLARE(int) ap_start_lingering_close(conn_rec *c); + +/* Hooks */ /** * create_connection is a RUN_FIRST hook which allows modules to create * connections. In general, you should not install filters with the diff --git a/include/httpd.h b/include/httpd.h index 89d421a5f9..b09a9d040f 100644 --- a/include/httpd.h +++ b/include/httpd.h @@ -1133,7 +1133,9 @@ typedef enum { CONN_STATE_HANDLER, CONN_STATE_WRITE_COMPLETION, CONN_STATE_SUSPENDED, - CONN_STATE_LINGER + CONN_STATE_LINGER, + CONN_STATE_LINGER_NORMAL, + CONN_STATE_LINGER_SHORT } conn_state_e; /** diff --git a/include/scoreboard.h b/include/scoreboard.h index 1bf0ad2de4..98fd7cf748 100644 --- a/include/scoreboard.h +++ b/include/scoreboard.h @@ -132,9 +132,17 @@ typedef struct process_score process_score; struct process_score { pid_t pid; ap_generation_t generation; /* generation of this child */ - int quiescing; /* the process whose pid is stored above is + char quiescing; /* the process whose pid is stored above is * going down gracefully */ + char not_accepting; /* the process is busy and is not accepting more + * connections (for async MPMs) + */ + apr_uint32_t connections; /* total connections (for async MPMs) */ + apr_uint32_t write_completion; /* async connections doing write completion */ + apr_uint32_t lingering_close; /* async connections in lingering close */ + apr_uint32_t keep_alive; /* async connections in keep alive */ + apr_uint32_t suspended; /* connections suspended by some module */ }; /* Scoreboard is now in 'local' memory, since it isn't updated once created, diff --git a/server/connection.c b/server/connection.c index 31279b3338..13ba951b0c 100644 --- a/server/connection.c +++ b/server/connection.c @@ -93,15 +93,13 @@ AP_CORE_DECLARE(void) ap_flush_conn(conn_rec *c) * all the response data has been sent to the client. */ #define SECONDS_TO_LINGER 2 -AP_DECLARE(void) ap_lingering_close(conn_rec *c) + +AP_DECLARE(int) ap_start_lingering_close(conn_rec *c) { - char dummybuf[512]; - apr_size_t nbytes; - apr_time_t timeup = 0; apr_socket_t *csd = ap_get_conn_socket(c); if (!csd) { - return; + return 1; } ap_update_child_status(c->sbh, SERVER_CLOSING, NULL); @@ -109,7 +107,7 @@ AP_DECLARE(void) ap_lingering_close(conn_rec *c) #ifdef NO_LINGCLOSE ap_flush_conn(c); /* just close it */ apr_socket_close(csd); - return; + return 1; #endif /* Close the connection, being careful to send out whatever is still @@ -122,7 +120,7 @@ AP_DECLARE(void) ap_lingering_close(conn_rec *c) if (c->aborted) { apr_socket_close(csd); - return; + return 1; } /* Shut down the socket for write, which will send a FIN @@ -131,6 +129,20 @@ AP_DECLARE(void) ap_lingering_close(conn_rec *c) if (apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE) != APR_SUCCESS || c->aborted) { apr_socket_close(csd); + return 1; + } + + return 0; +} + +AP_DECLARE(void) ap_lingering_close(conn_rec *c) +{ + char dummybuf[512]; + apr_size_t nbytes; + apr_time_t timeup = 0; + apr_socket_t *csd = ap_get_conn_socket(c); + + if (ap_start_lingering_close(c)) { return; } diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c index b002f56910..432ba81fbc 100644 --- a/server/mpm/event/event.c +++ b/server/mpm/event/event.c @@ -148,6 +148,11 @@ #define apr_time_from_msec(x) (x * 1000) #endif +#ifndef MAX_SECS_TO_LINGER +#define MAX_SECS_TO_LINGER 30 +#endif +#define SECONDS_TO_LINGER 2 + /* * Actual definitions of config globals */ @@ -174,7 +179,38 @@ static int mpm_state = AP_MPMQ_STARTING; static apr_thread_mutex_t *timeout_mutex; APR_RING_HEAD(timeout_head_t, conn_state_t); -static struct timeout_head_t timeout_head, keepalive_timeout_head; +struct timeout_queue { + struct timeout_head_t head; + int count; + const char *tag; +}; +static struct timeout_queue write_completion_q, keepalive_q, linger_q, + short_linger_q; +static apr_pollfd_t *listener_pollfd; + +/* + * Macros for accessing struct timeout_queue. + * For TO_QUEUE_APPEND and TO_QUEUE_REMOVE, timeout_mutex must be held. + */ +#define TO_QUEUE_APPEND(q, el) \ + do { \ + APR_RING_INSERT_TAIL(&(q).head, el, conn_state_t, timeout_list); \ + (q).count++; \ + } while (0) + +#define TO_QUEUE_REMOVE(q, el) \ + do { \ + APR_RING_REMOVE(el, timeout_list); \ + (q).count--; \ + } while (0) + +#define TO_QUEUE_INIT(q) \ + do { \ + APR_RING_INIT(&(q).head, conn_state_t, timeout_list); \ + (q).tag = #q; \ + } while (0) + +#define TO_QUEUE_ELEM_INIT(el) APR_RING_ELEM_INIT(el, timeout_list) static apr_pollset_t *event_pollset; @@ -218,7 +254,6 @@ typedef enum typedef struct { poll_type_e type; - int bypass_push; void *baton; } listener_poll_type; @@ -299,6 +334,32 @@ static apr_os_thread_t *listener_os_thread; */ static apr_socket_t **worker_sockets; +static void disable_listensocks(int process_slot) +{ + int i; + for (i = 0; i < num_listensocks; i++) { + apr_pollset_remove(event_pollset, &listener_pollfd[i]); + } + ap_scoreboard_image->parent[process_slot].not_accepting = 1; +} + +static void enable_listensocks(int process_slot) +{ + int i; + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, + "Accepting new connections again: " + "%u active conns, %u idle workers", + apr_atomic_read32(&connection_count), + ap_queue_info_get_idlers(worker_queue_info)); + for (i = 0; i < num_listensocks; i++) + apr_pollset_add(event_pollset, &listener_pollfd[i]); + /* + * XXX: This is not yet optimal. If many workers suddenly become available, + * XXX: the parent may kill some processes off too soon. + */ + ap_scoreboard_image->parent[process_slot].not_accepting = 0; +} + static void close_worker_sockets(void) { int i; @@ -654,6 +715,69 @@ static void set_signals(void) #endif } +static int start_lingering_close(conn_state_t *cs) +{ + apr_status_t rv; + if (ap_start_lingering_close(cs->c)) { + apr_pool_clear(cs->p); + ap_push_pool(worker_queue_info, cs->p); + return 0; + } + else { + apr_socket_t *csd = ap_get_conn_socket(cs->c); + struct timeout_queue *q; + + rv = apr_socket_timeout_set(csd, 0); + AP_DEBUG_ASSERT(rv == APR_SUCCESS); + /* + * If some module requested a shortened waiting period, only wait for + * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain + * DoS attacks. + */ + if (apr_table_get(cs->c->notes, "short-lingering-close")) { + cs->expiration_time = + apr_time_now() + apr_time_from_sec(SECONDS_TO_LINGER); + q = &short_linger_q; + cs->state = CONN_STATE_LINGER_SHORT; + } + else { + cs->expiration_time = + apr_time_now() + apr_time_from_sec(MAX_SECS_TO_LINGER); + q = &linger_q; + cs->state = CONN_STATE_LINGER_NORMAL; + } + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_APPEND(*q, cs); + apr_thread_mutex_unlock(timeout_mutex); + cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR; + rv = apr_pollset_add(event_pollset, &cs->pfd); + if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) { + ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, + "start_lingering_close: apr_pollset_add failure"); + AP_DEBUG_ASSERT(0); + } + } + return 1; +} + +static int stop_lingering_close(conn_state_t *cs) +{ + apr_status_t rv; + apr_socket_t *csd = ap_get_conn_socket(cs->c); + ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf, + "socket reached timeout in lingering-close state"); + rv = apr_socket_close(csd); + if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, "error closing socket"); + AP_DEBUG_ASSERT(0); + } + apr_pool_clear(cs->p); + ap_push_pool(worker_queue_info, cs->p); + return 0; +} + + + /***************************************************************** * Child process main loop. */ @@ -689,10 +813,9 @@ static int process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * sock cs->pfd.reqevents = APR_POLLIN; cs->pfd.desc.s = sock; pt->type = PT_CSD; - pt->bypass_push = 1; pt->baton = cs; cs->pfd.client_data = pt; - APR_RING_ELEM_INIT(cs, timeout_list); + TO_QUEUE_ELEM_INIT(cs); ap_update_vhost_given_ip(c); @@ -755,6 +878,7 @@ read_request: if (cs->state == CONN_STATE_WRITE_COMPLETION) { ap_filter_t *output_filter = c->output_filters; apr_status_t rv; + ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c); while (output_filter->next != NULL) { output_filter = output_filter->next; } @@ -771,9 +895,8 @@ read_request: */ cs->expiration_time = ap_server_conf->timeout + apr_time_now(); apr_thread_mutex_lock(timeout_mutex); - APR_RING_INSERT_TAIL(&timeout_head, cs, conn_state_t, timeout_list); + TO_QUEUE_APPEND(write_completion_q, cs); apr_thread_mutex_unlock(timeout_mutex); - pt->bypass_push = 0; cs->pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR; rc = apr_pollset_add(event_pollset, &cs->pfd); return 1; @@ -792,14 +915,11 @@ read_request: } if (cs->state == CONN_STATE_LINGER) { - ap_lingering_close(c); - apr_pool_clear(p); - ap_push_pool(worker_queue_info, p); - return 0; + if (!start_lingering_close(cs)) + return 0; } else if (cs->state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) { apr_status_t rc; - listener_poll_type *pt = (listener_poll_type *) cs->pfd.client_data; /* It greatly simplifies the logic to use a single timeout value here * because the new element can just be added to the end of the list and @@ -812,10 +932,9 @@ read_request: cs->expiration_time = ap_server_conf->keep_alive_timeout + apr_time_now(); apr_thread_mutex_lock(timeout_mutex); - APR_RING_INSERT_TAIL(&keepalive_timeout_head, cs, conn_state_t, timeout_list); + TO_QUEUE_APPEND(keepalive_q, cs); apr_thread_mutex_unlock(timeout_mutex); - pt->bypass_push = 0; /* Add work to pollset. */ cs->pfd.reqevents = APR_POLLIN; rc = apr_pollset_add(event_pollset, &cs->pfd); @@ -844,21 +963,15 @@ static void check_infinite_requests(void) static void close_listeners(int process_slot, int *closed) { if (!*closed) { - ap_listen_rec *lr; int i; - for (lr = ap_listeners; lr != NULL; lr = lr->next) { - apr_pollfd_t *pfd = apr_pcalloc(pchild, sizeof(*pfd)); - pfd->desc_type = APR_POLL_SOCKET; - pfd->desc.s = lr->sd; - apr_pollset_remove(event_pollset, pfd); - } + disable_listensocks(process_slot); ap_close_listeners(); *closed = 1; dying = 1; ap_scoreboard_image->parent[process_slot].quiescing = 1; for (i = 0; i < threads_per_child; ++i) { ap_update_child_status_from_indexes(process_slot, i, - SERVER_DEAD, NULL); + SERVER_GRACEFUL, NULL); } /* wake up the main thread */ kill(ap_my_pid, SIGTERM); @@ -918,12 +1031,18 @@ static apr_status_t init_pollset(apr_pool_t *p) #endif ap_listen_rec *lr; listener_poll_type *pt; - - APR_RING_INIT(&timeout_head, conn_state_t, timeout_list); - APR_RING_INIT(&keepalive_timeout_head, conn_state_t, timeout_list); - - for (lr = ap_listeners; lr != NULL; lr = lr->next) { - apr_pollfd_t *pfd = apr_palloc(p, sizeof(*pfd)); + int i = 0; + + TO_QUEUE_INIT(write_completion_q); + TO_QUEUE_INIT(keepalive_q); + TO_QUEUE_INIT(linger_q); + TO_QUEUE_INIT(short_linger_q); + + listener_pollfd = apr_palloc(p, sizeof(apr_pollfd_t) * num_listensocks); + for (lr = ap_listeners; lr != NULL; lr = lr->next, i++) { + apr_pollfd_t *pfd; + AP_DEBUG_ASSERT(i < num_listensocks); + pfd = &listener_pollfd[i]; pt = apr_pcalloc(p, sizeof(*pt)); pfd->desc_type = APR_POLL_SOCKET; pfd->desc.s = lr->sd; @@ -970,12 +1089,6 @@ static apr_status_t push2worker(const apr_pollfd_t * pfd, conn_state_t *cs = (conn_state_t *) pt->baton; apr_status_t rc; - if (pt->bypass_push) { - return APR_SUCCESS; - } - - pt->bypass_push = 1; - rc = apr_pollset_remove(pollset, pfd); /* @@ -987,7 +1100,8 @@ static apr_status_t push2worker(const apr_pollfd_t * pfd, if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) { ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, "pollset remove failed"); - cs->state = CONN_STATE_LINGER; + start_lingering_close(cs); + return rc; } rc = ap_queue_push(worker_queue, cs->pfd.desc.s, cs, cs->p); @@ -1007,36 +1121,41 @@ static apr_status_t push2worker(const apr_pollfd_t * pfd, } /* get_worker: - * reserve a worker thread, block if all are currently busy. - * this prevents the worker queue from overflowing and lets - * other processes accept new connections in the mean time. + * If *have_idle_worker_p == 0, reserve a worker thread, and set + * *have_idle_worker_p = 1. + * If *have_idle_worker_p is already 1, will do nothing. + * If blocking == 1, block if all workers are currently busy. + * If no worker was available immediately, will set *all_busy to 1. + * XXX: If there are no workers, we should not block immediately but + * XXX: close all keep-alive connections first. */ -static int get_worker(int *have_idle_worker_p) +static void get_worker(int *have_idle_worker_p, int blocking, int *all_busy) { apr_status_t rc; - if (!*have_idle_worker_p) { - rc = ap_queue_info_wait_for_idler(worker_queue_info); - - if (rc == APR_SUCCESS) { - *have_idle_worker_p = 1; - return 1; - } - else { - if (!APR_STATUS_IS_EOF(rc)) { - ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, - "ap_queue_info_wait_for_idler failed. " - "Attempting to shutdown process gracefully"); - signal_threads(ST_GRACEFUL); - } - return 0; - } - } - else { + if (*have_idle_worker_p) { /* already reserved a worker thread - must have hit a * transient error on a previous pass */ - return 1; + return; + } + + if (blocking) + rc = ap_queue_info_wait_for_idler(worker_queue_info, all_busy); + else + rc = ap_queue_info_try_get_idler(worker_queue_info); + + if (rc == APR_SUCCESS) { + *have_idle_worker_p = 1; + } + else if (!blocking && rc == APR_EAGAIN) { + *all_busy = 1; + } + else if (!APR_STATUS_IS_EOF(rc)) { + ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, + "ap_queue_info_wait_for_idler failed. " + "Attempting to shutdown process gracefully"); + signal_threads(ST_GRACEFUL); } } @@ -1098,6 +1217,79 @@ static apr_status_t event_register_timed_callback(apr_time_t t, return APR_SUCCESS; } +static void process_lingering_close(conn_state_t *cs, const apr_pollfd_t *pfd) +{ + apr_socket_t *csd = ap_get_conn_socket(cs->c); + char dummybuf[2048]; + apr_size_t nbytes; + apr_status_t rv; + struct timeout_queue *q; + q = (cs->state == CONN_STATE_LINGER_SHORT) ? &short_linger_q : &linger_q; + + /* socket is already in non-blocking state */ + do { + nbytes = sizeof(dummybuf); + rv = apr_socket_recv(csd, dummybuf, &nbytes); + } while (rv == APR_SUCCESS); + + if (!APR_STATUS_IS_EOF(rv)) { + return; + } + + rv = apr_pollset_remove(event_pollset, pfd); + AP_DEBUG_ASSERT(rv == APR_SUCCESS); + + rv = apr_socket_close(csd); + AP_DEBUG_ASSERT(rv == APR_SUCCESS); + + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_REMOVE(*q, cs); + apr_thread_mutex_unlock(timeout_mutex); + TO_QUEUE_ELEM_INIT(cs); + + apr_pool_clear(cs->p); + ap_push_pool(worker_queue_info, cs->p); +} + +/* call 'func' for all elements of 'q' with timeout less than 'timeout_time'. + * Pre-condition: timeout_mutex must already be locked + * Post-condition: timeout_mutex will be locked again + */ +static void process_timeout_queue(struct timeout_queue *q, + apr_time_t timeout_time, + int (*func)(conn_state_t *)) +{ + int count = 0; + conn_state_t *first, *cs, *last; + if (!q->count) { + return; + } + AP_DEBUG_ASSERT(!APR_RING_EMPTY(&q->head, conn_state_t, timeout_list)); + + cs = first = APR_RING_FIRST(&q->head); + while (cs != APR_RING_SENTINEL(&q->head, conn_state_t, timeout_list) + && cs->expiration_time < timeout_time) { + last = cs; + cs = APR_RING_NEXT(cs, timeout_list); + count++; + } + if (!count) + return; + + APR_RING_UNSPLICE(first, last, timeout_list); + AP_DEBUG_ASSERT(q->count >= count); + q->count -= count; + apr_thread_mutex_unlock(timeout_mutex); + while (count) { + cs = APR_RING_NEXT(first, timeout_list); + TO_QUEUE_ELEM_INIT(first); + func(first); + first = cs; + count--; + } + apr_thread_mutex_lock(timeout_mutex); +} + static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) { timer_event_t *ep; @@ -1114,10 +1306,11 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) const apr_pollfd_t *out_pfd; apr_int32_t num = 0; apr_interval_time_t timeout_interval; - apr_time_t timeout_time, now; + apr_time_t timeout_time = 0, now, last_log; listener_poll_type *pt; - int closed = 0; + int closed = 0, listeners_disabled = 0; + last_log = apr_time_now(); free(ti); /* the following times out events that are really close in the future @@ -1128,6 +1321,9 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) #define TIMEOUT_FUDGE_FACTOR 100000 #define EVENT_FUDGE_FACTOR 10000 +/* XXX: this should be a config options */ +#define WORKER_OVERCOMMIT 2 + rc = init_pollset(tpool); if (rc != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, @@ -1144,6 +1340,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) apr_signal(LISTENER_SIGNAL, dummy_signal_handler); for (;;) { + int workers_were_busy = 0; if (listener_may_exit) { close_listeners(process_slot, &closed); if (terminate_mode == ST_UNGRACEFUL @@ -1155,8 +1352,22 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) check_infinite_requests(); } - now = apr_time_now(); + if (APLOGtrace6(ap_server_conf)) { + /* trace log status every second */ + if (now - last_log > apr_time_from_msec(1000)) { + last_log = now; + apr_thread_mutex_lock(timeout_mutex); + ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf, + "connections: %d (write-completion: %d " + "keep-alive: %d lingering: %d)", + connection_count, write_completion_q.count, + keepalive_q.count, + linger_q.count + short_linger_q.count); + apr_thread_mutex_unlock(timeout_mutex); + } + } + apr_thread_mutex_lock(g_timer_ring_mtx); if (!APR_RING_EMPTY(&timer_ring, timer_event_t, link)) { te = APR_RING_FIRST(&timer_ring); @@ -1179,7 +1390,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) } #endif rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd); - if (rc != APR_SUCCESS) { if (APR_STATUS_IS_EINTR(rc)) { continue; @@ -1216,16 +1426,47 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) } apr_thread_mutex_unlock(g_timer_ring_mtx); - while (num && get_worker(&have_idle_worker)) { + while (num) { pt = (listener_poll_type *) out_pfd->client_data; if (pt->type == PT_CSD) { /* one of the sockets is readable */ + struct timeout_queue *remove_from_q = &write_completion_q; + int blocking = 1; cs = (conn_state_t *) pt->baton; switch (cs->state) { case CONN_STATE_CHECK_REQUEST_LINE_READABLE: cs->state = CONN_STATE_READ_REQUEST_LINE; - break; + remove_from_q = &keepalive_q; + /* don't wait for a worker for a keepalive request */ + blocking = 0; + /* FALL THROUGH */ case CONN_STATE_WRITE_COMPLETION: + get_worker(&have_idle_worker, blocking, + &workers_were_busy); + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_REMOVE(*remove_from_q, cs); + apr_thread_mutex_unlock(timeout_mutex); + TO_QUEUE_ELEM_INIT(cs); + /* If we didn't get a worker immediately for a keep-alive + * request, we close the connection, so that the client can + * re-connect to a different process. + */ + if (!have_idle_worker) { + start_lingering_close(cs); + break; + } + rc = push2worker(out_pfd, event_pollset); + if (rc != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_CRIT, rc, + ap_server_conf, "push2worker failed"); + } + else { + have_idle_worker = 0; + } + break; + case CONN_STATE_LINGER_NORMAL: + case CONN_STATE_LINGER_SHORT: + process_lingering_close(cs, out_pfd); break; default: ap_log_error(APLOG_MARK, APLOG_CRIT, rc, @@ -1234,86 +1475,99 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) cs->state); AP_DEBUG_ASSERT(0); } - - apr_thread_mutex_lock(timeout_mutex); - APR_RING_REMOVE(cs, timeout_list); - apr_thread_mutex_unlock(timeout_mutex); - APR_RING_ELEM_INIT(cs, timeout_list); - - rc = push2worker(out_pfd, event_pollset); - if (rc != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_CRIT, rc, - ap_server_conf, "push2worker failed"); - } - else { - have_idle_worker = 0; - } } else if (pt->type == PT_ACCEPT) { /* A Listener Socket is ready for an accept() */ + if (workers_were_busy) { + if (!listeners_disabled) + disable_listensocks(process_slot); + listeners_disabled = 1; + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, + "All workers busy, not accepting new conns" + "in this process"); + } + else if (apr_atomic_read32(&connection_count) > threads_per_child + + ap_queue_info_get_idlers(worker_queue_info) * WORKER_OVERCOMMIT) + { + if (!listeners_disabled) + disable_listensocks(process_slot); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, + "Too many open connections (%u), " + "not accepting new conns in this process", + apr_atomic_read32(&connection_count)); + ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf, + "Idle workers: %u", + ap_queue_info_get_idlers(worker_queue_info)); + listeners_disabled = 1; + } + else if (listeners_disabled) { + listeners_disabled = 0; + enable_listensocks(process_slot); + } + if (!listeners_disabled) { + lr = (ap_listen_rec *) pt->baton; + ap_pop_pool(&ptrans, worker_queue_info); - lr = (ap_listen_rec *) pt->baton; - - ap_pop_pool(&ptrans, worker_queue_info); - - if (ptrans == NULL) { - /* create a new transaction pool for each accepted socket */ - apr_allocator_t *allocator; - - apr_allocator_create(&allocator); - apr_allocator_max_free_set(allocator, - ap_max_mem_free); - apr_pool_create_ex(&ptrans, pconf, NULL, allocator); - apr_allocator_owner_set(allocator, ptrans); if (ptrans == NULL) { - ap_log_error(APLOG_MARK, APLOG_CRIT, rc, - ap_server_conf, - "Failed to create transaction pool"); - signal_threads(ST_GRACEFUL); - return NULL; + /* create a new transaction pool for each accepted socket */ + apr_allocator_t *allocator; + + apr_allocator_create(&allocator); + apr_allocator_max_free_set(allocator, + ap_max_mem_free); + apr_pool_create_ex(&ptrans, pconf, NULL, allocator); + apr_allocator_owner_set(allocator, ptrans); + if (ptrans == NULL) { + ap_log_error(APLOG_MARK, APLOG_CRIT, rc, + ap_server_conf, + "Failed to create transaction pool"); + signal_threads(ST_GRACEFUL); + return NULL; + } } - } - apr_pool_tag(ptrans, "transaction"); + apr_pool_tag(ptrans, "transaction"); - rc = lr->accept_func(&csd, lr, ptrans); + get_worker(&have_idle_worker, 1, &workers_were_busy); + rc = lr->accept_func(&csd, lr, ptrans); - /* later we trash rv and rely on csd to indicate - * success/failure - */ - AP_DEBUG_ASSERT(rc == APR_SUCCESS || !csd); + /* later we trash rv and rely on csd to indicate + * success/failure + */ + AP_DEBUG_ASSERT(rc == APR_SUCCESS || !csd); - if (rc == APR_EGENERAL) { - /* E[NM]FILE, ENOMEM, etc */ - resource_shortage = 1; - signal_threads(ST_GRACEFUL); - } + if (rc == APR_EGENERAL) { + /* E[NM]FILE, ENOMEM, etc */ + resource_shortage = 1; + signal_threads(ST_GRACEFUL); + } - if (csd != NULL) { - rc = ap_queue_push(worker_queue, csd, NULL, ptrans); - if (rc != APR_SUCCESS) { - /* trash the connection; we couldn't queue the connected - * socket to a worker - */ - apr_socket_close(csd); - ap_log_error(APLOG_MARK, APLOG_CRIT, rc, - ap_server_conf, - "ap_queue_push failed"); - apr_pool_clear(ptrans); - ap_push_pool(worker_queue_info, ptrans); + if (csd != NULL) { + rc = ap_queue_push(worker_queue, csd, NULL, ptrans); + if (rc != APR_SUCCESS) { + /* trash the connection; we couldn't queue the connected + * socket to a worker + */ + apr_socket_close(csd); + ap_log_error(APLOG_MARK, APLOG_CRIT, rc, + ap_server_conf, + "ap_queue_push failed"); + apr_pool_clear(ptrans); + ap_push_pool(worker_queue_info, ptrans); + } + else { + have_idle_worker = 0; + } } else { - have_idle_worker = 0; + apr_pool_clear(ptrans); + ap_push_pool(worker_queue_info, ptrans); } } - else { - apr_pool_clear(ptrans); - ap_push_pool(worker_queue_info, ptrans); - } } /* if:else on pt->type */ #if HAVE_SERF else if (pt->type == PT_SERF) { /* send socket to serf. */ - /* XXXX: this doesn't require get_worker(&have_idle_worker) */ + /* XXXX: this doesn't require get_worker() */ serf_event_trigger(g_serf, pt->baton, out_pfd); } #endif @@ -1325,70 +1579,59 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) * r->request_time for new requests */ now = apr_time_now(); + /* we only do this once per 0.1s (TIMEOUT_FUDGE_FACTOR) */ + if (now > timeout_time) { + struct process_score *ps; + timeout_time = now + TIMEOUT_FUDGE_FACTOR; - /* handle timed out sockets */ - apr_thread_mutex_lock(timeout_mutex); - - /* Step 1: keepalive timeouts */ - cs = APR_RING_FIRST(&keepalive_timeout_head); - timeout_time = now + TIMEOUT_FUDGE_FACTOR; - while (!APR_RING_EMPTY(&keepalive_timeout_head, conn_state_t, timeout_list) - && cs->expiration_time < timeout_time) { - - cs->state = CONN_STATE_LINGER; - - APR_RING_REMOVE(cs, timeout_list); - apr_thread_mutex_unlock(timeout_mutex); + /* handle timed out sockets */ + apr_thread_mutex_lock(timeout_mutex); - if (!get_worker(&have_idle_worker)) { - apr_thread_mutex_lock(timeout_mutex); - APR_RING_INSERT_HEAD(&keepalive_timeout_head, cs, - conn_state_t, timeout_list); - break; + /* Step 1: keepalive timeouts */ + /* If all workers are busy, we kill older keep-alive connections so that they + * may connect to another process. + */ + if (workers_were_busy && keepalive_q.count) { + ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf, + "All workers are busy, will close %d keep-alive " + "connections", + keepalive_q.count); + process_timeout_queue(&keepalive_q, + timeout_time + ap_server_conf->keep_alive_timeout, + start_lingering_close); } - - rc = push2worker(&cs->pfd, event_pollset); - - if (rc != APR_SUCCESS) { - return NULL; - /* XXX return NULL looks wrong - not an init failure - * that bypasses all the cleanup outside the main loop - * break seems more like it - * need to evaluate seriousness of push2worker failures - */ + else { + process_timeout_queue(&keepalive_q, timeout_time, + start_lingering_close); } - have_idle_worker = 0; - apr_thread_mutex_lock(timeout_mutex); - cs = APR_RING_FIRST(&keepalive_timeout_head); - } - - /* Step 2: write completion timeouts */ - cs = APR_RING_FIRST(&timeout_head); - while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list) - && cs->expiration_time < timeout_time) { - - cs->state = CONN_STATE_LINGER; - APR_RING_REMOVE(cs, timeout_list); + /* Step 2: write completion timeouts */ + process_timeout_queue(&write_completion_q, timeout_time, start_lingering_close); + /* Step 3: (normal) lingering close completion timeouts */ + process_timeout_queue(&linger_q, timeout_time, stop_lingering_close); + /* Step 4: (short) lingering close completion timeouts */ + process_timeout_queue(&short_linger_q, timeout_time, stop_lingering_close); + + ps = ap_get_scoreboard_process(process_slot); + ps->write_completion = write_completion_q.count; + ps->lingering_close = linger_q.count + short_linger_q.count; + ps->keep_alive = keepalive_q.count; apr_thread_mutex_unlock(timeout_mutex); - if (!get_worker(&have_idle_worker)) { - apr_thread_mutex_lock(timeout_mutex); - APR_RING_INSERT_HEAD(&timeout_head, cs, - conn_state_t, timeout_list); - break; - } - - rc = push2worker(&cs->pfd, event_pollset); - if (rc != APR_SUCCESS) { - return NULL; - } - have_idle_worker = 0; - apr_thread_mutex_lock(timeout_mutex); - cs = APR_RING_FIRST(&timeout_head); + ps->connections = apr_atomic_read32(&connection_count); + /* XXX: should count CONN_STATE_SUSPENDED and set ps->suspended */ } - - apr_thread_mutex_unlock(timeout_mutex); - + if (listeners_disabled && !workers_were_busy && + (int)apr_atomic_read32(&connection_count) < + ((int)ap_queue_info_get_idlers(worker_queue_info) - 1) * WORKER_OVERCOMMIT + + threads_per_child) + { + listeners_disabled = 0; + enable_listensocks(process_slot); + } + /* + * XXX: do we need to set some timeout that re-enables the listensocks + * XXX: in case no other event occurs? + */ } /* listener main loop */ close_listeners(process_slot, &closed); @@ -1439,7 +1682,7 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy) } ap_update_child_status_from_indexes(process_slot, thread_slot, - dying ? SERVER_DEAD : SERVER_READY, NULL); + dying ? SERVER_GRACEFUL : SERVER_READY, NULL); worker_pop: if (workers_may_exit) { break; @@ -1598,7 +1841,10 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy) /* Create the main pollset */ rv = apr_pollset_create(&event_pollset, - threads_per_child, + threads_per_child, /* XXX don't we need more, to handle + * connections in K-A or lingering + * close? + */ pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY); if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, @@ -1973,6 +2219,7 @@ static int make_child(server_rec * s, int slot) event_note_child_lost_slot(slot, pid); } ap_scoreboard_image->parent[slot].quiescing = 0; + ap_scoreboard_image->parent[slot].not_accepting = 0; event_note_child_started(slot, pid); return 0; } @@ -2048,8 +2295,9 @@ static void perform_idle_server_maintenance(void) */ if (ps->pid != 0) { /* XXX just set all_dead_threads in outer for loop if no pid? not much else matters */ - if (status <= SERVER_READY && - !ps->quiescing && ps->generation == retained->my_generation) { + if (status <= SERVER_READY && !ps->quiescing && !ps->not_accepting + && ps->generation == retained->my_generation) + { ++idle_thread_count; } if (status >= SERVER_READY && status < SERVER_GRACEFUL) { diff --git a/server/mpm/event/fdqueue.c b/server/mpm/event/fdqueue.c index 26b5906d22..9566df3bb4 100644 --- a/server/mpm/event/fdqueue.c +++ b/server/mpm/event/fdqueue.c @@ -127,7 +127,19 @@ apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info, return APR_SUCCESS; } -apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info) +apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t * queue_info) +{ + int prev_idlers; + prev_idlers = apr_atomic_dec32((apr_uint32_t *)&(queue_info->idlers)); + if (prev_idlers <= 0) { + apr_atomic_inc32((apr_uint32_t *)&(queue_info->idlers)); /* back out dec */ + return APR_EAGAIN; + } + return APR_SUCCESS; +} + +apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info, + int *had_to_block) { apr_status_t rv; int prev_idlers; @@ -165,6 +177,7 @@ apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info) * threads are waiting on an idle worker. */ if (queue_info->idlers < 0) { + *had_to_block = 1; rv = apr_thread_cond_wait(queue_info->wait_for_idler, queue_info->idlers_mutex); if (rv != APR_SUCCESS) { @@ -191,6 +204,14 @@ apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info) } } +apr_uint32_t ap_queue_info_get_idlers(fd_queue_info_t * queue_info) +{ + apr_int32_t val; + val = (apr_int32_t)apr_atomic_read32((apr_uint32_t *)&queue_info->idlers); + if (val < 0) + return 0; + return val; +} void ap_push_pool(fd_queue_info_t * queue_info, apr_pool_t * pool_to_recycle) diff --git a/server/mpm/event/fdqueue.h b/server/mpm/event/fdqueue.h index 9c1deb6441..a8878b1062 100644 --- a/server/mpm/event/fdqueue.h +++ b/server/mpm/event/fdqueue.h @@ -46,8 +46,11 @@ apr_status_t ap_queue_info_create(fd_queue_info_t ** queue_info, int max_recycled_pools); apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info, apr_pool_t * pool_to_recycle); -apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info); +apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t * queue_info); +apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info, + int *had_to_block); apr_status_t ap_queue_info_term(fd_queue_info_t * queue_info); +apr_uint32_t ap_queue_info_get_idlers(fd_queue_info_t * queue_info); struct fd_queue_elem_t { -- 2.49.0