Merge r1823047, r1824454, r1824463, r1824464, r1824497, r1824862, r1824877 from trunk:
author    Yann Ylavic <ylavic@apache.org>  Tue, 20 Feb 2018 13:42:59 +0000 (13:42 +0000)
committer Yann Ylavic <ylavic@apache.org>  Tue, 20 Feb 2018 13:42:59 +0000 (13:42 +0000)
mpm_event: move lingering close "sucker" from the listener to worker(s).

This was the last non-constant-time action performed by the listener thread.

It's now handled by the worker thread directly after entering lingering close,
which should address the case where the socket is already closed remotely by
that time, hence avoiding further scheduling (this may be the common case in
some scenarios).

Only if the above would block (i.e. there is more data to drain) is the socket
added to the pollset, for the listener to re-schedule a worker later when it
becomes readable. If no worker is available at that time, the socket is
forcibly closed (as is already done for keepalive connections in this case).
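
For illustration, the listener-side handoff boils down to the following (a
sketch distilled from the event.c hunk below, where get_worker(),
push2worker() and stop_lingering_close() are defined):

    /* Listener: a lingering-close socket became readable again. */
    get_worker(&have_idle_worker, 0 /* nonblocking */, &workers_were_busy);
    if (!have_idle_worker) {
        /* No idle worker: forcibly close, favoring live connections. */
        stop_lingering_close(cs);
    }
    else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) {
        have_idle_worker = 0;
    }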

Also, since process_lingering_close() is now called by a worker thread with
almost no depth in the call stack, we can grow the "suck" buffer from 2K to
32K, potentially needing up to sixteen times fewer recv() calls.
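
As a standalone sketch of the worker-side drain loop (plain POSIX here rather
than the APR calls used in the patch below; drain_lingering() is a
hypothetical name, and the buffer size matches the new LINGERING_BUF_SIZE):

    #include <errno.h>
    #include <sys/types.h>
    #include <sys/socket.h>

    #define LINGERING_BUF_SIZE (32 * 1024) /* was 2K: up to 16x fewer recv()s */

    /* Drain and discard pending input on a nonblocking socket.
     * Returns 1 when the peer has closed (safe to close the socket now),
     * 0 when the read would block (re-add to the pollset), -1 on error. */
    static int drain_lingering(int fd)
    {
        char dummybuf[LINGERING_BUF_SIZE];
        ssize_t n;

        do {
            n = recv(fd, dummybuf, sizeof(dummybuf), 0);
        } while (n > 0);

        if (n == 0)
            return 1;   /* orderly FIN from the peer */
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return 0;   /* more data may come: poll and retry later */
        return -1;      /* RST or other error: just close */
    }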

mpm_event: follow up to r1823047.

Update clogged counter on read_request retry too.

mpm_event: follow up to r1823047: simplify "clogging" logic (reentrance).
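
The simplified accounting wraps the connection hook in a single place (as in
the process_socket hunk below), so the read_request retry is covered too:

    clogging = c->clogging_input_filters;
    if (clogging) {
        apr_atomic_inc32(&clogged_count);
    }
    rc = ap_run_process_connection(c);
    if (clogging) {
        apr_atomic_dec32(&clogged_count);
    }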

mpm_event: follow up to r1823047: complete state validation after processing.

mpm_event: follow up to r1823047: CHANGES entry.

mpm_event: follow up to r1823047 and r1824464.

MMN bump for CONN_STATE_NUM, plus don't consider CONN_STATE_LINGER_* as valid
states returned by process_connection (they never have been).
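
Concretely, the validation enabled by CONN_STATE_NUM (mirroring the event.c
hunk below) coerces any out-of-range or unschedulable state to lingering
close:

    if (rc != OK || (cs->pub.state >= CONN_STATE_NUM)
                 || (cs->pub.state < CONN_STATE_LINGER
                     && cs->pub.state != CONN_STATE_WRITE_COMPLETION
                     && cs->pub.state != CONN_STATE_CHECK_REQUEST_LINE_READABLE
                     && cs->pub.state != CONN_STATE_SUSPENDED)) {
        cs->pub.state = CONN_STATE_LINGER;
    }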

mpm_event: follow up to r1823047 and r1824862.

Revert (broken) functional change from r1824862.

Submitted by: ylavic
Reviewed by: ylavic, minfrin, jim

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1824879 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
include/ap_mmn.h
include/httpd.h
server/mpm/event/event.c

diff --git a/CHANGES b/CHANGES
index 0b17453d3107d76f09016c9b8c0138aabad8d217..3ab0b273763053e3f87b81fd2616fd794bd2f323 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -5,6 +5,8 @@ Changes with Apache 2.4.31
      improper merging of the cache lock in vhost config.
      PR 43164 [Eric Covener]
 
+  *) mpm_event: Do lingering close in worker(s).  [Yann Ylavic]
+
   *) mpm_queue: Put fdqueue code in common for MPMs event and worker.
      [Yann Ylavic]
 
diff --git a/include/ap_mmn.h b/include/ap_mmn.h
index b18ea724bcc04e22f477b8da7ef6f8e2774c9105..f57d64ad1b50202c83ea397154bdd21342787822 100644 (file)
--- a/include/ap_mmn.h
+++ b/include/ap_mmn.h
  *                          ap_regcomp_set_default_cflags and
  *                          ap_regcomp_default_cflag_by_name
  * 20120211.75 (2.4.30-dev) Add hostname_ex to proxy_worker_shared
+ * 20120211.76 (2.4.30-dev) Add CONN_STATE_NUM to enum conn_state_e
  */
 
 #define MODULE_MAGIC_COOKIE 0x41503234UL /* "AP24" */
 #ifndef MODULE_MAGIC_NUMBER_MAJOR
 #define MODULE_MAGIC_NUMBER_MAJOR 20120211
 #endif
-#define MODULE_MAGIC_NUMBER_MINOR 75                  /* 0...n */
+#define MODULE_MAGIC_NUMBER_MINOR 76                  /* 0...n */
 
 /**
  * Determine if the server's current MODULE_MAGIC_NUMBER is at least a
diff --git a/include/httpd.h b/include/httpd.h
index 61ab2e6ed62042b5a3ce8588951f46ca0c49b8df..a9fe056f34cc71569bcfdb951c83c301838cb08d 100644 (file)
--- a/include/httpd.h
+++ b/include/httpd.h
@@ -1199,7 +1199,9 @@ typedef enum  {
     CONN_STATE_SUSPENDED,
     CONN_STATE_LINGER,          /* connection may be closed with lingering */
     CONN_STATE_LINGER_NORMAL,   /* MPM has started lingering close with normal timeout */
-    CONN_STATE_LINGER_SHORT     /* MPM has started lingering close with short timeout */
+    CONN_STATE_LINGER_SHORT,    /* MPM has started lingering close with short timeout */
+
+    CONN_STATE_NUM              /* Number of states (keep/kept last) */
 } conn_state_e;
 
 typedef enum  {
diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c
index 74d2596afccd3228676334bf376385529e4220b1..31fa04af0295ac1c5ff1cd80a9f2a92f3394f2c0 100644 (file)
--- a/server/mpm/event/event.c
+++ b/server/mpm/event/event.c
@@ -799,19 +799,18 @@ static void notify_resume(event_conn_state_t *cs, int cleanup)
  */
 static int start_lingering_close_blocking(event_conn_state_t *cs)
 {
-    apr_status_t rv;
-    struct timeout_queue *q;
     apr_socket_t *csd = cs->pfd.desc.s;
 
     if (ap_start_lingering_close(cs->c)) {
         notify_suspend(cs);
         apr_socket_close(csd);
         ap_queue_info_push_pool(worker_queue_info, cs->p);
-        return 0;
+        return DONE;
     }
 
 #ifdef AP_DEBUG
     {
+        apr_status_t rv;
         rv = apr_socket_timeout_set(csd, 0);
         AP_DEBUG_ASSERT(rv == APR_SUCCESS);
     }
@@ -826,34 +825,15 @@ static int start_lingering_close_blocking(event_conn_state_t *cs)
      * DoS attacks.
      */
     if (apr_table_get(cs->c->notes, "short-lingering-close")) {
-        q = short_linger_q;
         cs->pub.state = CONN_STATE_LINGER_SHORT;
     }
     else {
-        q = linger_q;
         cs->pub.state = CONN_STATE_LINGER_NORMAL;
     }
     apr_atomic_inc32(&lingering_count);
     notify_suspend(cs);
 
-    cs->pfd.reqevents = (
-            cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
-                    APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
-    cs->pub.sense = CONN_SENSE_DEFAULT;
-    apr_thread_mutex_lock(timeout_mutex);
-    TO_QUEUE_APPEND(q, cs);
-    rv = apr_pollset_add(event_pollset, &cs->pfd);
-    if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
-        TO_QUEUE_REMOVE(q, cs);
-        apr_thread_mutex_unlock(timeout_mutex);
-        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
-                     "start_lingering_close: apr_pollset_add failure");
-        apr_socket_close(cs->pfd.desc.s);
-        ap_queue_info_push_pool(worker_queue_info, cs->p);
-        return 0;
-    }
-    apr_thread_mutex_unlock(timeout_mutex);
-    return 1;
+    return OK;
 }
 
 /*
@@ -888,7 +868,7 @@ static int stop_lingering_close(event_conn_state_t *cs)
 {
     apr_socket_t *csd = ap_get_conn_socket(cs->c);
     ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
-                 "socket reached timeout in lingering-close state");
+                 "socket abort in state %i", (int)cs->pub.state);
     abort_socket_nonblocking(csd);
     ap_queue_info_push_pool(worker_queue_info, cs->p);
     if (dying)
@@ -962,6 +942,9 @@ static int event_post_read_request(request_rec *r)
     return OK;
 }
 
+/* Forward declare */
+static void process_lingering_close(event_conn_state_t *cs);
+
 /*
  * process one connection in the worker
  */
@@ -971,7 +954,9 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc
 {
     conn_rec *c;
     long conn_id = ID_FROM_CHILD_THREAD(my_child_num, my_thread_num);
-    int rc;
+    int clogging = 0;
+    apr_status_t rv;
+    int rc = OK;
 
     if (cs == NULL) {           /* This is a new connection */
         listener_poll_type *pt = apr_pcalloc(p, sizeof(*pt));
@@ -1028,6 +1013,7 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc
         cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
 
         cs->pub.sense = CONN_SENSE_DEFAULT;
+        rc = OK;
     }
     else {
         c = cs->c;
@@ -1038,30 +1024,37 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc
         c->id = conn_id;
     }
 
-    rc = OK;
-    if (c->aborted || cs->pub.state == CONN_STATE_LINGER) {
+    if (c->aborted) {
         /* do lingering close below */
         cs->pub.state = CONN_STATE_LINGER;
     }
-    else if (c->clogging_input_filters) {
-        /* Since we have an input filter which 'clogs' the input stream,
-         * like mod_ssl used to, lets just do the normal read from input
-         * filters, like the Worker MPM does. Filters that need to write
-         * where they would otherwise read, or read where they would
-         * otherwise write, should set the sense appropriately.
-         */
-        apr_atomic_inc32(&clogged_count);
-        rc = ap_run_process_connection(c);
-        apr_atomic_dec32(&clogged_count);
-        if (rc == DONE) {
-            rc = OK;
-        }
+    else if (cs->pub.state >= CONN_STATE_LINGER) {
+        /* fall through */
     }
-    else if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
+    else {
+        if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE
+            /* If we have an input filter which 'clogs' the input stream,
+             * like mod_ssl used to, lets just do the normal read from input
+             * filters, like the Worker MPM does. Filters that need to write
+             * where they would otherwise read, or read where they would
+             * otherwise write, should set the sense appropriately.
+             */
+             || c->clogging_input_filters) {
 read_request:
-        rc = ap_run_process_connection(c);
-        if (rc == DONE) {
-            rc = OK;
+            clogging = c->clogging_input_filters;
+            if (clogging) {
+                apr_atomic_inc32(&clogged_count);
+            }
+            rc = ap_run_process_connection(c);
+            if (clogging) {
+                apr_atomic_dec32(&clogged_count);
+            }
+            if (cs->pub.state > CONN_STATE_LINGER) {
+                cs->pub.state = CONN_STATE_LINGER;
+            }
+            if (rc == DONE) {
+                rc = OK;
+            }
         }
     }
     /*
@@ -1069,13 +1062,16 @@ read_request:
      * appropriately upon return, for event MPM to either:
      * - do lingering close (CONN_STATE_LINGER),
      * - wait for readability of the next request with respect to the keepalive
-     *   timeout (CONN_STATE_CHECK_REQUEST_LINE_READABLE),
+     *   timeout (state CONN_STATE_CHECK_REQUEST_LINE_READABLE),
+     * - wait for read/write-ability of the underlying socket with respect to
+     *   its timeout by setting c->clogging_input_filters to 1 and the sense
+     *   to CONN_SENSE_WANT_READ/WRITE (state CONN_STATE_WRITE_COMPLETION),
      * - keep flushing the output filters stack in nonblocking mode, and then
      *   if required wait for read/write-ability of the underlying socket with
-     *   respect to its own timeout (CONN_STATE_WRITE_COMPLETION); since write
+     *   respect to its own timeout (state CONN_STATE_WRITE_COMPLETION); since
      *   completion at some point may require reads (e.g. SSL_ERROR_WANT_READ),
-     *   an output filter can set the sense to CONN_SENSE_WANT_READ at any time
-     *   for event MPM to do the right thing,
+     *   an output filter can also set the sense to CONN_SENSE_WANT_READ at any
+     *   time for event MPM to do the right thing,
      * - suspend the connection (SUSPENDED) such that it now interracts with
      *   the MPM through suspend/resume_connection() hooks, and/or registered
      *   poll callbacks (PT_USER), and/or registered timed callbacks triggered
@@ -1089,7 +1085,8 @@ read_request:
      * while this was expected to do lingering close unconditionally with
      * worker or prefork MPMs for instance.
      */
-    if (rc != OK || (cs->pub.state != CONN_STATE_LINGER
+    if (rc != OK || (cs->pub.state >= CONN_STATE_NUM)
+                 || (cs->pub.state < CONN_STATE_LINGER
                      && cs->pub.state != CONN_STATE_WRITE_COMPLETION
                      && cs->pub.state != CONN_STATE_CHECK_REQUEST_LINE_READABLE
                      && cs->pub.state != CONN_STATE_SUSPENDED)) {
@@ -1137,11 +1134,12 @@ read_request:
 
             apr_thread_mutex_lock(timeout_mutex);
             TO_QUEUE_APPEND(cs->sc->wc_q, cs);
-            rc = apr_pollset_add(event_pollset, &cs->pfd);
-            if (rc != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rc)) {
+            rv = apr_pollset_add(event_pollset, &cs->pfd);
+            if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+                AP_DEBUG_ASSERT(0);
                 TO_QUEUE_REMOVE(cs->sc->wc_q, cs);
                 apr_thread_mutex_unlock(timeout_mutex);
-                ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03465)
+                ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03465)
                              "process_socket: apr_pollset_add failure for "
                              "write completion");
                 apr_socket_close(cs->pfd.desc.s);
@@ -1165,10 +1163,7 @@ read_request:
         }
     }
 
-    if (cs->pub.state == CONN_STATE_LINGER) {
-        start_lingering_close_blocking(cs);
-    }
-    else if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
+    if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
         ap_update_child_status(cs->sbh, SERVER_BUSY_KEEPALIVE, NULL);
 
         /* It greatly simplifies the logic to use a single timeout value per q
@@ -1186,22 +1181,35 @@ read_request:
         cs->pfd.reqevents = APR_POLLIN;
         apr_thread_mutex_lock(timeout_mutex);
         TO_QUEUE_APPEND(cs->sc->ka_q, cs);
-        rc = apr_pollset_add(event_pollset, &cs->pfd);
-        if (rc != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rc)) {
+        rv = apr_pollset_add(event_pollset, &cs->pfd);
+        if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+            AP_DEBUG_ASSERT(0);
             TO_QUEUE_REMOVE(cs->sc->ka_q, cs);
             apr_thread_mutex_unlock(timeout_mutex);
-            ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03093)
+            ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03093)
                          "process_socket: apr_pollset_add failure for "
                          "keep alive");
             apr_socket_close(cs->pfd.desc.s);
             ap_queue_info_push_pool(worker_queue_info, cs->p);
-            return;
         }
-        apr_thread_mutex_unlock(timeout_mutex);
+        else {
+            apr_thread_mutex_unlock(timeout_mutex);
+        }
+        return;
     }
-    else if (cs->pub.state == CONN_STATE_SUSPENDED) {
+
+    if (cs->pub.state == CONN_STATE_SUSPENDED) {
         apr_atomic_inc32(&suspended_count);
         notify_suspend(cs);
+        return;
+    }
+
+    if (cs->pub.state == CONN_STATE_LINGER) {
+        rc = start_lingering_close_blocking(cs);
+    }
+    if (rc == OK && (cs->pub.state == CONN_STATE_LINGER_NORMAL ||
+                     cs->pub.state == CONN_STATE_LINGER_SHORT)) {
+        process_lingering_close(cs);
     }
 }
 
@@ -1452,18 +1460,18 @@ static apr_status_t event_register_timed_callback(apr_time_t t,
 
 /*
  * Close socket and clean up if remote closed its end while we were in
- * lingering close.
- * Only to be called in the listener thread;
- * Pre-condition: cs is in one of the linger queues and in the pollset
+ * lingering close. Only to be called in the worker thread, and since it's
+ * in immediate call stack, we can afford a comfortable buffer size to
+ * consume data quickly.
  */
-static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t *pfd)
+#define LINGERING_BUF_SIZE (32 * 1024)
+static void process_lingering_close(event_conn_state_t *cs)
 {
     apr_socket_t *csd = ap_get_conn_socket(cs->c);
-    char dummybuf[2048];
+    char dummybuf[LINGERING_BUF_SIZE];
     apr_size_t nbytes;
     apr_status_t rv;
     struct timeout_queue *q;
-    q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q;
 
     /* socket is already in non-blocking state */
     do {
@@ -1471,22 +1479,32 @@ static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t *
         rv = apr_socket_recv(csd, dummybuf, &nbytes);
     } while (rv == APR_SUCCESS);
 
-    if (APR_STATUS_IS_EAGAIN(rv)) {
+    if (!APR_STATUS_IS_EAGAIN(rv)) {
+        rv = apr_socket_close(csd);
+        AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+        ap_queue_info_push_pool(worker_queue_info, cs->p);
         return;
     }
 
+    /* Re-queue the connection to come back when readable */
+    cs->pfd.reqevents = APR_POLLIN;
+    cs->pub.sense = CONN_SENSE_DEFAULT;
+    q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q;
     apr_thread_mutex_lock(timeout_mutex);
-    TO_QUEUE_REMOVE(q, cs);
-    rv = apr_pollset_remove(event_pollset, pfd);
+    TO_QUEUE_APPEND(q, cs);
+    rv = apr_pollset_add(event_pollset, &cs->pfd);
+    if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+        AP_DEBUG_ASSERT(0);
+        TO_QUEUE_REMOVE(q, cs);
+        apr_thread_mutex_unlock(timeout_mutex);
+        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
+                     "process_lingering_close: apr_pollset_add failure");
+        rv = apr_socket_close(cs->pfd.desc.s);
+        AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+        ap_queue_info_push_pool(worker_queue_info, cs->p);
+        return;
+    }
     apr_thread_mutex_unlock(timeout_mutex);
-    AP_DEBUG_ASSERT(rv == APR_SUCCESS ||  APR_STATUS_IS_NOTFOUND(rv));
-
-    rv = apr_socket_close(csd);
-    AP_DEBUG_ASSERT(rv == APR_SUCCESS);
-
-    ap_queue_info_push_pool(worker_queue_info, cs->p);
-    if (dying)
-        ap_queue_interrupt_one(worker_queue);
 }
 
 /* call 'func' for all elements of 'q' with timeout less than 'timeout_time'.
@@ -1541,6 +1559,7 @@ static void process_timeout_queue(struct timeout_queue *q,
             last = cs;
             rv = apr_pollset_remove(event_pollset, &cs->pfd);
             if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) {
+                AP_DEBUG_ASSERT(0);
                 ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, cs->c, APLOGNO(00473)
                               "apr_pollset_remove failed");
             }
@@ -1740,24 +1759,43 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             if (pt->type == PT_CSD) {
                 /* one of the sockets is readable */
                 event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
-                struct timeout_queue *remove_from_q = cs->sc->wc_q;
-                int blocking = 1;
+                struct timeout_queue *remove_from_q = NULL;
+                /* don't wait for a worker for a keepalive request or
+                 * lingering close processing. */
+                int blocking = 0;
 
                 switch (cs->pub.state) {
+                case CONN_STATE_WRITE_COMPLETION:
+                    remove_from_q = cs->sc->wc_q;
+                    blocking = 1;
+                    break;
+
                 case CONN_STATE_CHECK_REQUEST_LINE_READABLE:
                     cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
                     remove_from_q = cs->sc->ka_q;
-                    /* don't wait for a worker for a keepalive request */
-                    blocking = 0;
-                    /* FALL THROUGH */
-                case CONN_STATE_WRITE_COMPLETION:
-                    get_worker(&have_idle_worker, blocking,
-                               &workers_were_busy);
+                    break;
+
+                case CONN_STATE_LINGER_NORMAL:
+                    remove_from_q = linger_q;
+                    break;
+
+                case CONN_STATE_LINGER_SHORT:
+                    remove_from_q = short_linger_q;
+                    break;
+
+                default:
+                    ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
+                                 ap_server_conf, APLOGNO(03096)
+                                 "event_loop: unexpected state %d",
+                                 cs->pub.state);
+                    ap_assert(0);
+                }
+
+                if (remove_from_q) {
                     apr_thread_mutex_lock(timeout_mutex);
                     TO_QUEUE_REMOVE(remove_from_q, cs);
                     rc = apr_pollset_remove(event_pollset, &cs->pfd);
                     apr_thread_mutex_unlock(timeout_mutex);
-
                     /*
                      * Some of the pollset backends, like KQueue or Epoll
                      * automagically remove the FD if the socket is closed,
@@ -1765,35 +1803,32 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                      * and we still want to keep going
                      */
                     if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
+                        AP_DEBUG_ASSERT(0);
                         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
                                      APLOGNO(03094) "pollset remove failed");
                         start_lingering_close_nonblocking(cs);
                         break;
                     }
 
-                    /* If we didn't get a worker immediately for a keep-alive
-                     * request, we close the connection, so that the client can
-                     * re-connect to a different process.
+                    /* If we don't get a worker immediately (nonblocking), we
+                     * close the connection; the client can re-connect to a
+                     * different process for keepalive, and for lingering close
+                     * the connection will be reset so the choice is to favor
+                     * incoming/alive connections.
                      */
+                    get_worker(&have_idle_worker, blocking,
+                               &workers_were_busy);
                     if (!have_idle_worker) {
-                        start_lingering_close_nonblocking(cs);
+                        if (remove_from_q == cs->sc->ka_q) {
+                            start_lingering_close_nonblocking(cs);
+                        }
+                        else {
+                            stop_lingering_close(cs);
+                        }
                     }
                     else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) {
                         have_idle_worker = 0;
                     }
-                    break;
-
-                case CONN_STATE_LINGER_NORMAL:
-                case CONN_STATE_LINGER_SHORT:
-                    process_lingering_close(cs, out_pfd);
-                    break;
-
-                default:
-                    ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                                 ap_server_conf, APLOGNO(03096)
-                                 "event_loop: unexpected state %d",
-                                 cs->pub.state);
-                    ap_assert(0);
                 }
             }
             else if (pt->type == PT_ACCEPT && !listeners_disabled()) {