]> granicus.if.org Git - apache/commitdiff
Some improvements for handling of many connections for MPM event:
author Stefan Fritsch <sf@apache.org>
Sun, 19 Jun 2011 12:23:42 +0000 (12:23 +0000)
committer Stefan Fritsch <sf@apache.org>
Sun, 19 Jun 2011 12:23:42 +0000 (12:23 +0000)
- Process lingering close asynchronously instead of tying up worker threads
  (based on patch by Jeff Trawick).

- If the number of connections of a process is above

     threads_per_child  +  WORKER_OVERCOMMIT * (idle_workers - 1)

  (WORKER_OVERCOMMIT is fixed at 2, at the moment), or if all workers are busy,
  don't accept new connections in that process. Such a dynamic connection limit
  is necessary because we may have both async and non-async (ssl) connections.
  WORKER_OVERCOMMIT should be a config option.

- Don't count idle workers of not-accepting processes against MinSpareThreads,
  so that the parent will spawn new processes when necessary.

- If we receive a keep-alive request while all workers are busy, don't block
  but close the connection immediately so that the client will re-connect to a
  different process.

Related changes:

- Log what is going on at trace loglevels.
- Remove the bypass_push poll type flag; this code cannot be hit anymore
  (if it ever could be).
- Add some macro helpers for dealing with timeout queues.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1137358 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
include/ap_mmn.h
include/http_connection.h
include/httpd.h
include/scoreboard.h
server/connection.c
server/mpm/event/event.c
server/mpm/event/fdqueue.c
server/mpm/event/fdqueue.h

diff --git a/CHANGES b/CHANGES
index 346513884413e5be665a1a691f0b41947edacc29..03e72a387b92fcefe872c47239220dcd171e64a6 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,13 @@
 
 Changes with Apache 2.3.13
 
+  *) mpm_event: If the number of connections of a process is very high, or if
+     all workers are busy, don't accept new connections in that process.
+     [Stefan Fritsch]
+
+  *) mpm_event: Process lingering close asynchronously instead of tying up
+     worker threads. [Jeff Trawick, Stefan Fritsch]
+
   *) mpm_event: If MaxMemFree is set, limit the number of pools that is kept
      around. [Stefan Fritsch]
 
index b7f927ca26c6a9f817dc7b1999969492d66a9b74..f5c934645924a9b5e0e3e1fdc4800c1dcb302ebd 100644 (file)
  *                         Add ap_context_*(), ap_set_context_info(), ap_set_document_root()
  * 20110605.1 (2.3.13-dev) add ap_(get|set)_core_module_config()
  * 20110605.2 (2.3.13-dev) add ap_get_conn_socket()
+ * 20110619.0 (2.3.13-dev) add async connection info to process_score in scoreboard,
+ *                         add ap_start_lingering_close(),
+ *                         add conn_state_e:CONN_STATE_LINGER_NORMAL and CONN_STATE_LINGER_SHORT
  */
 
 #define MODULE_MAGIC_COOKIE 0x41503234UL /* "AP24" */
 
 #ifndef MODULE_MAGIC_NUMBER_MAJOR
-#define MODULE_MAGIC_NUMBER_MAJOR 20110605
+#define MODULE_MAGIC_NUMBER_MAJOR 20110619
 #endif
-#define MODULE_MAGIC_NUMBER_MINOR 2                    /* 0...n */
+#define MODULE_MAGIC_NUMBER_MINOR 0                    /* 0...n */
 
 /**
  * Determine if the server's current MODULE_MAGIC_NUMBER is at least a
index 37437cfd5abd1ad0ad9614aa6267017f7e059a0a..d5bca043bb22b06d0400adc05e0eb70c8167a807 100644 (file)
@@ -70,7 +70,9 @@ AP_CORE_DECLARE(void) ap_flush_conn(conn_rec *c);
  */
 AP_DECLARE(void) ap_lingering_close(conn_rec *c);
 
-  /* Hooks */
+AP_DECLARE(int) ap_start_lingering_close(conn_rec *c);
+
+/* Hooks */
 /**
  * create_connection is a RUN_FIRST hook which allows modules to create 
  * connections. In general, you should not install filters with the 
index 89d421a5f9676c3f4893c76c0337fdda9c7db5da..b09a9d040ff3e6cef46842c74e3266e72f0aa867 100644 (file)
@@ -1133,7 +1133,9 @@ typedef enum  {
     CONN_STATE_HANDLER,
     CONN_STATE_WRITE_COMPLETION,
     CONN_STATE_SUSPENDED,
-    CONN_STATE_LINGER
+    CONN_STATE_LINGER,
+    CONN_STATE_LINGER_NORMAL,
+    CONN_STATE_LINGER_SHORT
 } conn_state_e;
 
 /** 
index 1bf0ad2de47b27b208b9c3cf08c23a3b0613a44c..98fd7cf7480c1398e433d9a95adb8f12f3e559b9 100644 (file)
@@ -132,9 +132,17 @@ typedef struct process_score process_score;
 struct process_score {
     pid_t pid;
     ap_generation_t generation;        /* generation of this child */
-    int quiescing;          /* the process whose pid is stored above is
+    char quiescing;         /* the process whose pid is stored above is
                              * going down gracefully
                              */
+    char not_accepting;     /* the process is busy and is not accepting more
+                             * connections (for async MPMs)
+                             */
+    apr_uint32_t connections;       /* total connections (for async MPMs) */
+    apr_uint32_t write_completion;  /* async connections doing write completion */
+    apr_uint32_t lingering_close;   /* async connections in lingering close */
+    apr_uint32_t keep_alive;        /* async connections in keep alive */
+    apr_uint32_t suspended;         /* connections suspended by some module */
 };
 
 /* Scoreboard is now in 'local' memory, since it isn't updated once created,
index 31279b3338e1936cbb1b90b833ec6477dc167519..13ba951b0c2410570a84adda38d84cde7bc8d0b1 100644 (file)
@@ -93,15 +93,13 @@ AP_CORE_DECLARE(void) ap_flush_conn(conn_rec *c)
  * all the response data has been sent to the client.
  */
 #define SECONDS_TO_LINGER  2
-AP_DECLARE(void) ap_lingering_close(conn_rec *c)
+
+AP_DECLARE(int) ap_start_lingering_close(conn_rec *c)
 {
-    char dummybuf[512];
-    apr_size_t nbytes;
-    apr_time_t timeup = 0;
     apr_socket_t *csd = ap_get_conn_socket(c);
 
     if (!csd) {
-        return;
+        return 1;
     }
 
     ap_update_child_status(c->sbh, SERVER_CLOSING, NULL);
@@ -109,7 +107,7 @@ AP_DECLARE(void) ap_lingering_close(conn_rec *c)
 #ifdef NO_LINGCLOSE
     ap_flush_conn(c); /* just close it */
     apr_socket_close(csd);
-    return;
+    return 1;
 #endif
 
     /* Close the connection, being careful to send out whatever is still
@@ -122,7 +120,7 @@ AP_DECLARE(void) ap_lingering_close(conn_rec *c)
 
     if (c->aborted) {
         apr_socket_close(csd);
-        return;
+        return 1;
     }
 
     /* Shut down the socket for write, which will send a FIN
@@ -131,6 +129,20 @@ AP_DECLARE(void) ap_lingering_close(conn_rec *c)
     if (apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE) != APR_SUCCESS
         || c->aborted) {
         apr_socket_close(csd);
+        return 1;
+    }
+
+    return 0;
+}
+
+AP_DECLARE(void) ap_lingering_close(conn_rec *c)
+{
+    char dummybuf[512];
+    apr_size_t nbytes;
+    apr_time_t timeup = 0;
+    apr_socket_t *csd = ap_get_conn_socket(c);
+
+    if (ap_start_lingering_close(c)) {
         return;
     }
 
index b002f56910c19b3e063951d33964ec0c12b9b251..432ba81fbc507a724a5a05676eb4f725b5f5c1e8 100644 (file)
 #define apr_time_from_msec(x) (x * 1000)
 #endif
 
+#ifndef MAX_SECS_TO_LINGER
+#define MAX_SECS_TO_LINGER 30
+#endif
+#define SECONDS_TO_LINGER  2
+
 /*
  * Actual definitions of config globals
  */
@@ -174,7 +179,38 @@ static int mpm_state = AP_MPMQ_STARTING;
 
 static apr_thread_mutex_t *timeout_mutex;
 APR_RING_HEAD(timeout_head_t, conn_state_t);
-static struct timeout_head_t timeout_head, keepalive_timeout_head;
+struct timeout_queue {
+    struct timeout_head_t head;
+    int count;
+    const char *tag;
+};
+static struct timeout_queue write_completion_q, keepalive_q, linger_q,
+                            short_linger_q;
+static apr_pollfd_t *listener_pollfd;
+
+/*
+ * Macros for accessing struct timeout_queue.
+ * For TO_QUEUE_APPEND and TO_QUEUE_REMOVE, timeout_mutex must be held.
+ */
+#define TO_QUEUE_APPEND(q, el)                                            \
+    do {                                                                  \
+        APR_RING_INSERT_TAIL(&(q).head, el, conn_state_t, timeout_list);  \
+        (q).count++;                                                      \
+    } while (0)
+
+#define TO_QUEUE_REMOVE(q, el)             \
+    do {                                   \
+        APR_RING_REMOVE(el, timeout_list); \
+        (q).count--;                       \
+    } while (0)
+
+#define TO_QUEUE_INIT(q)                                            \
+    do {                                                            \
+            APR_RING_INIT(&(q).head, conn_state_t, timeout_list);   \
+            (q).tag = #q;                                           \
+    } while (0)
+
+#define TO_QUEUE_ELEM_INIT(el) APR_RING_ELEM_INIT(el, timeout_list)
 
 static apr_pollset_t *event_pollset;
 
@@ -218,7 +254,6 @@ typedef enum
 typedef struct
 {
     poll_type_e type;
-    int bypass_push;
     void *baton;
 } listener_poll_type;
 
@@ -299,6 +334,32 @@ static apr_os_thread_t *listener_os_thread;
  */
 static apr_socket_t **worker_sockets;
 
+static void disable_listensocks(int process_slot)
+{
+    int i;
+    for (i = 0; i < num_listensocks; i++) {
+        apr_pollset_remove(event_pollset, &listener_pollfd[i]);
+    }
+    ap_scoreboard_image->parent[process_slot].not_accepting = 1;
+}
+
+static void enable_listensocks(int process_slot)
+{
+    int i;
+    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+                 "Accepting new connections again: "
+                 "%u active conns, %u idle workers",
+                 apr_atomic_read32(&connection_count),
+                 ap_queue_info_get_idlers(worker_queue_info));
+    for (i = 0; i < num_listensocks; i++)
+        apr_pollset_add(event_pollset, &listener_pollfd[i]);
+    /*
+     * XXX: This is not yet optimal. If many workers suddenly become available,
+     * XXX: the parent may kill some processes off too soon.
+     */
+    ap_scoreboard_image->parent[process_slot].not_accepting = 0;
+}
+
 static void close_worker_sockets(void)
 {
     int i;
@@ -654,6 +715,69 @@ static void set_signals(void)
 #endif
 }
 
+static int start_lingering_close(conn_state_t *cs)
+{
+    apr_status_t rv;
+    if (ap_start_lingering_close(cs->c)) {
+        apr_pool_clear(cs->p);
+        ap_push_pool(worker_queue_info, cs->p);
+        return 0;
+    }
+    else {
+        apr_socket_t *csd = ap_get_conn_socket(cs->c);
+        struct timeout_queue *q;
+
+        rv = apr_socket_timeout_set(csd, 0);
+        AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+        /*
+         * If some module requested a shortened waiting period, only wait for
+         * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
+         * DoS attacks.
+         */
+        if (apr_table_get(cs->c->notes, "short-lingering-close")) {
+            cs->expiration_time =
+                apr_time_now() + apr_time_from_sec(SECONDS_TO_LINGER);
+            q = &short_linger_q;
+            cs->state = CONN_STATE_LINGER_SHORT;
+        }
+        else {
+            cs->expiration_time =
+                apr_time_now() + apr_time_from_sec(MAX_SECS_TO_LINGER);
+            q = &linger_q;
+            cs->state = CONN_STATE_LINGER_NORMAL;
+        }
+        apr_thread_mutex_lock(timeout_mutex);
+        TO_QUEUE_APPEND(*q, cs);
+        apr_thread_mutex_unlock(timeout_mutex);
+        cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR;
+        rv = apr_pollset_add(event_pollset, &cs->pfd);
+        if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+            ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
+                         "start_lingering_close: apr_pollset_add failure");
+            AP_DEBUG_ASSERT(0);
+        }
+    }
+    return 1;
+}
+
+static int stop_lingering_close(conn_state_t *cs)
+{
+    apr_status_t rv;
+    apr_socket_t *csd = ap_get_conn_socket(cs->c);
+    ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
+                 "socket reached timeout in lingering-close state");
+    rv = apr_socket_close(csd);
+    if (rv != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, "error closing socket");
+        AP_DEBUG_ASSERT(0);
+    }
+    apr_pool_clear(cs->p);
+    ap_push_pool(worker_queue_info, cs->p);
+    return 0;
+}
+
+
+
 /*****************************************************************
  * Child process main loop.
  */
@@ -689,10 +813,9 @@ static int process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * sock
         cs->pfd.reqevents = APR_POLLIN;
         cs->pfd.desc.s = sock;
         pt->type = PT_CSD;
-        pt->bypass_push = 1;
         pt->baton = cs;
         cs->pfd.client_data = pt;
-        APR_RING_ELEM_INIT(cs, timeout_list);
+        TO_QUEUE_ELEM_INIT(cs);
 
         ap_update_vhost_given_ip(c);
 
@@ -755,6 +878,7 @@ read_request:
     if (cs->state == CONN_STATE_WRITE_COMPLETION) {
         ap_filter_t *output_filter = c->output_filters;
         apr_status_t rv;
+        ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
         while (output_filter->next != NULL) {
             output_filter = output_filter->next;
         }
@@ -771,9 +895,8 @@ read_request:
              */
             cs->expiration_time = ap_server_conf->timeout + apr_time_now();
             apr_thread_mutex_lock(timeout_mutex);
-            APR_RING_INSERT_TAIL(&timeout_head, cs, conn_state_t, timeout_list);
+            TO_QUEUE_APPEND(write_completion_q, cs);
             apr_thread_mutex_unlock(timeout_mutex);
-            pt->bypass_push = 0;
             cs->pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR;
             rc = apr_pollset_add(event_pollset, &cs->pfd);
             return 1;
@@ -792,14 +915,11 @@ read_request:
     }
 
     if (cs->state == CONN_STATE_LINGER) {
-        ap_lingering_close(c);
-        apr_pool_clear(p);
-        ap_push_pool(worker_queue_info, p);
-        return 0;
+        if (!start_lingering_close(cs))
+            return 0;
     }
     else if (cs->state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
         apr_status_t rc;
-        listener_poll_type *pt = (listener_poll_type *) cs->pfd.client_data;
 
         /* It greatly simplifies the logic to use a single timeout value here
          * because the new element can just be added to the end of the list and
@@ -812,10 +932,9 @@ read_request:
         cs->expiration_time = ap_server_conf->keep_alive_timeout +
                               apr_time_now();
         apr_thread_mutex_lock(timeout_mutex);
-        APR_RING_INSERT_TAIL(&keepalive_timeout_head, cs, conn_state_t, timeout_list);
+        TO_QUEUE_APPEND(keepalive_q, cs);
         apr_thread_mutex_unlock(timeout_mutex);
 
-        pt->bypass_push = 0;
         /* Add work to pollset. */
         cs->pfd.reqevents = APR_POLLIN;
         rc = apr_pollset_add(event_pollset, &cs->pfd);
@@ -844,21 +963,15 @@ static void check_infinite_requests(void)
 
 static void close_listeners(int process_slot, int *closed) {
     if (!*closed) {
-        ap_listen_rec *lr;
         int i;
-        for (lr = ap_listeners; lr != NULL; lr = lr->next) {
-            apr_pollfd_t *pfd = apr_pcalloc(pchild, sizeof(*pfd));
-            pfd->desc_type = APR_POLL_SOCKET;
-            pfd->desc.s = lr->sd;
-            apr_pollset_remove(event_pollset, pfd);
-        }
+        disable_listensocks(process_slot);
         ap_close_listeners();
         *closed = 1;
         dying = 1;
         ap_scoreboard_image->parent[process_slot].quiescing = 1;
         for (i = 0; i < threads_per_child; ++i) {
             ap_update_child_status_from_indexes(process_slot, i,
-                                                SERVER_DEAD, NULL);
+                                                SERVER_GRACEFUL, NULL);
         }
         /* wake up the main thread */
         kill(ap_my_pid, SIGTERM);
@@ -918,12 +1031,18 @@ static apr_status_t init_pollset(apr_pool_t *p)
 #endif
     ap_listen_rec *lr;
     listener_poll_type *pt;
-
-    APR_RING_INIT(&timeout_head, conn_state_t, timeout_list);
-    APR_RING_INIT(&keepalive_timeout_head, conn_state_t, timeout_list);
-
-    for (lr = ap_listeners; lr != NULL; lr = lr->next) {
-        apr_pollfd_t *pfd = apr_palloc(p, sizeof(*pfd));
+    int i = 0;
+
+    TO_QUEUE_INIT(write_completion_q);
+    TO_QUEUE_INIT(keepalive_q);
+    TO_QUEUE_INIT(linger_q);
+    TO_QUEUE_INIT(short_linger_q);
+
+    listener_pollfd = apr_palloc(p, sizeof(apr_pollfd_t) * num_listensocks);
+    for (lr = ap_listeners; lr != NULL; lr = lr->next, i++) {
+        apr_pollfd_t *pfd;
+        AP_DEBUG_ASSERT(i < num_listensocks);
+        pfd = &listener_pollfd[i];
         pt = apr_pcalloc(p, sizeof(*pt));
         pfd->desc_type = APR_POLL_SOCKET;
         pfd->desc.s = lr->sd;
@@ -970,12 +1089,6 @@ static apr_status_t push2worker(const apr_pollfd_t * pfd,
     conn_state_t *cs = (conn_state_t *) pt->baton;
     apr_status_t rc;
 
-    if (pt->bypass_push) {
-        return APR_SUCCESS;
-    }
-
-    pt->bypass_push = 1;
-
     rc = apr_pollset_remove(pollset, pfd);
 
     /*
@@ -987,7 +1100,8 @@ static apr_status_t push2worker(const apr_pollfd_t * pfd,
     if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
                      "pollset remove failed");
-        cs->state = CONN_STATE_LINGER;
+        start_lingering_close(cs);
+        return rc;
     }
 
     rc = ap_queue_push(worker_queue, cs->pfd.desc.s, cs, cs->p);
@@ -1007,36 +1121,41 @@ static apr_status_t push2worker(const apr_pollfd_t * pfd,
 }
 
 /* get_worker:
- *     reserve a worker thread, block if all are currently busy.
- *     this prevents the worker queue from overflowing and lets
- *     other processes accept new connections in the mean time.
+ *     If *have_idle_worker_p == 0, reserve a worker thread, and set
+ *     *have_idle_worker_p = 1.
+ *     If *have_idle_worker_p is already 1, will do nothing.
+ *     If blocking == 1, block if all workers are currently busy.
+ *     If no worker was available immediately, will set *all_busy to 1.
+ *     XXX: If there are no workers, we should not block immediately but
+ *     XXX: close all keep-alive connections first.
  */
-static int get_worker(int *have_idle_worker_p)
+static void get_worker(int *have_idle_worker_p, int blocking, int *all_busy)
 {
     apr_status_t rc;
 
-    if (!*have_idle_worker_p) {
-        rc = ap_queue_info_wait_for_idler(worker_queue_info);
-
-        if (rc == APR_SUCCESS) {
-            *have_idle_worker_p = 1;
-            return 1;
-        }
-        else {
-            if (!APR_STATUS_IS_EOF(rc)) {
-                ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
-                             "ap_queue_info_wait_for_idler failed.  "
-                             "Attempting to shutdown process gracefully");
-                signal_threads(ST_GRACEFUL);
-            }
-            return 0;
-        }
-    }
-    else {
+    if (*have_idle_worker_p) {
         /* already reserved a worker thread - must have hit a
          * transient error on a previous pass
          */
-        return 1;
+        return;
+    }
+
+    if (blocking)
+        rc = ap_queue_info_wait_for_idler(worker_queue_info, all_busy);
+    else
+        rc = ap_queue_info_try_get_idler(worker_queue_info);
+
+    if (rc == APR_SUCCESS) {
+        *have_idle_worker_p = 1;
+    }
+    else if (!blocking && rc == APR_EAGAIN) {
+        *all_busy = 1;
+    }
+    else if (!APR_STATUS_IS_EOF(rc)) {
+        ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
+                     "ap_queue_info_wait_for_idler failed.  "
+                     "Attempting to shutdown process gracefully");
+        signal_threads(ST_GRACEFUL);
     }
 }
 
@@ -1098,6 +1217,79 @@ static apr_status_t event_register_timed_callback(apr_time_t t,
     return APR_SUCCESS;
 }
 
+static void process_lingering_close(conn_state_t *cs, const apr_pollfd_t *pfd)
+{
+    apr_socket_t *csd = ap_get_conn_socket(cs->c);
+    char dummybuf[2048];
+    apr_size_t nbytes;
+    apr_status_t rv;
+    struct timeout_queue *q;
+    q = (cs->state == CONN_STATE_LINGER_SHORT) ?  &short_linger_q : &linger_q;
+
+    /* socket is already in non-blocking state */
+    do {
+        nbytes = sizeof(dummybuf);
+        rv = apr_socket_recv(csd, dummybuf, &nbytes);
+    } while (rv == APR_SUCCESS);
+
+    if (!APR_STATUS_IS_EOF(rv)) {
+        return;
+    }
+
+    rv = apr_pollset_remove(event_pollset, pfd);
+    AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+
+    rv = apr_socket_close(csd);
+    AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+
+    apr_thread_mutex_lock(timeout_mutex);
+    TO_QUEUE_REMOVE(*q, cs);
+    apr_thread_mutex_unlock(timeout_mutex);
+    TO_QUEUE_ELEM_INIT(cs);
+
+    apr_pool_clear(cs->p);
+    ap_push_pool(worker_queue_info, cs->p);
+}
+
+/* Call 'func' for all elements of 'q' whose expiration_time is before 'timeout_time'.
+ * Pre-condition: timeout_mutex must already be locked.
+ * Post-condition: timeout_mutex is locked again (it is released while 'func' runs).
+ */
+static void process_timeout_queue(struct timeout_queue *q,
+                                  apr_time_t timeout_time,
+                                  int (*func)(conn_state_t *))
+{
+    int count = 0;
+    conn_state_t *first, *cs, *last;
+    if (!q->count) {
+        return;
+    }
+    AP_DEBUG_ASSERT(!APR_RING_EMPTY(&q->head, conn_state_t, timeout_list));
+
+    cs = first = APR_RING_FIRST(&q->head);
+    while (cs != APR_RING_SENTINEL(&q->head, conn_state_t, timeout_list)
+           && cs->expiration_time < timeout_time) {
+        last = cs;
+        cs = APR_RING_NEXT(cs, timeout_list);
+        count++;
+    }
+    if (!count)
+        return;
+
+    APR_RING_UNSPLICE(first, last, timeout_list);
+    AP_DEBUG_ASSERT(q->count >= count);
+    q->count -= count;
+    apr_thread_mutex_unlock(timeout_mutex);
+    while (count) {
+        cs = APR_RING_NEXT(first, timeout_list);
+        TO_QUEUE_ELEM_INIT(first);
+        func(first);
+        first = cs;
+        count--;
+    }
+    apr_thread_mutex_lock(timeout_mutex);
+}
+
 static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 {
     timer_event_t *ep;
@@ -1114,10 +1306,11 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
     const apr_pollfd_t *out_pfd;
     apr_int32_t num = 0;
     apr_interval_time_t timeout_interval;
-    apr_time_t timeout_time, now;
+    apr_time_t timeout_time = 0, now, last_log;
     listener_poll_type *pt;
-    int closed = 0;
+    int closed = 0, listeners_disabled = 0;
 
+    last_log = apr_time_now();
     free(ti);
 
     /* the following times out events that are really close in the future
@@ -1128,6 +1321,9 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 #define TIMEOUT_FUDGE_FACTOR 100000
 #define EVENT_FUDGE_FACTOR 10000
 
+/* XXX: this should be a config option */
+#define WORKER_OVERCOMMIT 2
+
     rc = init_pollset(tpool);
     if (rc != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
@@ -1144,6 +1340,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
     apr_signal(LISTENER_SIGNAL, dummy_signal_handler);
 
     for (;;) {
+        int workers_were_busy = 0;
         if (listener_may_exit) {
             close_listeners(process_slot, &closed);
             if (terminate_mode == ST_UNGRACEFUL
@@ -1155,8 +1352,22 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             check_infinite_requests();
         }
 
-
         now = apr_time_now();
+        if (APLOGtrace6(ap_server_conf)) {
+            /* trace log status every second */
+            if (now - last_log > apr_time_from_msec(1000)) {
+                last_log = now;
+                apr_thread_mutex_lock(timeout_mutex);
+                ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
+                             "connections: %d (write-completion: %d "
+                             "keep-alive: %d lingering: %d)",
+                             connection_count, write_completion_q.count,
+                             keepalive_q.count,
+                             linger_q.count + short_linger_q.count);
+                apr_thread_mutex_unlock(timeout_mutex);
+            }
+        }
+
         apr_thread_mutex_lock(g_timer_ring_mtx);
         if (!APR_RING_EMPTY(&timer_ring, timer_event_t, link)) {
             te = APR_RING_FIRST(&timer_ring);
@@ -1179,7 +1390,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
         }
 #endif
         rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
-
         if (rc != APR_SUCCESS) {
             if (APR_STATUS_IS_EINTR(rc)) {
                 continue;
@@ -1216,16 +1426,47 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
         }
         apr_thread_mutex_unlock(g_timer_ring_mtx);
 
-        while (num && get_worker(&have_idle_worker)) {
+        while (num) {
             pt = (listener_poll_type *) out_pfd->client_data;
             if (pt->type == PT_CSD) {
                 /* one of the sockets is readable */
+                struct timeout_queue *remove_from_q = &write_completion_q;
+                int blocking = 1;
                 cs = (conn_state_t *) pt->baton;
                 switch (cs->state) {
                 case CONN_STATE_CHECK_REQUEST_LINE_READABLE:
                     cs->state = CONN_STATE_READ_REQUEST_LINE;
-                    break;
+                    remove_from_q = &keepalive_q;
+                    /* don't wait for a worker for a keepalive request */
+                    blocking = 0;
+                    /* FALL THROUGH */
                 case CONN_STATE_WRITE_COMPLETION:
+                    get_worker(&have_idle_worker, blocking,
+                               &workers_were_busy);
+                    apr_thread_mutex_lock(timeout_mutex);
+                    TO_QUEUE_REMOVE(*remove_from_q, cs);
+                    apr_thread_mutex_unlock(timeout_mutex);
+                    TO_QUEUE_ELEM_INIT(cs);
+                    /* If we didn't get a worker immediately for a keep-alive
+                     * request, we close the connection, so that the client can
+                     * re-connect to a different process.
+                     */
+                    if (!have_idle_worker) {
+                        start_lingering_close(cs);
+                        break;
+                    }
+                    rc = push2worker(out_pfd, event_pollset);
+                    if (rc != APR_SUCCESS) {
+                        ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
+                                     ap_server_conf, "push2worker failed");
+                    }
+                    else {
+                        have_idle_worker = 0;
+                    }
+                    break;
+                case CONN_STATE_LINGER_NORMAL:
+                case CONN_STATE_LINGER_SHORT:
+                    process_lingering_close(cs, out_pfd);
                     break;
                 default:
                     ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
@@ -1234,86 +1475,99 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                                  cs->state);
                     AP_DEBUG_ASSERT(0);
                 }
-
-                apr_thread_mutex_lock(timeout_mutex);
-                APR_RING_REMOVE(cs, timeout_list);
-                apr_thread_mutex_unlock(timeout_mutex);
-                APR_RING_ELEM_INIT(cs, timeout_list);
-
-                rc = push2worker(out_pfd, event_pollset);
-                if (rc != APR_SUCCESS) {
-                    ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                                 ap_server_conf, "push2worker failed");
-                }
-                else {
-                    have_idle_worker = 0;
-                }
             }
             else if (pt->type == PT_ACCEPT) {
                 /* A Listener Socket is ready for an accept() */
+                if (workers_were_busy) {
+                    if (!listeners_disabled)
+                        disable_listensocks(process_slot);
+                    listeners_disabled = 1;
+                    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+                                 "All workers busy, not accepting new conns"
+                                 "in this process");
+                }
+                else if (apr_atomic_read32(&connection_count) > threads_per_child
+                         + ap_queue_info_get_idlers(worker_queue_info) * WORKER_OVERCOMMIT)
+                {
+                    if (!listeners_disabled)
+                        disable_listensocks(process_slot);
+                    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+                                 "Too many open connections (%u), "
+                                 "not accepting new conns in this process",
+                                 apr_atomic_read32(&connection_count));
+                    ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+                                 "Idle workers: %u",
+                                 ap_queue_info_get_idlers(worker_queue_info));
+                    listeners_disabled = 1;
+                }
+                else if (listeners_disabled) {
+                    listeners_disabled = 0;
+                    enable_listensocks(process_slot);
+                }
+                if (!listeners_disabled) {
+                    lr = (ap_listen_rec *) pt->baton;
+                    ap_pop_pool(&ptrans, worker_queue_info);
 
-                lr = (ap_listen_rec *) pt->baton;
-
-                ap_pop_pool(&ptrans, worker_queue_info);
-
-                if (ptrans == NULL) {
-                    /* create a new transaction pool for each accepted socket */
-                    apr_allocator_t *allocator;
-
-                    apr_allocator_create(&allocator);
-                    apr_allocator_max_free_set(allocator,
-                                               ap_max_mem_free);
-                    apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
-                    apr_allocator_owner_set(allocator, ptrans);
                     if (ptrans == NULL) {
-                        ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                                     ap_server_conf,
-                                     "Failed to create transaction pool");
-                        signal_threads(ST_GRACEFUL);
-                        return NULL;
+                        /* create a new transaction pool for each accepted socket */
+                        apr_allocator_t *allocator;
+
+                        apr_allocator_create(&allocator);
+                        apr_allocator_max_free_set(allocator,
+                                                   ap_max_mem_free);
+                        apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
+                        apr_allocator_owner_set(allocator, ptrans);
+                        if (ptrans == NULL) {
+                            ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
+                                         ap_server_conf,
+                                         "Failed to create transaction pool");
+                            signal_threads(ST_GRACEFUL);
+                            return NULL;
+                        }
                     }
-                }
-                apr_pool_tag(ptrans, "transaction");
+                    apr_pool_tag(ptrans, "transaction");
 
-                rc = lr->accept_func(&csd, lr, ptrans);
+                    get_worker(&have_idle_worker, 1, &workers_were_busy);
+                    rc = lr->accept_func(&csd, lr, ptrans);
 
-                /* later we trash rv and rely on csd to indicate
-                 * success/failure
-                 */
-                AP_DEBUG_ASSERT(rc == APR_SUCCESS || !csd);
+                    /* later we trash rc and rely on csd to indicate
+                     * success/failure
+                     */
+                    AP_DEBUG_ASSERT(rc == APR_SUCCESS || !csd);
 
-                if (rc == APR_EGENERAL) {
-                    /* E[NM]FILE, ENOMEM, etc */
-                    resource_shortage = 1;
-                    signal_threads(ST_GRACEFUL);
-                }
+                    if (rc == APR_EGENERAL) {
+                        /* E[NM]FILE, ENOMEM, etc */
+                        resource_shortage = 1;
+                        signal_threads(ST_GRACEFUL);
+                    }
 
-                if (csd != NULL) {
-                    rc = ap_queue_push(worker_queue, csd, NULL, ptrans);
-                    if (rc != APR_SUCCESS) {
-                        /* trash the connection; we couldn't queue the connected
-                         * socket to a worker
-                         */
-                        apr_socket_close(csd);
-                        ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                                     ap_server_conf,
-                                     "ap_queue_push failed");
-                        apr_pool_clear(ptrans);
-                        ap_push_pool(worker_queue_info, ptrans);
+                    if (csd != NULL) {
+                        rc = ap_queue_push(worker_queue, csd, NULL, ptrans);
+                        if (rc != APR_SUCCESS) {
+                            /* trash the connection; we couldn't queue the connected
+                             * socket to a worker
+                             */
+                            apr_socket_close(csd);
+                            ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
+                                         ap_server_conf,
+                                         "ap_queue_push failed");
+                            apr_pool_clear(ptrans);
+                            ap_push_pool(worker_queue_info, ptrans);
+                        }
+                        else {
+                            have_idle_worker = 0;
+                        }
                     }
                     else {
-                        have_idle_worker = 0;
+                        apr_pool_clear(ptrans);
+                        ap_push_pool(worker_queue_info, ptrans);
                     }
                 }
-                else {
-                    apr_pool_clear(ptrans);
-                    ap_push_pool(worker_queue_info, ptrans);
-                }
             }               /* if:else on pt->type */
 #if HAVE_SERF
             else if (pt->type == PT_SERF) {
                 /* send socket to serf. */
-                /* XXXX: this doesn't require get_worker(&have_idle_worker) */
+                /* XXXX: this doesn't require get_worker() */
                 serf_event_trigger(g_serf, pt->baton, out_pfd);
             }
 #endif
@@ -1325,70 +1579,59 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
          * r->request_time for new requests
          */
         now = apr_time_now();
+        /* we only do this once per 0.1s (TIMEOUT_FUDGE_FACTOR) */
+        if (now > timeout_time) {
+            struct process_score *ps;
+            timeout_time = now + TIMEOUT_FUDGE_FACTOR;
 
-        /* handle timed out sockets */
-        apr_thread_mutex_lock(timeout_mutex);
-
-        /* Step 1: keepalive timeouts */
-        cs = APR_RING_FIRST(&keepalive_timeout_head);
-        timeout_time = now + TIMEOUT_FUDGE_FACTOR;
-        while (!APR_RING_EMPTY(&keepalive_timeout_head, conn_state_t, timeout_list)
-               && cs->expiration_time < timeout_time) {
-
-            cs->state = CONN_STATE_LINGER;
-
-            APR_RING_REMOVE(cs, timeout_list);
-            apr_thread_mutex_unlock(timeout_mutex);
+            /* handle timed out sockets */
+            apr_thread_mutex_lock(timeout_mutex);
 
-            if (!get_worker(&have_idle_worker)) {
-                apr_thread_mutex_lock(timeout_mutex);
-                APR_RING_INSERT_HEAD(&keepalive_timeout_head, cs,
-                                     conn_state_t, timeout_list);
-                break;
+            /* Step 1: keepalive timeouts */
+            /* If all workers are busy, we kill older keep-alive connections so that they
+             * may connect to another process.
+             */
+            if (workers_were_busy && keepalive_q.count) {
+                ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+                             "All workers are busy, will close %d keep-alive "
+                             "connections",
+                             keepalive_q.count);
+                process_timeout_queue(&keepalive_q,
+                                      timeout_time + ap_server_conf->keep_alive_timeout,
+                                      start_lingering_close);
             }
-
-            rc = push2worker(&cs->pfd, event_pollset);
-
-            if (rc != APR_SUCCESS) {
-                return NULL;
-                /* XXX return NULL looks wrong - not an init failure
-                 * that bypasses all the cleanup outside the main loop
-                 * break seems more like it
-                 * need to evaluate seriousness of push2worker failures
-                 */
+            else {
+                process_timeout_queue(&keepalive_q, timeout_time,
+                                      start_lingering_close);
             }
-            have_idle_worker = 0;
-            apr_thread_mutex_lock(timeout_mutex);
-            cs = APR_RING_FIRST(&keepalive_timeout_head);
-        }
-
-        /* Step 2: write completion timeouts */
-        cs = APR_RING_FIRST(&timeout_head);
-        while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list)
-               && cs->expiration_time < timeout_time) {
-
-            cs->state = CONN_STATE_LINGER;
-            APR_RING_REMOVE(cs, timeout_list);
+            /* Step 2: write completion timeouts */
+            process_timeout_queue(&write_completion_q, timeout_time, start_lingering_close);
+            /* Step 3: (normal) lingering close completion timeouts */
+            process_timeout_queue(&linger_q, timeout_time, stop_lingering_close);
+            /* Step 4: (short) lingering close completion timeouts */
+            process_timeout_queue(&short_linger_q, timeout_time, stop_lingering_close);
+
+            ps = ap_get_scoreboard_process(process_slot);
+            ps->write_completion = write_completion_q.count;
+            ps->lingering_close = linger_q.count + short_linger_q.count;
+            ps->keep_alive = keepalive_q.count;
             apr_thread_mutex_unlock(timeout_mutex);
 
-            if (!get_worker(&have_idle_worker)) {
-                apr_thread_mutex_lock(timeout_mutex);
-                APR_RING_INSERT_HEAD(&timeout_head, cs,
-                                     conn_state_t, timeout_list);
-                break;
-            }
-
-            rc = push2worker(&cs->pfd, event_pollset);
-            if (rc != APR_SUCCESS) {
-                return NULL;
-            }
-            have_idle_worker = 0;
-            apr_thread_mutex_lock(timeout_mutex);
-            cs = APR_RING_FIRST(&timeout_head);
+            ps->connections = apr_atomic_read32(&connection_count);
+            /* XXX: should count CONN_STATE_SUSPENDED and set ps->suspended */
         }
-
-        apr_thread_mutex_unlock(timeout_mutex);
-
+        if (listeners_disabled && !workers_were_busy &&
+            (int)apr_atomic_read32(&connection_count) <
+            ((int)ap_queue_info_get_idlers(worker_queue_info) - 1) * WORKER_OVERCOMMIT +
+            threads_per_child)
+        {
+            listeners_disabled = 0;
+            enable_listensocks(process_slot);
+        }
+        /*
+         * XXX: do we need to set some timeout that re-enables the listensocks
+         * XXX: in case no other event occurs?
+         */
     }     /* listener main loop */
 
     close_listeners(process_slot, &closed);
@@ -1439,7 +1682,7 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
         }
 
         ap_update_child_status_from_indexes(process_slot, thread_slot,
-                                            dying ? SERVER_DEAD : SERVER_READY, NULL);
+                                            dying ? SERVER_GRACEFUL : SERVER_READY, NULL);
       worker_pop:
         if (workers_may_exit) {
             break;
@@ -1598,7 +1841,10 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy)
 
     /* Create the main pollset */
     rv = apr_pollset_create(&event_pollset,
-                            threads_per_child,
+                            threads_per_child, /* XXX don't we need more, to handle
+                                                * connections in K-A or lingering
+                                                * close?
+                                                */
                             pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
     if (rv != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
@@ -1973,6 +2219,7 @@ static int make_child(server_rec * s, int slot)
         event_note_child_lost_slot(slot, pid);
     }
     ap_scoreboard_image->parent[slot].quiescing = 0;
+    ap_scoreboard_image->parent[slot].not_accepting = 0;
     event_note_child_started(slot, pid);
     return 0;
 }
@@ -2048,8 +2295,9 @@ static void perform_idle_server_maintenance(void)
              */
             if (ps->pid != 0) { /* XXX just set all_dead_threads in outer
                                    for loop if no pid?  not much else matters */
-                if (status <= SERVER_READY &&
-                        !ps->quiescing && ps->generation == retained->my_generation) {
+                if (status <= SERVER_READY && !ps->quiescing && !ps->not_accepting
+                    && ps->generation == retained->my_generation)
+                {
                     ++idle_thread_count;
                 }
                 if (status >= SERVER_READY && status < SERVER_GRACEFUL) {
index 26b5906d22bb9881381d02480a62f144a3046045..9566df3bb42097845ec6e31f83e2a46618399b43 100644 (file)
@@ -127,7 +127,19 @@ apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info,
     return APR_SUCCESS;
 }
 
-apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info)
+apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t * queue_info)
+{
+    /* Non-blocking claim of an idler. add32 returns the OLD value; dec32 does not. */
+    int prev_idlers = apr_atomic_add32((apr_uint32_t *)&(queue_info->idlers), -1);
+    if (prev_idlers <= 0) {
+        apr_atomic_inc32((apr_uint32_t *)&(queue_info->idlers));    /* back out dec */
+        return APR_EAGAIN;
+    }
+    return APR_SUCCESS;
+}
+
+apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info,
+                                          int *had_to_block)
 {
     apr_status_t rv;
     int prev_idlers;
@@ -165,6 +177,7 @@ apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info)
          *     threads are waiting on an idle worker.
          */
         if (queue_info->idlers < 0) {
+            *had_to_block = 1;
             rv = apr_thread_cond_wait(queue_info->wait_for_idler,
                                       queue_info->idlers_mutex);
             if (rv != APR_SUCCESS) {
@@ -191,6 +204,14 @@ apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info)
     }
 }
 
+apr_uint32_t ap_queue_info_get_idlers(fd_queue_info_t * queue_info)
+{
+    /* Snapshot the idle-worker counter; a negative reading is reported as 0. */
+    apr_int32_t idle_now =
+        (apr_int32_t)apr_atomic_read32((apr_uint32_t *)&queue_info->idlers);
+
+    return (idle_now < 0) ? 0 : (apr_uint32_t)idle_now;
+}
 
 void ap_push_pool(fd_queue_info_t * queue_info,
                                     apr_pool_t * pool_to_recycle)
index 9c1deb6441b366a36494c757c6aa041b7e5c27fd..a8878b106226b465abda4106b23449e501d413f9 100644 (file)
@@ -46,8 +46,11 @@ apr_status_t ap_queue_info_create(fd_queue_info_t ** queue_info,
                                   int max_recycled_pools);
 apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info,
                                     apr_pool_t * pool_to_recycle);
-apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info);
+apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t * queue_info);
+apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info,
+                                          int *had_to_block);
 apr_status_t ap_queue_info_term(fd_queue_info_t * queue_info);
+apr_uint32_t ap_queue_info_get_idlers(fd_queue_info_t * queue_info);
 
 struct fd_queue_elem_t
 {