]> granicus.if.org Git - apache/commitdiff
mpm_event: move lingering close "sucker" from the listener to worker(s).
authorYann Ylavic <ylavic@apache.org>
Sat, 3 Feb 2018 18:10:23 +0000 (18:10 +0000)
committerYann Ylavic <ylavic@apache.org>
Sat, 3 Feb 2018 18:10:23 +0000 (18:10 +0000)
This was the last non-constant time action performed by the listener thread.

It's now handled by the worker thread directly after entering lingering close,
which should directly address the cases when the socket is already closed
remotely at that time, hence avoid more scheduling (it may be the common case
for some scenarios).

And it's only if the above would need blocking (i.e. more data to suck) that
the socket is added to the pollset for the listener to re-schedule a worker
later when ready. If no worker is available at that time then the socket is
forcibly closed (similarly to what's done for keepalive connections in this
case).

Also, since process_lingering_close() is now called by a worker thread and
with almost no depth in the call stack, we can grow the size of the "suck"
buffer from 2K to 32K to potentially call recv() up to sixteen times less.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1823047 13f79535-47bb-0310-9956-ffa450edef68

server/mpm/event/event.c

index 0f78cfe825ece3d5b02b45cfae8ddd70b0f2339f..facc1c0454b5ee920731d802de26b4e79350b2c5 100644 (file)
@@ -837,19 +837,18 @@ static void notify_resume(event_conn_state_t *cs, int cleanup)
  */
 static int start_lingering_close_blocking(event_conn_state_t *cs)
 {
-    apr_status_t rv;
-    struct timeout_queue *q;
     apr_socket_t *csd = cs->pfd.desc.s;
 
     if (ap_start_lingering_close(cs->c)) {
         notify_suspend(cs);
         apr_socket_close(csd);
         ap_queue_info_push_pool(worker_queue_info, cs->p);
-        return 0;
+        return DONE;
     }
 
 #ifdef AP_DEBUG
     {
+        apr_status_t rv;
         rv = apr_socket_timeout_set(csd, 0);
         AP_DEBUG_ASSERT(rv == APR_SUCCESS);
     }
@@ -864,34 +863,15 @@ static int start_lingering_close_blocking(event_conn_state_t *cs)
      * DoS attacks.
      */
     if (apr_table_get(cs->c->notes, "short-lingering-close")) {
-        q = short_linger_q;
         cs->pub.state = CONN_STATE_LINGER_SHORT;
     }
     else {
-        q = linger_q;
         cs->pub.state = CONN_STATE_LINGER_NORMAL;
     }
     apr_atomic_inc32(&lingering_count);
     notify_suspend(cs);
 
-    cs->pfd.reqevents = (
-            cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
-                    APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
-    cs->pub.sense = CONN_SENSE_DEFAULT;
-    apr_thread_mutex_lock(timeout_mutex);
-    TO_QUEUE_APPEND(q, cs);
-    rv = apr_pollset_add(event_pollset, &cs->pfd);
-    if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
-        TO_QUEUE_REMOVE(q, cs);
-        apr_thread_mutex_unlock(timeout_mutex);
-        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
-                     "start_lingering_close: apr_pollset_add failure");
-        apr_socket_close(cs->pfd.desc.s);
-        ap_queue_info_push_pool(worker_queue_info, cs->p);
-        return 0;
-    }
-    apr_thread_mutex_unlock(timeout_mutex);
-    return 1;
+    return OK;
 }
 
 /*
@@ -926,7 +906,7 @@ static int stop_lingering_close(event_conn_state_t *cs)
 {
     apr_socket_t *csd = ap_get_conn_socket(cs->c);
     ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
-                 "socket reached timeout in lingering-close state");
+                 "socket abort in state %i", (int)cs->pub.state);
     abort_socket_nonblocking(csd);
     ap_queue_info_push_pool(worker_queue_info, cs->p);
     if (dying)
@@ -1000,6 +980,9 @@ static int event_post_read_request(request_rec *r)
     return OK;
 }
 
+/* Forward declare */
+static void process_lingering_close(event_conn_state_t *cs);
+
 /*
  * process one connection in the worker
  */
@@ -1009,7 +992,9 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc
 {
     conn_rec *c;
     long conn_id = ID_FROM_CHILD_THREAD(my_child_num, my_thread_num);
-    int rc;
+    int clogging = 0;
+    apr_status_t rv;
+    int rc = OK;
 
     if (cs == NULL) {           /* This is a new connection */
         listener_poll_type *pt = apr_pcalloc(p, sizeof(*pt));
@@ -1066,6 +1051,7 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc
         cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
 
         cs->pub.sense = CONN_SENSE_DEFAULT;
+        rc = OK;
     }
     else {
         c = cs->c;
@@ -1076,30 +1062,37 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc
         c->id = conn_id;
     }
 
-    rc = OK;
-    if (c->aborted || cs->pub.state == CONN_STATE_LINGER) {
+    if (c->aborted) {
         /* do lingering close below */
         cs->pub.state = CONN_STATE_LINGER;
     }
-    else if (c->clogging_input_filters) {
-        /* Since we have an input filter which 'clogs' the input stream,
-         * like mod_ssl used to, lets just do the normal read from input
-         * filters, like the Worker MPM does. Filters that need to write
-         * where they would otherwise read, or read where they would
-         * otherwise write, should set the sense appropriately.
-         */
-        apr_atomic_inc32(&clogged_count);
-        rc = ap_run_process_connection(c);
-        apr_atomic_dec32(&clogged_count);
-        if (rc == DONE) {
-            rc = OK;
-        }
+    else if (cs->pub.state >= CONN_STATE_LINGER) {
+        /* fall through */
     }
-    else if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
+    else {
+        clogging = c->clogging_input_filters;
+        if (clogging || cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
+            /* If we have an input filter which 'clogs' the input stream,
+             * like mod_ssl used to, lets just do the normal read from input
+             * filters, like the Worker MPM does. Filters that need to write
+             * where they would otherwise read, or read where they would
+             * otherwise write, should set the sense appropriately.
+             */
+            if (clogging) {
+                apr_atomic_inc32(&clogged_count);
+            }
 read_request:
-        rc = ap_run_process_connection(c);
-        if (rc == DONE) {
-            rc = OK;
+            rc = ap_run_process_connection(c);
+            if (clogging) {
+                apr_atomic_dec32(&clogged_count);
+                clogging = 0;
+            }
+            if (cs->pub.state > CONN_STATE_LINGER) {
+                cs->pub.state = CONN_STATE_LINGER;
+            }
+            if (rc == DONE) {
+                rc = OK;
+            }
         }
     }
     /*
@@ -1107,13 +1100,16 @@ read_request:
      * appropriately upon return, for event MPM to either:
      * - do lingering close (CONN_STATE_LINGER),
      * - wait for readability of the next request with respect to the keepalive
-     *   timeout (CONN_STATE_CHECK_REQUEST_LINE_READABLE),
+     *   timeout (state CONN_STATE_CHECK_REQUEST_LINE_READABLE),
+     * - wait for read/write-ability of the underlying socket with respect to
+     *   its timeout by setting c->clogging_input_filters to 1 and the sense
+     *   to CONN_SENSE_WANT_READ/WRITE (state CONN_STATE_WRITE_COMPLETION),
      * - keep flushing the output filters stack in nonblocking mode, and then
      *   if required wait for read/write-ability of the underlying socket with
-     *   respect to its own timeout (CONN_STATE_WRITE_COMPLETION); since write
+     *   respect to its own timeout (state CONN_STATE_WRITE_COMPLETION); since
      *   completion at some point may require reads (e.g. SSL_ERROR_WANT_READ),
-     *   an output filter can set the sense to CONN_SENSE_WANT_READ at any time
-     *   for event MPM to do the right thing,
+     *   an output filter can also set the sense to CONN_SENSE_WANT_READ at any
+     *   time for event MPM to do the right thing,
      * - suspend the connection (SUSPENDED) such that it now interracts with
      *   the MPM through suspend/resume_connection() hooks, and/or registered
      *   poll callbacks (PT_USER), and/or registered timed callbacks triggered
@@ -1127,7 +1123,7 @@ read_request:
      * while this was expected to do lingering close unconditionally with
      * worker or prefork MPMs for instance.
      */
-    if (rc != OK || (cs->pub.state != CONN_STATE_LINGER
+    if (rc != OK || (cs->pub.state < CONN_STATE_LINGER
                      && cs->pub.state != CONN_STATE_WRITE_COMPLETION
                      && cs->pub.state != CONN_STATE_CHECK_REQUEST_LINE_READABLE
                      && cs->pub.state != CONN_STATE_SUSPENDED)) {
@@ -1168,11 +1164,12 @@ read_request:
 
             apr_thread_mutex_lock(timeout_mutex);
             TO_QUEUE_APPEND(cs->sc->wc_q, cs);
-            rc = apr_pollset_add(event_pollset, &cs->pfd);
-            if (rc != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rc)) {
+            rv = apr_pollset_add(event_pollset, &cs->pfd);
+            if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+                AP_DEBUG_ASSERT(0);
                 TO_QUEUE_REMOVE(cs->sc->wc_q, cs);
                 apr_thread_mutex_unlock(timeout_mutex);
-                ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03465)
+                ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03465)
                              "process_socket: apr_pollset_add failure for "
                              "write completion");
                 apr_socket_close(cs->pfd.desc.s);
@@ -1198,10 +1195,7 @@ read_request:
         }
     }
 
-    if (cs->pub.state == CONN_STATE_LINGER) {
-        start_lingering_close_blocking(cs);
-    }
-    else if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
+    if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
         ap_update_child_status(cs->sbh, SERVER_BUSY_KEEPALIVE, NULL);
 
         /* It greatly simplifies the logic to use a single timeout value per q
@@ -1219,23 +1213,36 @@ read_request:
         cs->pfd.reqevents = APR_POLLIN;
         apr_thread_mutex_lock(timeout_mutex);
         TO_QUEUE_APPEND(cs->sc->ka_q, cs);
-        rc = apr_pollset_add(event_pollset, &cs->pfd);
-        if (rc != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rc)) {
+        rv = apr_pollset_add(event_pollset, &cs->pfd);
+        if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+            AP_DEBUG_ASSERT(0);
             TO_QUEUE_REMOVE(cs->sc->ka_q, cs);
             apr_thread_mutex_unlock(timeout_mutex);
-            ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03093)
+            ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03093)
                          "process_socket: apr_pollset_add failure for "
                          "keep alive");
             apr_socket_close(cs->pfd.desc.s);
             ap_queue_info_push_pool(worker_queue_info, cs->p);
-            return;
         }
-        apr_thread_mutex_unlock(timeout_mutex);
+        else {
+            apr_thread_mutex_unlock(timeout_mutex);
+        }
+        return;
     }
-    else if (cs->pub.state == CONN_STATE_SUSPENDED) {
+
+    if (cs->pub.state == CONN_STATE_SUSPENDED) {
         cs->c->suspended_baton = cs;
         apr_atomic_inc32(&suspended_count);
         notify_suspend(cs);
+        return;
+    }
+
+    if (cs->pub.state == CONN_STATE_LINGER) {
+        rc = start_lingering_close_blocking(cs);
+    }
+    if (rc == OK && (cs->pub.state == CONN_STATE_LINGER_NORMAL ||
+                     cs->pub.state == CONN_STATE_LINGER_SHORT)) {
+        process_lingering_close(cs);
     }
 }
 
@@ -1654,18 +1661,18 @@ static apr_status_t event_unregister_poll_callback(apr_array_header_t *pfds)
 
 /*
  * Close socket and clean up if remote closed its end while we were in
- * lingering close.
- * Only to be called in the listener thread;
- * Pre-condition: cs is in one of the linger queues and in the pollset
+ * lingering close. Only to be called in the worker thread, and since it's
+ * in immediate call stack, we can afford a comfortable buffer size to
+ * consume data quickly.
  */
-static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t *pfd)
+#define LINGERING_BUF_SIZE (32 * 1024)
+static void process_lingering_close(event_conn_state_t *cs)
 {
     apr_socket_t *csd = ap_get_conn_socket(cs->c);
-    char dummybuf[2048];
+    char dummybuf[LINGERING_BUF_SIZE];
     apr_size_t nbytes;
     apr_status_t rv;
     struct timeout_queue *q;
-    q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q;
 
     /* socket is already in non-blocking state */
     do {
@@ -1673,22 +1680,32 @@ static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t *
         rv = apr_socket_recv(csd, dummybuf, &nbytes);
     } while (rv == APR_SUCCESS);
 
-    if (APR_STATUS_IS_EAGAIN(rv)) {
+    if (!APR_STATUS_IS_EAGAIN(rv)) {
+        rv = apr_socket_close(csd);
+        AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+        ap_queue_info_push_pool(worker_queue_info, cs->p);
         return;
     }
 
+    /* Re-queue the connection to come back when readable */
+    cs->pfd.reqevents = APR_POLLIN;
+    cs->pub.sense = CONN_SENSE_DEFAULT;
+    q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q;
     apr_thread_mutex_lock(timeout_mutex);
-    TO_QUEUE_REMOVE(q, cs);
-    rv = apr_pollset_remove(event_pollset, pfd);
+    TO_QUEUE_APPEND(q, cs);
+    rv = apr_pollset_add(event_pollset, &cs->pfd);
+    if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+        AP_DEBUG_ASSERT(0);
+        TO_QUEUE_REMOVE(q, cs);
+        apr_thread_mutex_unlock(timeout_mutex);
+        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
+                     "process_lingering_close: apr_pollset_add failure");
+        rv = apr_socket_close(cs->pfd.desc.s);
+        AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+        ap_queue_info_push_pool(worker_queue_info, cs->p);
+        return;
+    }
     apr_thread_mutex_unlock(timeout_mutex);
-    AP_DEBUG_ASSERT(rv == APR_SUCCESS ||  APR_STATUS_IS_NOTFOUND(rv));
-
-    rv = apr_socket_close(csd);
-    AP_DEBUG_ASSERT(rv == APR_SUCCESS);
-
-    ap_queue_info_push_pool(worker_queue_info, cs->p);
-    if (dying)
-        ap_queue_interrupt_one(worker_queue);
 }
 
 /* call 'func' for all elements of 'q' with timeout less than 'timeout_time'.
@@ -1743,6 +1760,7 @@ static void process_timeout_queue(struct timeout_queue *q,
             last = cs;
             rv = apr_pollset_remove(event_pollset, &cs->pfd);
             if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) {
+                AP_DEBUG_ASSERT(0);
                 ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, cs->c, APLOGNO(00473)
                               "apr_pollset_remove failed");
             }
@@ -1967,24 +1985,43 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             if (pt->type == PT_CSD) {
                 /* one of the sockets is readable */
                 event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
-                struct timeout_queue *remove_from_q = cs->sc->wc_q;
-                int blocking = 1;
+                struct timeout_queue *remove_from_q = NULL;
+                /* don't wait for a worker for a keepalive request or
+                 * lingering close processing. */
+                int blocking = 0;
 
                 switch (cs->pub.state) {
+                case CONN_STATE_WRITE_COMPLETION:
+                    remove_from_q = cs->sc->wc_q;
+                    blocking = 1;
+                    break;
+
                 case CONN_STATE_CHECK_REQUEST_LINE_READABLE:
                     cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
                     remove_from_q = cs->sc->ka_q;
-                    /* don't wait for a worker for a keepalive request */
-                    blocking = 0;
-                    /* FALL THROUGH */
-                case CONN_STATE_WRITE_COMPLETION:
-                    get_worker(&have_idle_worker, blocking,
-                               &workers_were_busy);
+                    break;
+
+                case CONN_STATE_LINGER_NORMAL:
+                    remove_from_q = linger_q;
+                    break;
+
+                case CONN_STATE_LINGER_SHORT:
+                    remove_from_q = short_linger_q;
+                    break;
+
+                default:
+                    ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
+                                 ap_server_conf, APLOGNO(03096)
+                                 "event_loop: unexpected state %d",
+                                 cs->pub.state);
+                    ap_assert(0);
+                }
+
+                if (remove_from_q) {
                     apr_thread_mutex_lock(timeout_mutex);
                     TO_QUEUE_REMOVE(remove_from_q, cs);
                     rc = apr_pollset_remove(event_pollset, &cs->pfd);
                     apr_thread_mutex_unlock(timeout_mutex);
-
                     /*
                      * Some of the pollset backends, like KQueue or Epoll
                      * automagically remove the FD if the socket is closed,
@@ -1992,35 +2029,32 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                      * and we still want to keep going
                      */
                     if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
+                        AP_DEBUG_ASSERT(0);
                         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
                                      APLOGNO(03094) "pollset remove failed");
                         start_lingering_close_nonblocking(cs);
                         break;
                     }
 
-                    /* If we didn't get a worker immediately for a keep-alive
-                     * request, we close the connection, so that the client can
-                     * re-connect to a different process.
+                    /* If we don't get a worker immediately (nonblocking), we
+                     * close the connection; the client can re-connect to a
+                     * different process for keepalive, and for lingering close
+                     * the connection will be reset so the choice is to favor
+                     * incoming/alive connections.
                      */
+                    get_worker(&have_idle_worker, blocking,
+                               &workers_were_busy);
                     if (!have_idle_worker) {
-                        start_lingering_close_nonblocking(cs);
+                        if (remove_from_q == cs->sc->ka_q) {
+                            start_lingering_close_nonblocking(cs);
+                        }
+                        else {
+                            stop_lingering_close(cs);
+                        }
                     }
                     else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) {
                         have_idle_worker = 0;
                     }
-                    break;
-
-                case CONN_STATE_LINGER_NORMAL:
-                case CONN_STATE_LINGER_SHORT:
-                    process_lingering_close(cs, out_pfd);
-                    break;
-
-                default:
-                    ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                                 ap_server_conf, APLOGNO(03096)
-                                 "event_loop: unexpected state %d",
-                                 cs->pub.state);
-                    ap_assert(0);
                 }
             }
             else if (pt->type == PT_ACCEPT && !listeners_disabled()) {