]> granicus.if.org Git - apache/commitdiff
Replaced the mutex around the idle worker stack with
authorBrian Pane <brianp@apache.org>
Fri, 19 Apr 2002 06:33:08 +0000 (06:33 +0000)
committerBrian Pane <brianp@apache.org>
Fri, 19 Apr 2002 06:33:08 +0000 (06:33 +0000)
atomic compare-and-swap loops

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@94705 13f79535-47bb-0310-9956-ffa450edef68

server/mpm/experimental/leader/leader.c

index 5407f2d8754078b89d919613d31c68a3c3225a45..a3fe24e50f842b21ec71e99bf0b50d9c9a70d6f8 100644 (file)
@@ -72,6 +72,7 @@
 #include "apr_thread_cond.h"
 #include "apr_thread_mutex.h"
 #include "apr_proc_mutex.h"
+#include "apr_atomic.h"
 #define APR_WANT_STRFUNC
 #include "apr_want.h"
 
@@ -172,6 +173,8 @@ static int requests_this_child;
 static int num_listensocks = 0;
 static int resource_shortage = 0;
 
+typedef struct worker_wakeup_info worker_wakeup_info;
+
 /* The structure used to pass unique initialization info to each thread */
 typedef struct {
     int pid;
@@ -243,102 +246,147 @@ static apr_proc_mutex_t *accept_mutex;
 
 /* Structure used to wake up an idle worker thread
  */
-typedef struct {
+struct worker_wakeup_info {
     apr_thread_cond_t *cond;
     apr_thread_mutex_t *mutex;
-} worker_wakeup_info;
+    apr_uint32_t next; /* index into worker_wakeups array,
+                        * used to build a linked list
+                        */
+};
+
+static worker_wakeup_info *worker_wakeup_create(apr_pool_t *pool)
+{
+    apr_status_t rv;
+    worker_wakeup_info *wakeup;
+
+    wakeup = (worker_wakeup_info *)apr_palloc(pool, sizeof(*wakeup));
+    if ((rv = apr_thread_cond_create(&wakeup->cond, pool)) != APR_SUCCESS) {
+        return NULL;
+    }
+    if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT,
+                                      pool)) != APR_SUCCESS) {
+        return NULL;
+    }
+    /* The wakeup's mutex will be unlocked automatically when
+     * the worker blocks on the condition variable
+     */
+    apr_thread_mutex_lock(wakeup->mutex);
+    return wakeup;
+}
+
 
 /* Structure used to hold a stack of idle worker threads 
  */
 typedef struct {
-    apr_thread_mutex_t *mutex;
-    int no_listener;
-    worker_wakeup_info **stack;
-    apr_size_t nelts;
-    apr_size_t nalloc;
+    /* 'state' consists of several fields concatenated into a
+     * single 32-bit int for use with the apr_atomic_cas() API:
+     *   state & STACK_FIRST  is the thread ID of the first thread
+     *                        in a linked list of idle threads
+     *   state & STACK_TERMINATED  indicates whether the proc is shutting down
+     *   state & STACK_NO_LISTENER indicates whether the process has
+     *                             no current listener thread
+     */
+    apr_uint32_t state;
 } worker_stack;
 
+#define STACK_FIRST  0xffff
+#define STACK_LIST_END  0xffff
+#define STACK_TERMINATED 0x10000
+#define STACK_NO_LISTENER 0x20000
+
+static worker_wakeup_info **worker_wakeups = NULL;
+
 static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max)
 {
-    apr_status_t rv;
     worker_stack *stack = (worker_stack *)apr_palloc(pool, sizeof(*stack));
-
-    if ((rv = apr_thread_mutex_create(&stack->mutex, APR_THREAD_MUTEX_DEFAULT,
-                                      pool)) != APR_SUCCESS) {
-        return NULL;
-    }
-    stack->no_listener = 1;
-    stack->nelts = 0;
-    stack->nalloc = max;
-    stack->stack =
-        (worker_wakeup_info **)apr_palloc(pool, stack->nalloc *
-                                          sizeof(worker_wakeup_info *));
+    stack->state = STACK_NO_LISTENER | STACK_LIST_END;
     return stack;
 }
 
 static apr_status_t worker_stack_wait(worker_stack *stack,
-                                      worker_wakeup_info *wakeup)
+                                      apr_uint32_t worker_id)
 {
-    apr_status_t rv;
-    if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) {
-        return rv;
-    }
-    if (stack->no_listener) {
-        /* this thread should become the new listener immediately */
-        stack->no_listener = 0;
-        if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
-            return rv;
-        }
-        return APR_SUCCESS;
-    }
-    else {
-        /* push this thread onto the stack of idle workers, and block
-         * on the condition variable until awoken
-         */
-        if (stack->nelts == stack->nalloc) {
-            return APR_ENOSPC;
+    worker_wakeup_info *wakeup = worker_wakeups[worker_id];
+
+    while (1) {
+        apr_uint32_t state = stack->state;
+        if (state & (STACK_TERMINATED | STACK_NO_LISTENER)) {
+            if (state & STACK_TERMINATED) {
+                return APR_EINVAL;
+            }
+            if (apr_atomic_cas(&(stack->state), STACK_LIST_END, state) !=
+                state) {
+                continue;
+            }
+            else {
+                return APR_SUCCESS;
+            }
         }
-        stack->stack[stack->nelts++] = wakeup;
-        if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
-            return rv;
+        wakeup->next = state;
+        if (apr_atomic_cas(&(stack->state), worker_id, state) != state) {
+            continue;
         }
-        if ((rv = apr_thread_cond_wait(wakeup->cond, wakeup->mutex)) !=
-            APR_SUCCESS) {
-            return rv;
+        else {
+            return apr_thread_cond_wait(wakeup->cond, wakeup->mutex);
         }
-        return APR_SUCCESS;
-    }
+    }    
 }
 
 static apr_status_t worker_stack_awaken_next(worker_stack *stack)
 {
-    apr_status_t rv;
-    if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) {
-        return rv;
-    }
-    if (stack->nelts) {
-        worker_wakeup_info *wakeup = stack->stack[--stack->nelts];
-        if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
-            return rv;
-        }
-        /* Acquire and release the idle worker's mutex to ensure
-         * that it's actually waiting on its condition variable
-         */
-        if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != APR_SUCCESS) {
-            return rv;
+
+    while (1) {
+        apr_uint32_t state = stack->state;
+        apr_uint32_t first = state & STACK_FIRST;
+        if (first == STACK_LIST_END) {
+            if (apr_atomic_cas(&(stack->state), state | STACK_NO_LISTENER,
+                               state) != state) {
+                continue;
+            }
+            else {
+                return APR_SUCCESS;
+            }
         }
-        if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != APR_SUCCESS) {
-            return rv;
+        else {
+            worker_wakeup_info *wakeup = worker_wakeups[first];
+            apr_uint32_t new_state = state & ~STACK_FIRST;
+            new_state |= wakeup->next;
+            if (apr_atomic_cas(&(stack->state), new_state, state) != state) {
+                continue;
+            }
+            else {
+                /* Acquire and release the idle worker's mutex to ensure
+                 * that it's actually waiting on its condition variable
+                 */
+                apr_status_t rv;
+                if ((rv = apr_thread_mutex_lock(wakeup->mutex)) !=
+                    APR_SUCCESS) {
+                    return rv;
+                }
+                if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) !=
+                    APR_SUCCESS) {
+                    return rv;
+                }
+                return apr_thread_cond_signal(wakeup->cond);
+            }
         }
-        apr_thread_mutex_unlock(wakeup->mutex);
-        if ((rv = apr_thread_cond_signal(wakeup->cond)) != APR_SUCCESS) {
-            apr_thread_mutex_unlock(stack->mutex);
-            return rv;
+    }
+}
+
+static apr_status_t worker_stack_term(worker_stack *stack)
+{
+    int i;
+    apr_status_t rv;
+
+    while (1) {
+        apr_uint32_t state = stack->state;
+        if (apr_atomic_cas(&(stack->state), state | STACK_TERMINATED,
+                           state) == state) {
+            break;
         }
     }
-    else {
-        stack->no_listener = 1;
-        if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
+    for (i = 0; i < ap_threads_per_child; i++) {
+        if ((rv = worker_stack_awaken_next(stack)) != APR_SUCCESS) {
             return rv;
         }
     }
@@ -355,16 +403,12 @@ static int terminate_mode = ST_INIT;
 
 static void signal_threads(int mode)
 {
-    int i;
     if (terminate_mode == mode) {
         return;
     }
     terminate_mode = mode;
 
-    workers_may_exit = 1;
-    for (i = 0; i < ap_threads_per_child; i++) {
-        (void)worker_stack_awaken_next(idle_worker_stack);
-    }
+    worker_stack_term(idle_worker_stack);
 }
 
 AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result)
@@ -726,6 +770,7 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
     proc_info * ti = dummy;
     int process_slot = ti->pid;
     int thread_slot = ti->tid;
+    apr_uint32_t my_worker_num = (apr_uint32_t)(ti->tid);
     apr_pool_t *tpool = apr_thread_pool_get(thd);
     void *csd = NULL;
     apr_allocator_t *allocator;
@@ -735,7 +780,6 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
     apr_pollfd_t *pollset;
     apr_status_t rv;
     ap_listen_rec *lr, *last_lr = ap_listeners;
-    worker_wakeup_info *wakeup;
     int is_listener;
 
     ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING, NULL);
@@ -747,24 +791,6 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
     apr_allocator_set_owner(allocator, ptrans);
     bucket_alloc = apr_bucket_alloc_create(tpool);
 
-    wakeup = (worker_wakeup_info *)apr_palloc(tpool, sizeof(*wakeup));
-    if ((rv = apr_thread_cond_create(&wakeup->cond, tpool)) != APR_SUCCESS) {
-        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
-                     "apr_thread_cond_create failed. Attempting to shutdown "
-                     "process gracefully.");
-        signal_threads(ST_GRACEFUL);
-        goto done;
-    }
-    if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT,
-                                      tpool)) != APR_SUCCESS) {
-        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
-                     "apr_thread_mutex_create failed. Attempting to shutdown "
-                     "process gracefully.");
-        signal_threads(ST_GRACEFUL);
-        goto done;
-    }
-    apr_thread_mutex_lock(wakeup->mutex);
-
     apr_poll_setup(&pollset, num_listensocks, tpool);
     for(lr = ap_listeners ; lr != NULL ; lr = lr->next)
         apr_poll_socket_add(pollset, lr->sd, APR_POLLIN);
@@ -778,10 +804,12 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
                                             SERVER_READY, NULL);
         if (!is_listener) {
             /* Wait until it's our turn to become the listener */
-            if ((rv = worker_stack_wait(idle_worker_stack, wakeup)) !=
+            if ((rv = worker_stack_wait(idle_worker_stack, my_worker_num)) !=
                 APR_SUCCESS) {
-                ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
-                             "worker_stack_wait failed. Shutting down");
+                if (rv != APR_EINVAL) {
+                    ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                                 "worker_stack_wait failed. Shutting down");
+                }
                 break;
             }
             if (workers_may_exit) {
@@ -902,7 +930,8 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
         }
     }
 
- done:
+    workers_may_exit = 1;
+    worker_stack_term(idle_worker_stack);
     dying = 1;
     ap_scoreboard_image->parent[process_slot].quiescing = 1;
 
@@ -951,15 +980,27 @@ static void * APR_THREAD_FUNC start_threads(apr_thread_t *thd, void *dummy)
         clean_child_exit(APEXIT_CHILDFATAL);
     }
 
+    worker_wakeups = (worker_wakeup_info **)
+        apr_palloc(pchild, sizeof(worker_wakeup_info *) *
+                   ap_threads_per_child);
+
     loops = prev_threads_created = 0;
     while (1) {
         for (i = 0; i < ap_threads_per_child; i++) {
             int status = ap_scoreboard_image->servers[child_num_arg][i].status;
+            worker_wakeup_info *wakeup;
 
             if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {
                 continue;
             }
 
+            wakeup = worker_wakeup_create(pchild);
+            if (wakeup == NULL) {
+                ap_log_error(APLOG_MARK, APLOG_ALERT|APLOG_NOERRNO, 0,
+                             ap_server_conf, "worker_wakeup_create failed");
+                clean_child_exit(APEXIT_CHILDFATAL);
+            }
+            worker_wakeups[threads_created] = wakeup;
             my_info = (proc_info *)malloc(sizeof(proc_info));
             if (my_info == NULL) {
                 ap_log_error(APLOG_MARK, APLOG_ALERT, errno, ap_server_conf,