From: Aaron Bannert Date: Sun, 28 Apr 2002 22:25:16 +0000 (+0000) Subject: Some threadpool fixes: X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=f00ec6fdf1ec1ab40dc75b17cab0640290527220;p=apache Some threadpool fixes: - We don't need the listener_blocked flag, just check if the element we're adding makes the stack non-empty. - When we are terminated, return APR_EOF; catch this event in the worker thread and the listener thread. - When shutting down, always signal a potential listener thread. - Wait to signal the listener thread until after we add the worker element to the stack. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@94841 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/server/mpm/experimental/threadpool/threadpool.c b/server/mpm/experimental/threadpool/threadpool.c index a1c122af99..33f4a17ad4 100644 --- a/server/mpm/experimental/threadpool/threadpool.c +++ b/server/mpm/experimental/threadpool/threadpool.c @@ -269,7 +269,6 @@ typedef struct { typedef struct { apr_thread_mutex_t *mutex; apr_thread_cond_t *cond; - int listener_blocked; worker_wakeup_info **stack; apr_size_t nelts; apr_size_t nalloc; @@ -288,7 +287,6 @@ static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max) if ((rv = apr_thread_cond_create(&stack->cond, pool)) != APR_SUCCESS) { return NULL; } - stack->listener_blocked = 0; stack->nelts = 0; stack->nalloc = max; stack->stack = @@ -307,27 +305,32 @@ static apr_status_t worker_stack_wait(worker_stack *stack, return rv; } if (stack->terminated) { - wakeup->csd = NULL; if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { return rv; } - return APR_EINTR; + return APR_EOF; } if (stack->nelts == stack->nalloc) { - (void)apr_thread_mutex_unlock(stack->mutex); - return APR_ENOSPC; - } - if (stack->listener_blocked) { - stack->listener_blocked = 0; - if ((rv = apr_thread_cond_signal(stack->cond)) != - APR_SUCCESS) { + if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { return rv; } + return APR_ENOSPC; + } + stack->stack[stack->nelts] = wakeup; + /* Signal a blocking listener thread only if we just made the + * stack non-empty. */ + if (stack->nelts++ == 0) { + (void)apr_thread_cond_signal(stack->cond); } - stack->stack[stack->nelts++] = wakeup; if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { return rv; } + + /* At this point we've already added this worker to the stack, now + * we just wait until the listener has accept()ed a connection + * for us. */ + /* FIXME: We must own the wakeup->mutex before calling cond_wait(), + * and then keep calling cond_wait() until some signal is met. */ if ((rv = apr_thread_cond_wait(wakeup->cond, wakeup->mutex)) != APR_SUCCESS) { return rv; @@ -342,15 +345,22 @@ static apr_status_t worker_stack_pop(worker_stack *stack, if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) { return rv; } - if (stack->nelts == 0) { - stack->listener_blocked = 1; - if ((rv = apr_thread_cond_wait(stack->cond, stack->mutex)) != - APR_SUCCESS) { + while ((stack->nelts == 0) && (!stack->terminated)) { + rv = apr_thread_cond_wait(stack->cond, stack->mutex); + if (rv != APR_SUCCESS) { + apr_status_t rv2; + rv2 = apr_thread_mutex_unlock(stack->mutex); + if (rv2 != APR_SUCCESS) { + return rv2; + } return rv; } + } + if (stack->terminated) { if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { return rv; } + return APR_EOF; } *worker = stack->stack[--stack->nelts]; if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { @@ -394,12 +404,9 @@ static void wakeup_listener(void) if ((rv = apr_thread_mutex_lock(idle_worker_stack->mutex)) != APR_SUCCESS) { return; } - if (idle_worker_stack->listener_blocked) { - idle_worker_stack->listener_blocked = 0; - if ((rv = apr_thread_cond_signal(idle_worker_stack->cond)) != - APR_SUCCESS) { - return; - } + if ((rv = apr_thread_cond_signal(idle_worker_stack->cond)) != + APR_SUCCESS) { + return; } if ((rv = apr_thread_mutex_unlock(idle_worker_stack->mutex)) != APR_SUCCESS) { return; @@ -844,8 +851,11 @@ static void *listener_thread(apr_thread_t *thd, void * dummy) if (listener_may_exit) break; if (worker == NULL) { - if ((rv = worker_stack_pop(idle_worker_stack, &worker)) != - APR_SUCCESS) { + rv = worker_stack_pop(idle_worker_stack, &worker); + if (APR_STATUS_IS_EOF(rv)) { + break; + } + else if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, "worker_stack_pop failed"); break; @@ -1034,9 +1044,13 @@ static void * APR_THREAD_FUNC worker_thread(apr_thread_t *thd, void * dummy) while (!workers_may_exit) { ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_READY, NULL); rv = worker_stack_wait(idle_worker_stack, wakeup); - if ((rv != APR_SUCCESS) || workers_may_exit || - (wakeup->csd == NULL)) { - break; + if (APR_STATUS_IS_EOF(rv)) { + break; /* The queue has been terminated. */ + } + else if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, + "worker_stack_wait failed"); + break; /* Treat all other errors as fatal. */ } process_socket(ptrans, wakeup->csd, process_slot, thread_slot, bucket_alloc);