From 49098fff9147a70bfe157cda74cc13a135a290a7 Mon Sep 17 00:00:00 2001 From: Brian Pane Date: Fri, 19 Apr 2002 06:33:08 +0000 Subject: [PATCH] Replaced the mutex around the idle worker stack with atomic compare-and-swap loops git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@94705 13f79535-47bb-0310-9956-ffa450edef68 --- server/mpm/experimental/leader/leader.c | 235 ++++++++++++++---------- 1 file changed, 138 insertions(+), 97 deletions(-) diff --git a/server/mpm/experimental/leader/leader.c b/server/mpm/experimental/leader/leader.c index 5407f2d875..a3fe24e50f 100644 --- a/server/mpm/experimental/leader/leader.c +++ b/server/mpm/experimental/leader/leader.c @@ -72,6 +72,7 @@ #include "apr_thread_cond.h" #include "apr_thread_mutex.h" #include "apr_proc_mutex.h" +#include "apr_atomic.h" #define APR_WANT_STRFUNC #include "apr_want.h" @@ -172,6 +173,8 @@ static int requests_this_child; static int num_listensocks = 0; static int resource_shortage = 0; +typedef struct worker_wakeup_info worker_wakeup_info; + /* The structure used to pass unique initialization info to each thread */ typedef struct { int pid; @@ -243,102 +246,147 @@ static apr_proc_mutex_t *accept_mutex; /* Structure used to wake up an idle worker thread */ -typedef struct { +struct worker_wakeup_info { apr_thread_cond_t *cond; apr_thread_mutex_t *mutex; -} worker_wakeup_info; + apr_uint32_t next; /* index into worker_wakeups array, + * used to build a linked list + */ +}; + +static worker_wakeup_info *worker_wakeup_create(apr_pool_t *pool) +{ + apr_status_t rv; + worker_wakeup_info *wakeup; + + wakeup = (worker_wakeup_info *)apr_palloc(pool, sizeof(*wakeup)); + if ((rv = apr_thread_cond_create(&wakeup->cond, pool)) != APR_SUCCESS) { + return NULL; + } + if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT, + pool)) != APR_SUCCESS) { + return NULL; + } + /* The wakeup's mutex will be unlocked automatically when + * the worker blocks on the condition variable + */ + apr_thread_mutex_lock(wakeup->mutex); + return wakeup; +} + /* Structure used to hold a stack of idle worker threads */ typedef struct { - apr_thread_mutex_t *mutex; - int no_listener; - worker_wakeup_info **stack; - apr_size_t nelts; - apr_size_t nalloc; + /* 'state' consists of several fields concatenated into a + * single 32-bit int for use with the apr_atomic_cas() API: + * state & STACK_FIRST is the thread ID of the first thread + * in a linked list of idle threads + * state & STACK_TERMINATED indicates whether the proc is shutting down + * state & STACK_NO_LISTENER indicates whether the process has + * no current listener thread + */ + apr_uint32_t state; } worker_stack; +#define STACK_FIRST 0xffff +#define STACK_LIST_END 0xffff +#define STACK_TERMINATED 0x10000 +#define STACK_NO_LISTENER 0x20000 + +static worker_wakeup_info **worker_wakeups = NULL; + static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max) { - apr_status_t rv; worker_stack *stack = (worker_stack *)apr_palloc(pool, sizeof(*stack)); - - if ((rv = apr_thread_mutex_create(&stack->mutex, APR_THREAD_MUTEX_DEFAULT, - pool)) != APR_SUCCESS) { - return NULL; - } - stack->no_listener = 1; - stack->nelts = 0; - stack->nalloc = max; - stack->stack = - (worker_wakeup_info **)apr_palloc(pool, stack->nalloc * - sizeof(worker_wakeup_info *)); + stack->state = STACK_NO_LISTENER | STACK_LIST_END; return stack; } static apr_status_t worker_stack_wait(worker_stack *stack, - worker_wakeup_info *wakeup) + apr_uint32_t worker_id) { - apr_status_t rv; - if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) { - return rv; - } - if (stack->no_listener) { - /* this thread should become the new listener immediately */ - stack->no_listener = 0; - if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { - return rv; - } - return APR_SUCCESS; - } - else { - /* push this thread onto the stack of idle workers, and block - * on the condition variable until awoken - */ - if (stack->nelts == stack->nalloc) { - return APR_ENOSPC; + worker_wakeup_info *wakeup = worker_wakeups[worker_id]; + + while (1) { + apr_uint32_t state = stack->state; + if (state & (STACK_TERMINATED | STACK_NO_LISTENER)) { + if (state & STACK_TERMINATED) { + return APR_EINVAL; + } + if (apr_atomic_cas(&(stack->state), STACK_LIST_END, state) != + state) { + continue; + } + else { + return APR_SUCCESS; + } } - stack->stack[stack->nelts++] = wakeup; - if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { - return rv; + wakeup->next = state; + if (apr_atomic_cas(&(stack->state), worker_id, state) != state) { + continue; } - if ((rv = apr_thread_cond_wait(wakeup->cond, wakeup->mutex)) != - APR_SUCCESS) { - return rv; + else { + return apr_thread_cond_wait(wakeup->cond, wakeup->mutex); } - return APR_SUCCESS; - } + } } static apr_status_t worker_stack_awaken_next(worker_stack *stack) { - apr_status_t rv; - if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) { - return rv; - } - if (stack->nelts) { - worker_wakeup_info *wakeup = stack->stack[--stack->nelts]; - if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { - return rv; - } - /* Acquire and release the idle worker's mutex to ensure - * that it's actually waiting on its condition variable - */ - if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != APR_SUCCESS) { - return rv; + + while (1) { + apr_uint32_t state = stack->state; + apr_uint32_t first = state & STACK_FIRST; + if (first == STACK_LIST_END) { + if (apr_atomic_cas(&(stack->state), state | STACK_NO_LISTENER, + state) != state) { + continue; + } + else { + return APR_SUCCESS; + } } - if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != APR_SUCCESS) { - return rv; + else { + worker_wakeup_info *wakeup = worker_wakeups[first]; + apr_uint32_t new_state = state & ~STACK_FIRST; + new_state |= wakeup->next; + if (apr_atomic_cas(&(stack->state), new_state, state) != state) { + continue; + } + else { + /* Acquire and release the idle worker's mutex to ensure + * that it's actually waiting on its condition variable + */ + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != + APR_SUCCESS) { + return rv; + } + if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != + APR_SUCCESS) { + return rv; + } + return apr_thread_cond_signal(wakeup->cond); + } } - apr_thread_mutex_unlock(wakeup->mutex); - if ((rv = apr_thread_cond_signal(wakeup->cond)) != APR_SUCCESS) { - apr_thread_mutex_unlock(stack->mutex); - return rv; + } +} + +static apr_status_t worker_stack_term(worker_stack *stack) +{ + int i; + apr_status_t rv; + + while (1) { + apr_uint32_t state = stack->state; + if (apr_atomic_cas(&(stack->state), state | STACK_TERMINATED, + state) == state) { + break; } } - else { - stack->no_listener = 1; - if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { + for (i = 0; i < ap_threads_per_child; i++) { + if ((rv = worker_stack_awaken_next(stack)) != APR_SUCCESS) { return rv; } } @@ -355,16 +403,12 @@ static int terminate_mode = ST_INIT; static void signal_threads(int mode) { - int i; if (terminate_mode == mode) { return; } terminate_mode = mode; - workers_may_exit = 1; - for (i = 0; i < ap_threads_per_child; i++) { - (void)worker_stack_awaken_next(idle_worker_stack); - } + worker_stack_term(idle_worker_stack); } AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result) @@ -726,6 +770,7 @@ static void *worker_thread(apr_thread_t *thd, void * dummy) proc_info * ti = dummy; int process_slot = ti->pid; int thread_slot = ti->tid; + apr_uint32_t my_worker_num = (apr_uint32_t)(ti->tid); apr_pool_t *tpool = apr_thread_pool_get(thd); void *csd = NULL; apr_allocator_t *allocator; @@ -735,7 +780,6 @@ static void *worker_thread(apr_thread_t *thd, void * dummy) apr_pollfd_t *pollset; apr_status_t rv; ap_listen_rec *lr, *last_lr = ap_listeners; - worker_wakeup_info *wakeup; int is_listener; ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING, NULL); @@ -747,24 +791,6 @@ static void *worker_thread(apr_thread_t *thd, void * dummy) apr_allocator_set_owner(allocator, ptrans); bucket_alloc = apr_bucket_alloc_create(tpool); - wakeup = (worker_wakeup_info *)apr_palloc(tpool, sizeof(*wakeup)); - if ((rv = apr_thread_cond_create(&wakeup->cond, tpool)) != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, - "apr_thread_cond_create failed. Attempting to shutdown " - "process gracefully."); - signal_threads(ST_GRACEFUL); - goto done; - } - if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT, - tpool)) != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, - "apr_thread_mutex_create failed. Attempting to shutdown " - "process gracefully."); - signal_threads(ST_GRACEFUL); - goto done; - } - apr_thread_mutex_lock(wakeup->mutex); - apr_poll_setup(&pollset, num_listensocks, tpool); for(lr = ap_listeners ; lr != NULL ; lr = lr->next) apr_poll_socket_add(pollset, lr->sd, APR_POLLIN); @@ -778,10 +804,12 @@ static void *worker_thread(apr_thread_t *thd, void * dummy) SERVER_READY, NULL); if (!is_listener) { /* Wait until it's our turn to become the listener */ - if ((rv = worker_stack_wait(idle_worker_stack, wakeup)) != + if ((rv = worker_stack_wait(idle_worker_stack, my_worker_num)) != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, - "worker_stack_wait failed. Shutting down"); + if (rv != APR_EINVAL) { + ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, + "worker_stack_wait failed. Shutting down"); + } break; } if (workers_may_exit) { @@ -902,7 +930,8 @@ static void *worker_thread(apr_thread_t *thd, void * dummy) } } - done: + workers_may_exit = 1; + worker_stack_term(idle_worker_stack); dying = 1; ap_scoreboard_image->parent[process_slot].quiescing = 1; @@ -951,15 +980,27 @@ static void * APR_THREAD_FUNC start_threads(apr_thread_t *thd, void *dummy) clean_child_exit(APEXIT_CHILDFATAL); } + worker_wakeups = (worker_wakeup_info **) + apr_palloc(pchild, sizeof(worker_wakeup_info *) * + ap_threads_per_child); + loops = prev_threads_created = 0; while (1) { for (i = 0; i < ap_threads_per_child; i++) { int status = ap_scoreboard_image->servers[child_num_arg][i].status; + worker_wakeup_info *wakeup; if (status != SERVER_GRACEFUL && status != SERVER_DEAD) { continue; } + wakeup = worker_wakeup_create(pchild); + if (wakeup == NULL) { + ap_log_error(APLOG_MARK, APLOG_ALERT|APLOG_NOERRNO, 0, + ap_server_conf, "worker_wakeup_create failed"); + clean_child_exit(APEXIT_CHILDFATAL); + } + worker_wakeups[threads_created] = wakeup; my_info = (proc_info *)malloc(sizeof(proc_info)); if (my_info == NULL) { ap_log_error(APLOG_MARK, APLOG_ALERT, errno, ap_server_conf, -- 2.50.1