Merge r1802875 from trunk:
author     Yann Ylavic <ylavic@apache.org>
           Fri, 22 Sep 2017 10:55:22 +0000 (10:55 +0000)
committer  Yann Ylavic <ylavic@apache.org>
           Fri, 22 Sep 2017 10:55:22 +0000 (10:55 +0000)
event: Avoid possible blocking in the listener thread when shutting down
connections. PR 60956.

start_lingering_close_nonblocking() now puts connections in defer_linger_chain,
which is emptied (atomically) by any worker thread after its usual work, so any
potentially blocking flush and lingering close run outside the listener.
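
The chain is a classic lock-free LIFO. As a minimal standalone sketch of the
push/pop pattern (using C11 atomics in place of APR's apr_atomic_casptr, and a
stripped-down state type; this is not the actual MPM code):

    #include <stdatomic.h>
    #include <stddef.h>

    typedef struct conn_state {
        struct conn_state *chain;   /* next deferred connection */
    } conn_state;

    static _Atomic(conn_state *) defer_chain;

    /* Listener side: push cs onto the chain. */
    static void defer_push(conn_state *cs)
    {
        conn_state *head = atomic_load(&defer_chain);
        do {
            cs->chain = head;       /* link to the current head */
        } while (!atomic_compare_exchange_weak(&defer_chain, &head, cs));
    }

    /* Worker side: pop one deferred connection, or NULL if empty. */
    static conn_state *defer_pop(void)
    {
        conn_state *head = atomic_load(&defer_chain);
        while (head != NULL
               && !atomic_compare_exchange_weak(&defer_chain, &head,
                                                head->chain))
            ;                       /* CAS failed, head reloaded: retry */
        return head;
    }

(The usual ABA caveat of CAS-based stacks applies to the pop side; the sketch
omits any protection.)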

When the listener fills defer_linger_chain, or while the chain remains
non-empty, it may wake a dedicated worker by calling push2worker with a NULL cs.
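
Condensed sketch of that wake-up protocol (names follow the hunks below; the
worker-queue details are elided, so this is a paraphrase rather than the
committed code):

    /* Listener: a NULL cs/csd pushed to the worker queue is a bare
     * "drain the deferred closes" token. */
    if (defer_linger_chain && have_idle_worker
            && push2worker(NULL, NULL, NULL) == APR_SUCCESS) {
        have_idle_worker = 0;
    }

    /* Worker: after (or instead of) a regular job, empty the chain. */
    while (!workers_may_exit) {
        event_conn_state_t *cs = defer_linger_chain;
        if (!cs) {
            break;
        }
        if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
                              cs) != cs) {
            continue;               /* race lost, retry */
        }
        cs->chain = NULL;
        /* blocking flush + lingering close runs here, off the listener */
    }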

The state machine in process_socket() is slightly modified so that it can be
entered with CONN_STATE_LINGER directly, without clogging_input_filters
interfering.
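
The reworked dispatch, condensed from the process_socket() hunks below:

    if (cs->pub.state == CONN_STATE_LINGER) {
        /* entered directly for a (deferred) close: skip request
         * processing and fall through to lingering close below */
    }
    else if (c->clogging_input_filters) {
        /* worker-style blocking reads for clogging filters, as before */
    }
    else if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
        ap_run_process_connection(c);   /* state updated on return */
    }
    /* common tail: WRITE_COMPLETION, keep-alive, or lingering close */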

The new abort_socket_nonblocking() resets connections when nonblocking behavior
is required, we can't do anything more with the connection, and we don't want
the system to linger on its own after close().
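
For reference, a plain-POSIX sketch of the reset (the real function goes
through APR, using apr_os_sock_get() to reach the fd, and also zeroes the
socket timeout and logs failed closes):

    #include <sys/socket.h>
    #include <unistd.h>

    static void abort_fd_nonblocking(int fd)
    {
        /* l_onoff = 1 with l_linger = 0 makes close() discard unsent
         * data and send RST instead of FIN, returning immediately. */
        struct linger opt = { .l_onoff = 1, .l_linger = 0 };
        setsockopt(fd, SOL_SOCKET, SO_LINGER, &opt, sizeof(opt));
        close(fd);
    }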

Many thanks to Stefan Priebe for his heavy testing of the many event MPM changes!

Submitted by: ylavic
Reviewed by: ylavic, jim, icing

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1809299 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
STATUS
server/mpm/event/event.c

diff --git a/CHANGES b/CHANGES
index 4ecb2748e33bcedbc79fb3629b67136de20c6172..4bc2011ff800425d673f44e4b8ee9e6c111113e6 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -6,6 +6,9 @@ Changes with Apache 2.4.28
      main configuration file (httpd.conf) to register HTTP methods before the
      .htaccess files.  [Yann Ylavic]
 
+  *) event: Avoid possible blocking in the listener thread when shutting down
+     connections. PR 60956.  [Yann Ylavic]
+
   *) mod_speling: Don't embed referer data in a link in error page.
      PR 38923 [Nick Kew]
 
diff --git a/STATUS b/STATUS
index a904fcad04e480a28a7cc267e1dee0566d44daa2..4053fc785efe7f38ca643f0688fd8271951809b5 100644 (file)
--- a/STATUS
+++ b/STATUS
@@ -115,13 +115,6 @@ RELEASE SHOWSTOPPERS:
 PATCHES ACCEPTED TO BACKPORT FROM TRUNK:
   [ start all new proposals below, under PATCHES PROPOSED. ]
 
-  *) event: Avoid possible blocking in the listener thread when shutting down
-     connections. PR 60956.
-     trunk patch: http://svn.apache.org/r1802875
-     2.4.x patch: trunks works (module CHANGES)
-                  svn merge -c 1802875 ^/httpd/httpd/trunk .
-     +1: ylavic, jim, icing
-
 
 PATCHES PROPOSED TO BACKPORT FROM TRUNK:
   [ New proposals should be added at the end of the list ]
diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c
index 4742895334fa88fee02861355a1f1ecf11547cac..891ed3b313075785a963f320e65351442ff933f3 100644 (file)
--- a/server/mpm/event/event.c
+++ b/server/mpm/event/event.c
@@ -212,6 +212,12 @@ static apr_pollfd_t *listener_pollfd;
  */
 static apr_pollset_t *event_pollset;
 
+/*
+ * The chain of connections to be shutdown by a worker thread (deferred),
+ * linked list updated atomically.
+ */
+static event_conn_state_t *volatile defer_linger_chain;
+
 struct event_conn_state_t {
     /** APR_RING of expiration timeouts */
     APR_RING_ENTRY(event_conn_state_t) timeout_list;
@@ -236,7 +242,10 @@ struct event_conn_state_t {
     apr_pollfd_t pfd;
     /** public parts of the connection state */
     conn_state_t pub;
+    /** chaining in defer_linger_chain */
+    struct event_conn_state_t *chain;
 };
+
 APR_RING_HEAD(timeout_head_t, event_conn_state_t);
 
 struct timeout_queue {
@@ -474,14 +483,55 @@ static void enable_listensocks(int process_slot)
     ap_scoreboard_image->parent[process_slot].not_accepting = 0;
 }
 
+static void abort_socket_nonblocking(apr_socket_t *csd)
+{
+    apr_status_t rv;
+    apr_socket_timeout_set(csd, 0);
+#if defined(SOL_SOCKET) && defined(SO_LINGER)
+    /* This socket is over now, and we don't want to block nor linger
+     * anymore, so reset it. A normal close could still linger in the
+     * system, while RST is fast, nonblocking, and what the peer will
+     * get if it sends us further data anyway.
+     */
+    {
+        apr_os_sock_t osd = -1;
+        struct linger opt;
+        opt.l_onoff = 1;
+        opt.l_linger = 0; /* zero timeout is RST */
+        apr_os_sock_get(&osd, csd);
+        setsockopt(osd, SOL_SOCKET, SO_LINGER, (void *)&opt, sizeof opt);
+    }
+#endif
+    rv = apr_socket_close(csd);
+    if (rv != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(00468)
+                     "error closing socket");
+        AP_DEBUG_ASSERT(0);
+    }
+}
+
 static void close_worker_sockets(void)
 {
     int i;
     for (i = 0; i < threads_per_child; i++) {
-        if (worker_sockets[i]) {
-            apr_socket_close(worker_sockets[i]);
+        apr_socket_t *csd = worker_sockets[i];
+        if (csd) {
             worker_sockets[i] = NULL;
+            abort_socket_nonblocking(csd);
+        }
+    }
+    for (;;) {
+        event_conn_state_t *cs = defer_linger_chain;
+        if (!cs) {
+            break;
+        }
+        if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
+                              cs) != cs) {
+            /* Race lost, try again */
+            continue;
         }
+        cs->chain = NULL;
+        abort_socket_nonblocking(cs->pfd.desc.s);
     }
 }
 
@@ -693,11 +743,27 @@ static void notify_resume(event_conn_state_t *cs, ap_sb_handle_t *sbh)
     ap_run_resume_connection(cs->c, cs->r);
 }
 
-static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
+/*
+ * Close our side of the connection, flushing data to the client first.
+ * Pre-condition: cs is not in any timeout queue and not in the pollset,
+ *                timeout_mutex is not locked
+ * return: 0 if connection is fully closed,
+ *         1 if connection is lingering
+ * May only be called by worker thread.
+ */
+static int start_lingering_close_blocking(event_conn_state_t *cs)
 {
     apr_status_t rv;
     struct timeout_queue *q;
     apr_socket_t *csd = cs->pfd.desc.s;
+
+    if (ap_start_lingering_close(cs->c)) {
+        notify_suspend(cs);
+        apr_socket_close(csd);
+        ap_push_pool(worker_queue_info, cs->p);
+        return 0;
+    }
+
 #ifdef AP_DEBUG
     {
         rv = apr_socket_timeout_set(csd, 0);
@@ -706,6 +772,7 @@ static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
 #else
     apr_socket_timeout_set(csd, 0);
 #endif
+
     cs->queue_timestamp = apr_time_now();
     /*
      * If some module requested a shortened waiting period, only wait for
@@ -721,12 +788,8 @@ static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
         cs->pub.state = CONN_STATE_LINGER_NORMAL;
     }
     apr_atomic_inc32(&lingering_count);
-    if (in_worker) { 
-        notify_suspend(cs);
-    }
-    else {
-        cs->c->sbh = NULL;
-    }
+    notify_suspend(cs);
+
     cs->pfd.reqevents = (
             cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
                     APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
@@ -749,49 +812,25 @@ static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
 }
 
 /*
- * Close our side of the connection, flushing data to the client first.
- * Pre-condition: cs is not in any timeout queue and not in the pollset,
- *                timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- *         1 if connection is lingering
- * May only be called by worker thread.
- */
-static int start_lingering_close_blocking(event_conn_state_t *cs)
-{
-    if (ap_start_lingering_close(cs->c)) {
-        notify_suspend(cs);
-        ap_push_pool(worker_queue_info, cs->p);
-        return 0;
-    }
-    return start_lingering_close_common(cs, 1);
-}
-
-/*
- * Close our side of the connection, NOT flushing data to the client.
- * This should only be called if there has been an error or if we know
- * that our send buffers are empty.
+ * Defer flush and close of the connection by adding it to defer_linger_chain,
+ * for a worker to grab it and do the job (should that be blocking).
  * Pre-condition: cs is not in any timeout queue and not in the pollset,
  *                timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- *         1 if connection is lingering
- * may be called by listener thread
+ * return: 1 connection is alive (but aside and about to linger)
+ * May be called by listener thread.
  */
 static int start_lingering_close_nonblocking(event_conn_state_t *cs)
 {
-    conn_rec *c = cs->c;
-    apr_socket_t *csd = cs->pfd.desc.s;
-
-    if (ap_prep_lingering_close(c)
-        || c->aborted
-        || ap_shutdown_conn(c, 0) != APR_SUCCESS || c->aborted
-        || apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE) != APR_SUCCESS) {
-        apr_socket_close(csd);
-        ap_push_pool(worker_queue_info, cs->p);
-        if (dying)
-            ap_queue_interrupt_one(worker_queue);
-        return 0;
+    event_conn_state_t *chain;
+    for (;;) {
+        cs->chain = chain = defer_linger_chain;
+        if (apr_atomic_casptr((void *)&defer_linger_chain, cs,
+                              chain) != chain) {
+            /* Race lost, try again */
+            continue;
+        }
+        return 1;
     }
-    return start_lingering_close_common(cs, 0);
 }
 
 /*
@@ -802,15 +841,10 @@ static int start_lingering_close_nonblocking(event_conn_state_t *cs)
  */
 static int stop_lingering_close(event_conn_state_t *cs)
 {
-    apr_status_t rv;
     apr_socket_t *csd = ap_get_conn_socket(cs->c);
     ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
                  "socket reached timeout in lingering-close state");
-    rv = apr_socket_close(csd);
-    if (rv != APR_SUCCESS) {
-        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(00468) "error closing socket");
-        AP_DEBUG_ASSERT(0);
-    }
+    abort_socket_nonblocking(csd);
     ap_push_pool(worker_queue_info, cs->p);
     if (dying)
         ap_queue_interrupt_one(worker_queue);
@@ -959,9 +993,16 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc
         c->current_thread = thd;
         /* Subsequent request on a conn, and thread number is part of ID */
         c->id = conn_id;
+
+        if (c->aborted) {
+            cs->pub.state = CONN_STATE_LINGER;
+        }
     }
 
-    if (c->clogging_input_filters && !c->aborted) {
+    if (cs->pub.state == CONN_STATE_LINGER) {
+        /* do lingering close below */
+    }
+    else if (c->clogging_input_filters) {
         /* Since we have an input filter which 'clogs' the input stream,
          * like mod_ssl used to, lets just do the normal read from input
          * filters, like the Worker MPM does. Filters that need to write
@@ -975,20 +1016,14 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc
         }
         apr_atomic_dec32(&clogged_count);
     }
-
+    else if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
 read_request:
-    if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
-        if (!c->aborted) {
-            ap_run_process_connection(c);
+        ap_run_process_connection(c);
 
-            /* state will be updated upon return
-             * fall thru to either wait for readability/timeout or
-             * do lingering close
-             */
-        }
-        else {
-            cs->pub.state = CONN_STATE_LINGER;
-        }
+        /* state will be updated upon return
+         * fall thru to either wait for readability/timeout or
+         * do lingering close
+         */
     }
 
     if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
@@ -1032,7 +1067,7 @@ read_request:
             return;
         }
         else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted ||
-            listener_may_exit) {
+                 listener_may_exit) {
             cs->pub.state = CONN_STATE_LINGER;
         }
         else if (c->data_in_input_filters) {
@@ -1177,26 +1212,32 @@ static apr_status_t push_timer2worker(timer_event_t* te)
 }
 
 /*
- * Pre-condition: pfd->cs is neither in pollset nor timeout queue
+ * Pre-condition: cs is neither in event_pollset nor a timeout queue
  * this function may only be called by the listener
  */
-static apr_status_t push2worker(const apr_pollfd_t * pfd,
-                                apr_pollset_t * pollset)
+static apr_status_t push2worker(event_conn_state_t *cs, apr_socket_t *csd,
+                                apr_pool_t *ptrans)
 {
-    listener_poll_type *pt = (listener_poll_type *) pfd->client_data;
-    event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
     apr_status_t rc;
 
-    rc = ap_queue_push(worker_queue, cs->pfd.desc.s, cs, cs->p);
+    if (cs) {
+        csd = cs->pfd.desc.s;
+        ptrans = cs->p;
+    }
+    rc = ap_queue_push(worker_queue, csd, cs, ptrans);
     if (rc != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, APLOGNO(00471)
+                     "push2worker: ap_queue_push failed");
         /* trash the connection; we couldn't queue the connected
          * socket to a worker
          */
-        apr_bucket_alloc_destroy(cs->bucket_alloc);
-        apr_socket_close(cs->pfd.desc.s);
-        ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                     ap_server_conf, APLOGNO(00471) "push2worker: ap_queue_push failed");
-        ap_push_pool(worker_queue_info, cs->p);
+        if (csd) {
+            abort_socket_nonblocking(csd);
+        }
+        if (ptrans) {
+            ap_push_pool(worker_queue_info, ptrans);
+        }
+        signal_threads(ST_GRACEFUL);
     }
 
     return rc;
@@ -1632,6 +1673,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                     TO_QUEUE_REMOVE(remove_from_q, cs);
                     rc = apr_pollset_remove(event_pollset, &cs->pfd);
                     apr_thread_mutex_unlock(timeout_mutex);
+                    TO_QUEUE_ELEM_INIT(cs);
+
                     /*
                      * Some of the pollset backends, like KQueue or Epoll
                      * automagically remove the FD if the socket is closed,
@@ -1645,29 +1688,23 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                         break;
                     }
 
-                    TO_QUEUE_ELEM_INIT(cs);
                     /* If we didn't get a worker immediately for a keep-alive
                      * request, we close the connection, so that the client can
                      * re-connect to a different process.
                      */
                     if (!have_idle_worker) {
                         start_lingering_close_nonblocking(cs);
-                        break;
-                    }
-                    rc = push2worker(out_pfd, event_pollset);
-                    if (rc != APR_SUCCESS) {
-                        ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                                     ap_server_conf, APLOGNO(03095)
-                                     "push2worker failed");
                     }
-                    else {
+                    else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) {
                         have_idle_worker = 0;
                     }
                     break;
+
                 case CONN_STATE_LINGER_NORMAL:
                 case CONN_STATE_LINGER_SHORT:
                     process_lingering_close(cs, out_pfd);
                     break;
+
                 default:
                     ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
                                  ap_server_conf, APLOGNO(03096)
@@ -1746,18 +1783,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 
                     if (csd != NULL) {
                         conns_this_child--;
-                        rc = ap_queue_push(worker_queue, csd, NULL, ptrans);
-                        if (rc != APR_SUCCESS) {
-                            /* trash the connection; we couldn't queue the connected
-                             * socket to a worker
-                             */
-                            apr_socket_close(csd);
-                            ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                                         ap_server_conf, APLOGNO(03098)
-                                         "ap_queue_push failed");
-                            ap_push_pool(worker_queue_info, ptrans);
-                        }
-                        else {
+                        if (push2worker(NULL, csd, ptrans) == APR_SUCCESS) {
                             have_idle_worker = 0;
                         }
                     }
@@ -1822,6 +1848,24 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             ps->keep_alive = 0;
         }
 
+        /* If there are some lingering closes to defer (to a worker), schedule
+         * them now. We might wakeup a worker spuriously if another one empties
+         * defer_linger_chain in the meantime, but there also may be no active
+         * or all busy workers for an undefined time.  In any case a deferred
+         * lingering close can't starve if we do that here since the chain is
+         * filled only above in the listener and it's emptied only in the
+         * worker(s); thus a NULL here means it will stay so while the listener
+         * waits (possibly indefinitely) in poll().
+         */
+        if (defer_linger_chain) {
+            get_worker(&have_idle_worker, 0, &workers_were_busy);
+            if (have_idle_worker
+                    && defer_linger_chain /* re-test */
+                    && push2worker(NULL, NULL, NULL) == APR_SUCCESS) {
+                have_idle_worker = 0;
+            }
+        }
+
         if (listeners_disabled && !workers_were_busy
             && (int)apr_atomic_read32(&connection_count)
                - (int)apr_atomic_read32(&lingering_count)
@@ -1965,8 +2009,35 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
         }
         else {
             is_idle = 0;
-            worker_sockets[thread_slot] = csd;
-            process_socket(thd, ptrans, csd, cs, process_slot, thread_slot);
+            if (csd != NULL) {
+                worker_sockets[thread_slot] = csd;
+                process_socket(thd, ptrans, csd, cs, process_slot, thread_slot);
+                worker_sockets[thread_slot] = NULL;
+            }
+        }
+
+        /* If there are deferred lingering closes, handle them now. */
+        while (!workers_may_exit) {
+            cs = defer_linger_chain;
+            if (!cs) {
+                break;
+            }
+            if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
+                                  cs) != cs) {
+                /* Race lost, try again */
+                continue;
+            }
+            cs->chain = NULL;
+
+            worker_sockets[thread_slot] = csd = cs->pfd.desc.s;
+#ifdef AP_DEBUG
+            rv = apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
+            AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+#else
+            apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
+#endif
+            cs->pub.state = CONN_STATE_LINGER;
+            process_socket(thd, cs->p, csd, cs, process_slot, thread_slot);
             worker_sockets[thread_slot] = NULL;
         }
     }
@@ -3224,6 +3295,7 @@ static int event_pre_config(apr_pool_t * pconf, apr_pool_t * plog,
     active_daemons_limit = server_limit;
     threads_per_child = DEFAULT_THREADS_PER_CHILD;
     max_workers = active_daemons_limit * threads_per_child;
+    defer_linger_chain = NULL;
     had_healthy_child = 0;
     ap_extended_status = 0;