]> granicus.if.org Git - apache/commitdiff
Revert Paul's lock free circular queue changes done on MPM-event.
authorStefan Fritsch <sf@apache.org>
Fri, 24 Feb 2012 21:27:13 +0000 (21:27 +0000)
committerStefan Fritsch <sf@apache.org>
Fri, 24 Feb 2012 21:27:13 +0000 (21:27 +0000)
The changes have been moved to branches/mpm-event-optimization.

This puts trunk's MPM-event back to the state of 2.4.x, except for serf
support.

The following commits are reverted:
 ------------------------------------------------------------------------
r1203404 | trawick | 2011-11-17 23:48:35 +0100

event no longer requires APR_POLLSET_THREADSAFE, or any
other non-standard feature (all but OS/2 have APR_POLLSET_WAKEABLE)

config-foo: don't require thread-safe pollset in order to
            build event

event.c: don't stress APR_ENOTIMPL in apr_pollset_create errors;
         just give the standard "check system or user limits"
         message

 ------------------------------------------------------------------------
r1203858 | trawick | 2011-11-18 22:39:33 +0100

follow up to r1202257 -- perform normal wakeup processing when
APR_EINTR is seen from apr_pollset_poll(), with expectation that
it was triggered by apr_pollset_wakeup()

 ------------------------------------------------------------------------
r1202395 | trawick | 2011-11-15 20:38:31 +0100

spellcheck r1202258

 ------------------------------------------------------------------------
r1202329 | pquerna | 2011-11-15 18:47:33 +0100

Calculate the power of two size of the buffer before allocating it.

Spotted by: Rüdiger Plüm

 ------------------------------------------------------------------------
r1202259 | pquerna | 2011-11-15 16:52:59 +0100

Use apr_pollset_wakeup to ensure that the listener thread will process most enqueue'd events quickly

 ------------------------------------------------------------------------
r1202258 | pquerna | 2011-11-15 16:52:00 +0100

Because the pollset is now only mutated from the event thread, we no longer need the APR_POLLSET_THREADSAFE flag for the event_pollset

 ------------------------------------------------------------------------
r1202257 | pquerna | 2011-11-15 16:51:03 +0100

Create a new lock free circular queue, and use it in the EventMPM to remove the timeout mutex that was wrapping both timeout queue operations and pollset operations.

 ------------------------------------------------------------------------
r1202256 | pquerna | 2011-11-15 16:50:09 +0100

Instead of disabling the listening sockets from the pollset when under load, just stop calling the accept call, but leave the sockets in the pollset.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1293426 13f79535-47bb-0310-9956-ffa450edef68

server/mpm/event/config.m4
server/mpm/event/config3.m4
server/mpm/event/equeue.c [deleted file]
server/mpm/event/equeue.h [deleted file]
server/mpm/event/event.c

index 5308af8f2989bb9c837696a97f37fd97f19bf914..351f1acf4bd7b3d3a4087821a57e161a6634cd10 100644 (file)
@@ -5,6 +5,8 @@ elif test $ac_cv_define_APR_HAS_THREADS != yes; then
     AC_MSG_RESULT(no - APR does not support threads)
 elif test $have_threaded_sig_graceful != yes; then
     AC_MSG_RESULT(no - SIG_GRACEFUL cannot be used with a threaded MPM)
+elif test $ac_cv_have_threadsafe_pollset != yes; then
+    AC_MSG_RESULT(no - APR_POLLSET_THREADSAFE is not supported)
 else
     AC_MSG_RESULT(yes)
     APACHE_MPM_SUPPORTED(event, yes, yes)
index c0bf202b5d1c10b6fe6f93bc31264542e5654a2c..5c96fe3c30df811f9249b464dc5214f4ca295435 100644 (file)
@@ -6,6 +6,6 @@ if test "$ac_cv_serf" = yes ; then
 fi
 APACHE_SUBST(MOD_MPM_EVENT_LDADD)
 
-APACHE_MPM_MODULE(event, $enable_mpm_event, event.lo fdqueue.lo equeue.lo pod.lo,[
+APACHE_MPM_MODULE(event, $enable_mpm_event, event.lo fdqueue.lo pod.lo,[
     AC_CHECK_FUNCS(pthread_kill)
 ], , [\$(MOD_MPM_EVENT_LDADD)])
diff --git a/server/mpm/event/equeue.c b/server/mpm/event/equeue.c
deleted file mode 100644 (file)
index 4750ab1..0000000
+++ /dev/null
@@ -1,125 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "equeue.h"
-
-#include <apr_atomic.h>
-#include <sched.h>
-
-struct ap_equeue_t {
-    apr_uint32_t nelem;
-    apr_size_t elem_size;
-    uint8_t *bytes;
-    volatile apr_uint32_t writeCount;
-    volatile apr_uint32_t readCount;
-};
-
-
-static APR_INLINE apr_uint32_t count_to_index(ap_equeue_t *eq, apr_uint32_t count)
-{
-    return (count & (eq->nelem - 1));
-}
-
-static APR_INLINE void* index_to_bytes(ap_equeue_t *eq, apr_uint32_t idx)
-{
-    apr_size_t offset = idx * eq->elem_size;
-    return (void*)&eq->bytes[offset];
-}
-
-static APR_INLINE apr_uint32_t nearest_power(apr_uint32_t num)
-{
-    apr_uint32_t n = 1;
-    while (n < num) {
-        n <<= 1;
-    }
-
-    return n;
-}
-
-#if 0
-static void dump_queue(ap_equeue_t *eq)
-{
-    apr_uint32_t i;
-
-    fprintf(stderr, "dumping %p\n", eq);
-    fprintf(stderr, "  nelem:   %u\n", eq->nelem);
-    fprintf(stderr, "  esize:   %"APR_SIZE_T_FMT"\n", eq->elem_size);
-    fprintf(stderr, "  wcnt:    %u\n", eq->writeCount);
-    fprintf(stderr, "  rcnt:    %u\n", eq->writeCount);
-    fprintf(stderr, "  bytes:   %p\n", eq->bytes);
-    for (i = 0; i < eq->nelem; i++) {
-        fprintf(stderr, "    [%u] = %p\n", i, index_to_bytes(eq, i));
-    }
-
-    fprintf(stderr, "\n");
-    fflush(stderr);
-}
-#endif
-
-apr_status_t
-ap_equeue_create(apr_pool_t *p, apr_uint32_t nelem, apr_size_t elem_size, ap_equeue_t **eqout)
-{
-    ap_equeue_t *eq;
-
-    *eqout = NULL;
-
-    eq = apr_palloc(p, sizeof(ap_equeue_t));
-    eq->nelem = nearest_power(nelem);
-    eq->bytes = apr_palloc(p, eq->nelem * elem_size);
-    eq->elem_size = elem_size;
-    eq->writeCount = 0;
-    eq->readCount = 0;
-    *eqout = eq;
-
-    return APR_SUCCESS;
-}
-
-void *
-ap_equeue_reader_next(ap_equeue_t *eq)
-{
-    if (apr_atomic_read32(&eq->writeCount) == eq->readCount) {
-        return NULL;
-    }
-    else {
-        apr_uint32_t idx = count_to_index(eq, apr_atomic_inc32(&eq->readCount));
-        return index_to_bytes(eq, idx);
-    }
-}
-
-void *
-ap_equeue_writer_value(ap_equeue_t *eq)
-{
-    apr_uint32_t idx;
-
-    while (1) {
-        apr_uint32_t readCount = apr_atomic_read32(&eq->readCount);
-
-        if (count_to_index(eq, eq->writeCount + 1) != count_to_index(eq, readCount)) {
-            break;
-        }
-        /* TODO: research if sched_yield is even worth doing  */
-        sched_yield();
-    }
-
-    idx = count_to_index(eq, eq->writeCount);
-    return index_to_bytes(eq, idx);
-}
-
-
-void ap_equeue_writer_onward(ap_equeue_t *eq)
-{
-    apr_atomic_inc32(&eq->writeCount);
-}
diff --git a/server/mpm/event/equeue.h b/server/mpm/event/equeue.h
deleted file mode 100644 (file)
index 9738b00..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef _event_mpm_equeue_h_
-#define _event_mpm_equeue_h_
-
-#include "httpd.h"
-
-typedef struct ap_equeue_t ap_equeue_t;
-
-apr_status_t
-ap_equeue_create(apr_pool_t *p,
-                 unsigned int nelem,
-                 apr_size_t elem_size,
-                 ap_equeue_t **eqout);
-
-
-/**
- * Current value of the reader, returns NULL if the reader is caught up
- * with the writer
- */
-void* ap_equeue_reader_next(ap_equeue_t *eq);
-
-/**
- * Returns pointer to next available write slot.  May block
- * in a spin lock if none are available.
- */
-void* ap_equeue_writer_value(ap_equeue_t *eq);
-
-/**
- * Move the write position up one, making the previously 
- * editted value available to the reader.
- */
-void ap_equeue_writer_onward(ap_equeue_t *eq);
-
-
-#endif
index 982e8e5d1e72981eb36faddb378e3e284b16de4d..d3aba1e4ce0a0bcce23a51a72c86cd45e4c1833c 100644 (file)
@@ -97,8 +97,6 @@
 #include <limits.h>             /* for INT_MAX */
 
 
-#include "equeue.h"
-
 #if HAVE_SERF
 #include "mod_serf.h"
 #include "serf.h"
@@ -185,12 +183,7 @@ static fd_queue_t *worker_queue;
 static fd_queue_info_t *worker_queue_info;
 static int mpm_state = AP_MPMQ_STARTING;
 
-typedef enum {
-    TIMEOUT_WRITE_COMPLETION,
-    TIMEOUT_KEEPALIVE,
-    TIMEOUT_LINGER,
-    TIMEOUT_SHORT_LINGER
-} timeout_type_e;
+static apr_thread_mutex_t *timeout_mutex;
 
 struct event_conn_state_t {
     /** APR_RING of expiration timeouts */
@@ -208,15 +201,8 @@ struct event_conn_state_t {
     /** public parts of the connection state */
     conn_state_t pub;
 };
-
-typedef struct pollset_op_t {
-    timeout_type_e timeout_type;
-    event_conn_state_t *cs;
-    const char *tag;
-} pollset_op_t;
-
-
 APR_RING_HEAD(timeout_head_t, event_conn_state_t);
+
 struct timeout_queue {
     struct timeout_head_t head;
     int count;
@@ -388,7 +374,6 @@ static apr_os_thread_t *listener_os_thread;
  * perform a non-graceful (forced) shutdown of the server.
  */
 static apr_socket_t **worker_sockets;
-static ap_equeue_t **worker_equeues;
 
 static void disable_listensocks(int process_slot)
 {
@@ -770,50 +755,20 @@ static void set_signals(void)
 #endif
 }
 
-static void process_pollop(pollset_op_t *op)
-{
-    apr_status_t rv;
-    event_conn_state_t *cs = op->cs;
-
-    switch (op->timeout_type) {
-    case TIMEOUT_WRITE_COMPLETION:
-        TO_QUEUE_APPEND(write_completion_q, cs);
-        break;
-    case TIMEOUT_KEEPALIVE:
-        TO_QUEUE_APPEND(keepalive_q, cs);
-        break;
-    case TIMEOUT_LINGER:
-        TO_QUEUE_APPEND(linger_q, cs);
-        break;
-    case TIMEOUT_SHORT_LINGER:
-        TO_QUEUE_APPEND(short_linger_q, cs);
-        break;
-    }
-
-    rv = apr_pollset_add(event_pollset, &op->cs->pfd);
-
-    if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
-        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(00467)
-                     "%s: apr_pollset_add failure", op->tag);
-    }
-}
-
 /*
  * close our side of the connection
  * Pre-condition: cs is not in any timeout queue and not in the pollset,
  *                timeout_mutex is not locked
  * return: 0 if connection is fully closed,
  *         1 if connection is lingering
- * may be called by listener or by worker thread.
- * the eq may be null if called from the listener thread,
- * and the pollset operations are done directly by this function.
+ * may be called by listener or by worker thread
  */
-static int start_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq)
+static int start_lingering_close(event_conn_state_t *cs)
 {
     apr_status_t rv;
 
     cs->c->sbh = NULL;  /* prevent scoreboard updates from the listener 
-                         * worker will loop around soon and set SERVER_READY
+                         * worker will loop around and set SERVER_READY soon
                          */
 
     if (ap_start_lingering_close(cs->c)) {
@@ -823,15 +778,7 @@ static int start_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq)
     }
     else {
         apr_socket_t *csd = ap_get_conn_socket(cs->c);
-        pollset_op_t localv;
-        pollset_op_t *v;
-
-        if (eq) {
-            v = ap_equeue_writer_value(eq);
-        }
-        else {
-            v = &localv;
-        }
+        struct timeout_queue *q;
 
 #ifdef AP_DEBUG
         {
@@ -849,26 +796,30 @@ static int start_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq)
         if (apr_table_get(cs->c->notes, "short-lingering-close")) {
             cs->expiration_time =
                 apr_time_now() + apr_time_from_sec(SECONDS_TO_LINGER);
-            v->timeout_type = TIMEOUT_SHORT_LINGER;
-            v->tag = "start_lingering_close(short)";
+            q = &short_linger_q;
             cs->pub.state = CONN_STATE_LINGER_SHORT;
         }
         else {
             cs->expiration_time =
                 apr_time_now() + apr_time_from_sec(MAX_SECS_TO_LINGER);
-            v->timeout_type = TIMEOUT_LINGER;
-            v->tag = "start_lingering_close(normal)";
+            q = &linger_q;
             cs->pub.state = CONN_STATE_LINGER_NORMAL;
         }
-
+        apr_thread_mutex_lock(timeout_mutex);
+        TO_QUEUE_APPEND(*q, cs);
         cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR;
-        v->cs = cs;
-        if (eq != NULL) {
-            ap_equeue_writer_onward(eq);
-            apr_pollset_wakeup(event_pollset);
-        }
-        else {
-            process_pollop(v);
+        rv = apr_pollset_add(event_pollset, &cs->pfd);
+        apr_thread_mutex_unlock(timeout_mutex);
+        if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+            ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
+                         "start_lingering_close: apr_pollset_add failure");
+            apr_thread_mutex_lock(timeout_mutex);
+            TO_QUEUE_REMOVE(*q, cs);
+            apr_thread_mutex_unlock(timeout_mutex);
+            apr_socket_close(cs->pfd.desc.s);
+            apr_pool_clear(cs->p);
+            ap_push_pool(worker_queue_info, cs->p);
+            return 0;
         }
     }
     return 1;
@@ -880,7 +831,7 @@ static int start_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq)
  * Pre-condition: cs is not in any timeout queue and not in the pollset
  * return: irrelevant (need same prototype as start_lingering_close)
  */
-static int stop_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq)
+static int stop_lingering_close(event_conn_state_t *cs)
 {
     apr_status_t rv;
     apr_socket_t *csd = ap_get_conn_socket(cs->c);
@@ -902,9 +853,7 @@ static int stop_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq)
  *         0 if it is still open and waiting for some event
  */
 static int process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * sock,
-                          event_conn_state_t * cs,
-                          ap_equeue_t *eq,
-                          int my_child_num,
+                          event_conn_state_t * cs, int my_child_num,
                           int my_thread_num)
 {
     conn_rec *c;
@@ -938,6 +887,7 @@ static int process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * sock
         pt->type = PT_CSD;
         pt->baton = cs;
         cs->pfd.client_data = pt;
+        TO_QUEUE_ELEM_INIT(cs);
 
         ap_update_vhost_given_ip(c);
 
@@ -1014,17 +964,12 @@ read_request:
              * Set a write timeout for this connection, and let the
              * event thread poll for writeability.
              */
-            pollset_op_t *v = ap_equeue_writer_value(eq);
-
             cs->expiration_time = ap_server_conf->timeout + apr_time_now();
+            apr_thread_mutex_lock(timeout_mutex);
+            TO_QUEUE_APPEND(write_completion_q, cs);
             cs->pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR;
-
-            v->cs = cs;
-            v->timeout_type = TIMEOUT_WRITE_COMPLETION;
-            v->tag = "process_socket(write_completion)";
-
-            ap_equeue_writer_onward(eq);
-            apr_pollset_wakeup(event_pollset);
+            rc = apr_pollset_add(event_pollset, &cs->pfd);
+            apr_thread_mutex_unlock(timeout_mutex);
             return 1;
         }
         else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted ||
@@ -1041,12 +986,11 @@ read_request:
     }
 
     if (cs->pub.state == CONN_STATE_LINGER) {
-        if (!start_lingering_close(cs, eq)) {
+        if (!start_lingering_close(cs))
             return 0;
-        }
     }
     else if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
-        pollset_op_t *v;
+        apr_status_t rc;
 
         /* It greatly simplifies the logic to use a single timeout value here
          * because the new element can just be added to the end of the list and
@@ -1058,15 +1002,19 @@ read_request:
          */
         cs->expiration_time = ap_server_conf->keep_alive_timeout +
                               apr_time_now();
+        apr_thread_mutex_lock(timeout_mutex);
+        TO_QUEUE_APPEND(keepalive_q, cs);
 
         /* Add work to pollset. */
-        v = ap_equeue_writer_value(eq);
-        v->timeout_type = TIMEOUT_KEEPALIVE;
-        v->cs = cs;
         cs->pfd.reqevents = APR_POLLIN;
-        v->tag = "process_socket(keepalive)";
-        ap_equeue_writer_onward(eq);
-        apr_pollset_wakeup(event_pollset);
+        rc = apr_pollset_add(event_pollset, &cs->pfd);
+        apr_thread_mutex_unlock(timeout_mutex);
+
+        if (rc != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
+                         "process_socket: apr_pollset_add failure");
+            AP_DEBUG_ASSERT(rc == APR_SUCCESS);
+        }
     }
     return 1;
 }
@@ -1354,6 +1302,7 @@ static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t *
         return;
     }
 
+    apr_thread_mutex_lock(timeout_mutex);
     rv = apr_pollset_remove(event_pollset, pfd);
     AP_DEBUG_ASSERT(rv == APR_SUCCESS);
 
@@ -1361,6 +1310,7 @@ static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t *
     AP_DEBUG_ASSERT(rv == APR_SUCCESS);
 
     TO_QUEUE_REMOVE(*q, cs);
+    apr_thread_mutex_unlock(timeout_mutex);
     TO_QUEUE_ELEM_INIT(cs);
 
     apr_pool_clear(cs->p);
@@ -1373,7 +1323,7 @@ static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t *
  */
 static void process_timeout_queue(struct timeout_queue *q,
                                   apr_time_t timeout_time,
-                                  int (*func)(event_conn_state_t *, ap_equeue_t *eq))
+                                  int (*func)(event_conn_state_t *))
 {
     int count = 0;
     event_conn_state_t *first, *cs, *last;
@@ -1401,13 +1351,15 @@ static void process_timeout_queue(struct timeout_queue *q,
     APR_RING_UNSPLICE(first, last, timeout_list);
     AP_DEBUG_ASSERT(q->count >= count);
     q->count -= count;
+    apr_thread_mutex_unlock(timeout_mutex);
     while (count) {
         cs = APR_RING_NEXT(first, timeout_list);
         TO_QUEUE_ELEM_INIT(first);
-        func(first, NULL);
+        func(first);
         first = cs;
         count--;
     }
+    apr_thread_mutex_lock(timeout_mutex);
 }
 
 static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
@@ -1474,12 +1426,14 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             /* trace log status every second */
             if (now - last_log > apr_time_from_msec(1000)) {
                 last_log = now;
+                apr_thread_mutex_lock(timeout_mutex);
                 ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
                              "connections: %d (write-completion: %d "
                              "keep-alive: %d lingering: %d)",
                              connection_count, write_completion_q.count,
                              keepalive_q.count,
                              linger_q.count + short_linger_q.count);
+                apr_thread_mutex_unlock(timeout_mutex);
             }
         }
 
@@ -1505,13 +1459,16 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
         }
 #endif
         rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
-        if (rc != APR_SUCCESS
-            && !APR_STATUS_IS_EINTR(rc)
-            && !APR_STATUS_IS_TIMEUP(rc)) {
-            ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
-                         "apr_pollset_poll failed.  Attempting to "
-                         "shutdown process gracefully");
-            signal_threads(ST_GRACEFUL);
+        if (rc != APR_SUCCESS) {
+            if (APR_STATUS_IS_EINTR(rc)) {
+                continue;
+            }
+            if (!APR_STATUS_IS_TIMEUP(rc)) {
+                ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
+                             "apr_pollset_poll failed.  Attempting to "
+                             "shutdown process gracefully");
+                signal_threads(ST_GRACEFUL);
+            }
         }
 
         if (listener_may_exit) {
@@ -1544,7 +1501,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                 /* one of the sockets is readable */
                 struct timeout_queue *remove_from_q = &write_completion_q;
                 int blocking = 1;
-                cs = (event_conn_state_t *)pt->baton;
+                cs = (event_conn_state_t *) pt->baton;
                 switch (cs->pub.state) {
                 case CONN_STATE_CHECK_REQUEST_LINE_READABLE:
                     cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
@@ -1555,6 +1512,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                 case CONN_STATE_WRITE_COMPLETION:
                     get_worker(&have_idle_worker, blocking,
                                &workers_were_busy);
+                    apr_thread_mutex_lock(timeout_mutex);
                     TO_QUEUE_REMOVE(*remove_from_q, cs);
                     rc = apr_pollset_remove(event_pollset, &cs->pfd);
 
@@ -1567,17 +1525,19 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                     if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
                         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
                                      "pollset remove failed");
-                        start_lingering_close(cs, NULL);
+                        apr_thread_mutex_unlock(timeout_mutex);
+                        start_lingering_close(cs);
                         break;
                     }
 
+                    apr_thread_mutex_unlock(timeout_mutex);
                     TO_QUEUE_ELEM_INIT(cs);
                     /* If we didn't get a worker immediately for a keep-alive
                      * request, we close the connection, so that the client can
                      * re-connect to a different process.
                      */
                     if (!have_idle_worker) {
-                        start_lingering_close(cs, NULL);
+                        start_lingering_close(cs);
                         break;
                     }
                     rc = push2worker(out_pfd, event_pollset);
@@ -1602,35 +1562,35 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                 }
             }
             else if (pt->type == PT_ACCEPT) {
-                int skip_accept = 0;
-                int connection_count_local = connection_count;
-
                 /* A Listener Socket is ready for an accept() */
                 if (workers_were_busy) {
-                    skip_accept = 1;
+                    if (!listeners_disabled)
+                        disable_listensocks(process_slot);
+                    listeners_disabled = 1;
                     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
                                  "All workers busy, not accepting new conns"
                                  "in this process");
                 }
-                else if (listeners_disabled) {
-                    listeners_disabled = 0;
-                    enable_listensocks(process_slot);
-                }
-                else if (connection_count_local > threads_per_child
+                else if (apr_atomic_read32(&connection_count) > threads_per_child
                          + ap_queue_info_get_idlers(worker_queue_info) *
                            worker_factor / WORKER_FACTOR_SCALE)
                 {
-                    skip_accept = 1;
+                    if (!listeners_disabled)
+                        disable_listensocks(process_slot);
                     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
                                  "Too many open connections (%u), "
                                  "not accepting new conns in this process",
-                                 connection_count_local);
+                                 apr_atomic_read32(&connection_count));
                     ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
                                  "Idle workers: %u",
                                  ap_queue_info_get_idlers(worker_queue_info));
+                    listeners_disabled = 1;
                 }
-
-                if (skip_accept == 0) {
+                else if (listeners_disabled) {
+                    listeners_disabled = 0;
+                    enable_listensocks(process_slot);
+                }
+                if (!listeners_disabled) {
                     lr = (ap_listen_rec *) pt->baton;
                     ap_pop_pool(&ptrans, worker_queue_info);
 
@@ -1701,20 +1661,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             num--;
         }                   /* while for processing poll */
 
-        {
-            /* TODO: break out to separate function */
-            int i;
-
-            for (i = 0; i < threads_per_child; i++) {
-                ap_equeue_t *eq = worker_equeues[i];
-                pollset_op_t *op = NULL;
-
-                while ((op = ap_equeue_reader_next(eq)) != NULL) {
-                    process_pollop(op);
-                }
-            }
-        }
-
         /* XXX possible optimization: stash the current time for use as
          * r->request_time for new requests
          */
@@ -1725,6 +1671,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             timeout_time = now + TIMEOUT_FUDGE_FACTOR;
 
             /* handle timed out sockets */
+            apr_thread_mutex_lock(timeout_mutex);
 
             /* Step 1: keepalive timeouts */
             /* If all workers are busy, we kill older keep-alive connections so that they
@@ -1754,6 +1701,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             ps->write_completion = write_completion_q.count;
             ps->lingering_close = linger_q.count + short_linger_q.count;
             ps->keep_alive = keepalive_q.count;
+            apr_thread_mutex_unlock(timeout_mutex);
 
             ps->connections = apr_atomic_read32(&connection_count);
             /* XXX: should count CONN_STATE_SUSPENDED and set ps->suspended */
@@ -1797,7 +1745,6 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
     apr_status_t rv;
     int is_idle = 0;
     timer_event_t *te = NULL;
-    ap_equeue_t *eq = worker_equeues[thread_slot];
 
     free(ti);
 
@@ -1870,7 +1817,7 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
         else {
             is_idle = 0;
             worker_sockets[thread_slot] = csd;
-            rv = process_socket(thd, ptrans, csd, cs, eq, process_slot, thread_slot);
+            rv = process_socket(thd, ptrans, csd, cs, process_slot, thread_slot);
             if (!rv) {
                 requests_this_child--;
             }
@@ -1967,32 +1914,33 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy)
         clean_child_exit(APEXIT_CHILDFATAL);
     }
 
+    /* Create the timeout mutex and main pollset before the listener
+     * thread starts.
+     */
+    rv = apr_thread_mutex_create(&timeout_mutex, APR_THREAD_MUTEX_DEFAULT,
+                                 pchild);
+    if (rv != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
+                     "creation of the timeout mutex failed.");
+        clean_child_exit(APEXIT_CHILDFATAL);
+    }
+
     /* Create the main pollset */
     rv = apr_pollset_create(&event_pollset,
                             threads_per_child, /* XXX don't we need more, to handle
                                                 * connections in K-A or lingering
                                                 * close?
                                                 */
-                            pchild, APR_POLLSET_WAKEABLE|APR_POLLSET_NOCOPY);
+                            pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
     if (rv != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
-                     "apr_pollset_create failed; check system or user limits");
+                     "apr_pollset_create with Thread Safety failed.");
         clean_child_exit(APEXIT_CHILDFATAL);
     }
 
     worker_sockets = apr_pcalloc(pchild, threads_per_child
                                  * sizeof(apr_socket_t *));
 
-    worker_equeues = apr_palloc(pchild, threads_per_child * sizeof(ap_equeue_t*));
-
-    for (i = 0; i < threads_per_child; i++) {
-        ap_equeue_t* eq = NULL;
-        /* TODO: research/test optimal size of queue here */
-        ap_equeue_create(pchild, 16, sizeof(pollset_op_t), &eq);
-        /* same as thread ID */
-        worker_equeues[i] = eq;
-    }
-
     loops = prev_threads_created = 0;
     while (1) {
         /* threads_per_child does not include the listener thread */
@@ -2889,10 +2837,12 @@ static int event_pre_config(apr_pool_t * pconf, apr_pool_t * plog,
     ++retained->module_loads;
     if (retained->module_loads == 2) {
         rv = apr_pollset_create(&event_pollset, 1, plog,
-                                APR_POLLSET_WAKEABLE|APR_POLLSET_NOCOPY);
+                                APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
         if (rv != APR_SUCCESS) {
             ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, APLOGNO(00495)
-                         "apr_pollset_create failed; check system or user limits");
+                         "Couldn't create a Thread Safe Pollset. "
+                         "Is it supported on your platform?"
+                         "Also check system or user limits!");
             return HTTP_INTERNAL_SERVER_ERROR;
         }
         apr_pollset_destroy(event_pollset);