From c32f6f16d2844fc57cfab412a7f3809bde94801e Mon Sep 17 00:00:00 2001 From: Brian Pane Date: Fri, 26 Apr 2002 07:14:26 +0000 Subject: [PATCH] Switch back from atomic_cas to mutexes, based on all the current portability challenges with atomics git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@94810 13f79535-47bb-0310-9956-ffa450edef68 --- server/mpm/experimental/leader/leader.c | 182 +++++++++++++----------- 1 file changed, 99 insertions(+), 83 deletions(-) diff --git a/server/mpm/experimental/leader/leader.c b/server/mpm/experimental/leader/leader.c index 7804684092..e64a1f311c 100644 --- a/server/mpm/experimental/leader/leader.c +++ b/server/mpm/experimental/leader/leader.c @@ -111,7 +111,6 @@ #include #include /* for INT_MAX */ -#include "apr_atomic.h" /* Limit on the total --- clients will be locked out if more servers than * this are needed. It is intended solely to keep the server from crashing @@ -279,15 +278,12 @@ static worker_wakeup_info *worker_wakeup_create(apr_pool_t *pool) /* Structure used to hold a stack of idle worker threads */ typedef struct { - /* 'state' consists of several fields concatenated into a - * single 32-bit int for use with the apr_atomic_cas() API: - * state & STACK_FIRST is the thread ID of the first thread - * in a linked list of idle threads - * state & STACK_TERMINATED indicates whether the proc is shutting down - * state & STACK_NO_LISTENER indicates whether the process has - * no current listener thread - */ - apr_uint32_t state; + apr_thread_mutex_t *mutex; + int no_listener; + int terminated; + worker_wakeup_info **stack; + apr_size_t nelts; + apr_size_t nalloc; } worker_stack; #define STACK_FIRST 0xffff @@ -295,82 +291,94 @@ typedef struct { #define STACK_TERMINATED 0x10000 #define STACK_NO_LISTENER 0x20000 -static worker_wakeup_info **worker_wakeups = NULL; - static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max) { + apr_status_t rv; worker_stack *stack = (worker_stack *)apr_palloc(pool, sizeof(*stack)); - stack->state = STACK_NO_LISTENER | STACK_LIST_END; + + if ((rv = apr_thread_mutex_create(&stack->mutex, APR_THREAD_MUTEX_DEFAULT, + pool)) != APR_SUCCESS) { + return NULL; + } + stack->no_listener = 1; + stack->terminated = 0; + stack->nelts = 0; + stack->nalloc = max; + stack->stack = + (worker_wakeup_info **)apr_palloc(pool, stack->nalloc * + sizeof(worker_wakeup_info *)); return stack; } static apr_status_t worker_stack_wait(worker_stack *stack, - apr_uint32_t worker_id) + worker_wakeup_info *wakeup) { - worker_wakeup_info *wakeup = worker_wakeups[worker_id]; - - while (1) { - apr_uint32_t state = stack->state; - if (state & (STACK_TERMINATED | STACK_NO_LISTENER)) { - if (state & STACK_TERMINATED) { - return APR_EINVAL; - } - if (apr_atomic_cas(&(stack->state), STACK_LIST_END, state) != - state) { - continue; - } - else { - return APR_SUCCESS; - } + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) { + return rv; + } + if (stack->terminated) { + return APR_EINVAL; + } + if (stack->no_listener) { + /* this thread should become the new listener immediately */ + stack->no_listener = 0; + if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { + return rv; } - wakeup->next = state; - if (apr_atomic_cas(&(stack->state), worker_id, state) != state) { - continue; + return APR_SUCCESS; + } + else { + /* push this thread onto the stack of idle workers, and block + * on the condition variable until awoken + */ + if (stack->nelts == stack->nalloc) { + return APR_ENOSPC; } - else { - return apr_thread_cond_wait(wakeup->cond, wakeup->mutex); + stack->stack[stack->nelts++] = wakeup; + if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { + return rv; + } + if ((rv = apr_thread_cond_wait(wakeup->cond, wakeup->mutex)) != + APR_SUCCESS) { + return rv; } - } + return APR_SUCCESS; + } } static apr_status_t worker_stack_awaken_next(worker_stack *stack) { - - while (1) { - apr_uint32_t state = stack->state; - apr_uint32_t first = state & STACK_FIRST; - if (first == STACK_LIST_END) { - if (apr_atomic_cas(&(stack->state), state | STACK_NO_LISTENER, - state) != state) { - continue; - } - else { - return APR_SUCCESS; - } + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) { + return rv; + } + if (stack->nelts) { + worker_wakeup_info *wakeup = stack->stack[--stack->nelts]; + if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { + return rv; } - else { - worker_wakeup_info *wakeup = worker_wakeups[first]; - if (apr_atomic_cas(&(stack->state), (state ^ first) | wakeup->next, - state) != state) { - continue; - } - else { - /* Acquire and release the idle worker's mutex to ensure - * that it's actually waiting on its condition variable - */ - apr_status_t rv; - if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != - APR_SUCCESS) { - return rv; - } - if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != - APR_SUCCESS) { - return rv; - } - return apr_thread_cond_signal(wakeup->cond); - } + /* Acquire and release the idle worker's mutex to ensure + * that it's actually waiting on its condition variable + */ + if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != APR_SUCCESS) { + return rv; + } + if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != APR_SUCCESS) { + return rv; + } + if ((rv = apr_thread_cond_signal(wakeup->cond)) != APR_SUCCESS) { + apr_thread_mutex_unlock(stack->mutex); + return rv; + } + } + else { + stack->no_listener = 1; + if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { + return rv; } } + return APR_SUCCESS; } static apr_status_t worker_stack_term(worker_stack *stack) @@ -378,18 +386,29 @@ static apr_status_t worker_stack_term(worker_stack *stack) int i; apr_status_t rv; - while (1) { - apr_uint32_t state = stack->state; - if (apr_atomic_cas(&(stack->state), state | STACK_TERMINATED, - state) == state) { - break; - } + if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) { + return rv; } - for (i = 0; i < ap_threads_per_child; i++) { - if ((rv = worker_stack_awaken_next(stack)) != APR_SUCCESS) { + stack->terminated = 1; + while (stack->nelts) { + worker_wakeup_info *wakeup = stack->stack[--stack->nelts]; + /* Acquire and release the idle worker's mutex to ensure + * that it's actually waiting on its condition variable + */ + if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != APR_SUCCESS) { + return rv; + } + if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != APR_SUCCESS) { + return rv; + } + if ((rv = apr_thread_cond_signal(wakeup->cond)) != APR_SUCCESS) { + apr_thread_mutex_unlock(stack->mutex); return rv; } } + if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { + return rv; + } return APR_SUCCESS; } @@ -770,6 +789,7 @@ static void *worker_thread(apr_thread_t *thd, void * dummy) proc_info * ti = dummy; int process_slot = ti->pid; int thread_slot = ti->tid; + worker_wakeup_info *wakeup; apr_uint32_t my_worker_num = (apr_uint32_t)(ti->tid); apr_pool_t *tpool = apr_thread_pool_get(thd); void *csd = NULL; @@ -795,6 +815,8 @@ static void *worker_thread(apr_thread_t *thd, void * dummy) for(lr = ap_listeners ; lr != NULL ; lr = lr->next) apr_poll_socket_add(pollset, lr->sd, APR_POLLIN); + wakeup = worker_wakeup_create(tpool); + /* TODO: Switch to a system where threads reuse the results from earlier poll calls - manoj */ is_listener = 0; @@ -804,7 +826,7 @@ static void *worker_thread(apr_thread_t *thd, void * dummy) SERVER_READY, NULL); if (!is_listener) { /* Wait until it's our turn to become the listener */ - if ((rv = worker_stack_wait(idle_worker_stack, my_worker_num)) != + if ((rv = worker_stack_wait(idle_worker_stack, wakeup)) != APR_SUCCESS) { if (rv != APR_EINVAL) { ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, @@ -980,10 +1002,6 @@ static void * APR_THREAD_FUNC start_threads(apr_thread_t *thd, void *dummy) clean_child_exit(APEXIT_CHILDFATAL); } - worker_wakeups = (worker_wakeup_info **) - apr_palloc(pchild, sizeof(worker_wakeup_info *) * - ap_threads_per_child); - loops = prev_threads_created = 0; while (1) { for (i = 0; i < ap_threads_per_child; i++) { @@ -994,13 +1012,11 @@ static void * APR_THREAD_FUNC start_threads(apr_thread_t *thd, void *dummy) continue; } - wakeup = worker_wakeup_create(pchild); if (wakeup == NULL) { ap_log_error(APLOG_MARK, APLOG_ALERT|APLOG_NOERRNO, 0, ap_server_conf, "worker_wakeup_create failed"); clean_child_exit(APEXIT_CHILDFATAL); } - worker_wakeups[threads_created] = wakeup; my_info = (proc_info *)malloc(sizeof(proc_info)); if (my_info == NULL) { ap_log_error(APLOG_MARK, APLOG_ALERT, errno, ap_server_conf, -- 2.40.0