From fe4fa0cd35afe713640b06b1cf9198f13cbb2d5e Mon Sep 17 00:00:00 2001 From: Ryan Bloom Date: Fri, 24 Aug 2001 16:49:39 +0000 Subject: [PATCH] Make the worker MPM shutdown and restart cleanly. This also cleans up some race conditions, and gets the worker using pools more cleanly. Submitted by: [Aaron Bannert ] git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@90635 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES | 9 ++- server/mpm/worker/fdqueue.c | 143 +++++++++++++++++++++++++----------- server/mpm/worker/fdqueue.h | 45 ++++++------ server/mpm/worker/worker.c | 56 ++++++++------ 4 files changed, 165 insertions(+), 88 deletions(-) diff --git a/CHANGES b/CHANGES index 820ce19994..52f721a954 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,11 @@ Changes with Apache 2.0.25-dev - *) Implement CRYPTO_set_locking_callback() in terms of apr_lock - for mod_ssl + + *) Make the worker MPM shutdown and restart cleanly. This also + cleans up some race conditions, and gets the worker using + pools more cleanly. [Aaron Bannert ] + + *) Implement CRYPTO_set_locking_callback() in terms of apr_lock + for mod_ssl [Madhusudan Mathihalli ] *) Fix for mod_include. Ryan's patch to check error diff --git a/server/mpm/worker/fdqueue.c b/server/mpm/worker/fdqueue.c index ad7a4752f0..8f6fe86d65 100644 --- a/server/mpm/worker/fdqueue.c +++ b/server/mpm/worker/fdqueue.c @@ -60,7 +60,11 @@ /* Assumption: increment and decrement are atomic on int */ -int ap_increase_blanks(FDQueue *queue) +/** + * Threadsafe way to increment the number of empty slots ("blanks") + * in the resource queue. + */ +int ap_increase_blanks(fd_queue_t *queue) { if (pthread_mutex_lock(&queue->one_big_mutex) != 0) { return FD_QUEUE_FAILURE; @@ -69,61 +73,129 @@ int ap_increase_blanks(FDQueue *queue) if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) { return FD_QUEUE_FAILURE; } + return FD_QUEUE_SUCCESS; } +/** + * Detects when the fd_queue_t is full. 
This utility function is expected + * to be called from within critical sections, and is not threadsafe. + */ +static int ap_queue_full(fd_queue_t *queue) +{ + return (queue->blanks <= 0); +} + +/** + * Detects when the fd_queue_t is empty. This utility function is expected + * to be called from within critical sections, and is not threadsafe. + */ +static int ap_queue_empty(fd_queue_t *queue) +{ + /*return (queue->head == queue->tail);*/ + return (queue->blanks >= queue->bounds - 1); +} + +/** + * Callback routine that is called to destroy this + * fd_queue_t when its pool is destroyed. + */ static apr_status_t ap_queue_destroy(void *data) { - FDQueue *queue = data; - /* Ignore errors here, we can't do anything about them anyway */ - pthread_cond_destroy(&(queue->not_empty)); - pthread_cond_destroy(&(queue->not_full)); + fd_queue_t *queue = data; + + /* Ignore errors here, we can't do anything about them anyway. + * XXX: We should at least try to signal an error here, it is + * indicative of a programmer error. -aaron */ + pthread_cond_destroy(&queue->not_empty); + pthread_cond_destroy(&queue->not_full); pthread_mutex_destroy(&queue->one_big_mutex); + return FD_QUEUE_SUCCESS; } -int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) +/** + * Initialize the fd_queue_t. 
+ */ +int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a) { int i; - int bounds = queue_capacity + 1; - pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER; - pthread_cond_t not_full = PTHREAD_COND_INITIALIZER; - queue->not_empty = not_empty; - queue->not_full = not_full; - pthread_mutex_init(&queue->one_big_mutex, NULL); + int bounds; + + if (pthread_mutex_init(&queue->one_big_mutex, NULL) != 0) + return FD_QUEUE_FAILURE; + if (pthread_cond_init(&queue->not_empty, NULL) != 0) + return FD_QUEUE_FAILURE; + if (pthread_cond_init(&queue->not_full, NULL) != 0) + return FD_QUEUE_FAILURE; + + bounds = queue_capacity + 1; queue->head = queue->tail = 0; - queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement)); + queue->data = apr_palloc(a, bounds * sizeof(fd_queue_elem_t)); queue->bounds = bounds; - queue->blanks = 0; - apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null); - for (i=0; i < bounds; ++i) + queue->blanks = queue_capacity; + + /* Set all the sockets in the queue to NULL */ + for (i = 0; i < bounds; ++i) queue->data[i].sd = NULL; + + apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null); + return FD_QUEUE_SUCCESS; } -int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) +/** + * Push a new socket onto the queue. Blocks if the queue is full. Once + * the push operation has completed, it signals other threads waiting + * in ap_queue_pop() that they may continue consuming sockets. + */ +int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p) { if (pthread_mutex_lock(&queue->one_big_mutex) != 0) { return FD_QUEUE_FAILURE; } + + /* Keep waiting until we wake up and find that the queue is not full. 
+ */ + while (ap_queue_full(queue)) { + pthread_cond_wait(&queue->not_full, &queue->one_big_mutex); + } + queue->data[queue->tail].sd = sd; queue->data[queue->tail].p = p; queue->tail = (queue->tail + 1) % queue->bounds; queue->blanks--; + + pthread_cond_signal(&queue->not_empty); + if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) { return FD_QUEUE_FAILURE; } - pthread_cond_signal(&(queue->not_empty)); + return FD_QUEUE_SUCCESS; } -apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p) +/** + * Retrieves the next available socket from the queue. If there are no + * sockets available, it will block until one arrives or it is interrupted (FD_QUEUE_EINTR). + * Once retrieved, the socket is placed into the address specified by + * 'sd'. + */ +apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p) { if (pthread_mutex_lock(&queue->one_big_mutex) != 0) { return FD_QUEUE_FAILURE; } + + /* Wait once if the queue is empty; waking to a still-empty queue means we were interrupted. 
*/ + if (ap_queue_empty(queue)) { + pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex); + /* If we wake up and it's still empty, then we were interrupted */ + if (ap_queue_empty(queue)) { + if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) { + return FD_QUEUE_FAILURE; + } + return FD_QUEUE_EINTR; + } } *sd = queue->data[queue->head].sd; @@ -133,40 +205,27 @@ apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p) if (sd != NULL) { queue->head = (queue->head + 1) % queue->bounds; } + queue->blanks++; + + /* we just consumed a slot, so we're no longer full */ + pthread_cond_signal(&queue->not_full); + if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) { return FD_QUEUE_FAILURE; } - if (queue->blanks > 0) { - pthread_cond_signal(&(queue->not_full)); - } - return APR_SUCCESS; -} - -int ap_queue_size(FDQueue *queue) -{ - return ((queue->tail - queue->head + queue->bounds) % queue->bounds); -} -int ap_queue_full(FDQueue *queue) -{ - return(queue->blanks <= 0); + return APR_SUCCESS; } -int ap_block_on_queue(FDQueue *queue) +apr_status_t ap_queue_interrupt_all(fd_queue_t *queue) { if (pthread_mutex_lock(&queue->one_big_mutex) != 0) { return FD_QUEUE_FAILURE; } - if (ap_queue_full(queue)) { - pthread_cond_wait(&(queue->not_full), &queue->one_big_mutex); - } + pthread_cond_broadcast(&queue->not_empty); if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) { return FD_QUEUE_FAILURE; } return FD_QUEUE_SUCCESS; } -void ap_queue_signal_all_wakeup(FDQueue *queue) -{ - pthread_cond_broadcast(&(queue->not_empty)); -} diff --git a/server/mpm/worker/fdqueue.h b/server/mpm/worker/fdqueue.h index c59ea254eb..e73181497c 100644 --- a/server/mpm/worker/fdqueue.h +++ b/server/mpm/worker/fdqueue.h @@ -66,34 +66,35 @@ #include #include #include +#include #define FD_QUEUE_SUCCESS 0 #define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because of queue_pop semantics */ +#define FD_QUEUE_EINTR APR_EINTR -typedef struct fd_queue_elem { - 
apr_socket_t *sd; - apr_pool_t *p; -} FDQueueElement; +struct fd_queue_elem_t { + apr_socket_t *sd; + apr_pool_t *p; +}; +typedef struct fd_queue_elem_t fd_queue_elem_t; -typedef struct fd_queue { - int head; - int tail; - FDQueueElement *data; - int bounds; - int blanks; - pthread_mutex_t one_big_mutex; - pthread_cond_t not_empty; - pthread_cond_t not_full; -} FDQueue; +struct fd_queue_t { + int head; + int tail; + fd_queue_elem_t *data; + int bounds; + int blanks; + pthread_mutex_t one_big_mutex; + pthread_cond_t not_empty; + pthread_cond_t not_full; + int cancel_state; +}; +typedef struct fd_queue_t fd_queue_t; -int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a); -int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p); -apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p); -int ap_queue_size(FDQueue *queue); -int ap_queue_full(FDQueue *queue); -int ap_block_on_queue(FDQueue *queue); -void ap_queue_signal_all_wakeup(FDQueue *queue); -int ap_increase_blanks(FDQueue *queue); +int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a); +int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p); +apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p); +apr_status_t ap_queue_interrupt_all(fd_queue_t *queue); #endif /* FDQUEUE_H */ diff --git a/server/mpm/worker/worker.c b/server/mpm/worker/worker.c index 76465963d4..a56b1f9fff 100644 --- a/server/mpm/worker/worker.c +++ b/server/mpm/worker/worker.c @@ -124,14 +124,13 @@ static int workers_may_exit = 0; static int requests_this_child; static int num_listensocks = 0; static apr_socket_t **listensocks; -static FDQueue *worker_queue; +static fd_queue_t *worker_queue; /* The structure used to pass unique initialization info to each thread */ typedef struct { int pid; int tid; int sd; - apr_pool_t *tpool; /* "pthread" would be confusing */ } proc_info; /* Structure used to pass information to the thread responsible for @@ 
-201,7 +200,9 @@ static const char *lock_fname; static void signal_workers(void) { workers_may_exit = 1; - ap_queue_signal_all_wakeup(worker_queue); + /* XXX: This will happen naturally on a graceful, and we don't care otherwise. + ap_queue_signal_all_wakeup(worker_queue); */ + ap_queue_interrupt_all(worker_queue); } AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result) @@ -553,7 +554,7 @@ static void *listener_thread(apr_thread_t *thd, void * dummy) proc_info * ti = dummy; int process_slot = ti->pid; int thread_slot = ti->tid; - apr_pool_t *tpool = ti->tpool; + apr_pool_t *tpool = apr_thread_pool_get(thd); apr_socket_t *csd = NULL; apr_pool_t *ptrans; /* Pool for per-transaction stuff */ apr_socket_t *sd = NULL; @@ -564,8 +565,6 @@ static void *listener_thread(apr_thread_t *thd, void * dummy) free(ti); - apr_pool_create(&ptrans, tpool); - apr_lock_acquire(worker_thread_count_mutex); worker_thread_count++; apr_lock_release(worker_thread_count_mutex); @@ -574,12 +573,10 @@ static void *listener_thread(apr_thread_t *thd, void * dummy) for(n=0 ; n <= num_listensocks ; ++n) apr_poll_socket_add(pollset, listensocks[n], APR_POLLIN); - worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue)); - ap_queue_init(worker_queue, ap_threads_per_child, pchild); - /* TODO: Switch to a system where threads reuse the results from earlier poll calls - manoj */ while (1) { + /* TODO: requests_this_child should be synchronized - aaron */ if (requests_this_child <= 0) { check_infinite_requests(); } @@ -644,6 +641,9 @@ static void *listener_thread(apr_thread_t *thd, void * dummy) } got_fd: if (!workers_may_exit) { + /* create a new transaction pool for each accepted socket */ + apr_pool_create(&ptrans, tpool); + if ((rv = apr_accept(&csd, sd, ptrans)) != APR_SUCCESS) { csd = NULL; ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, @@ -658,7 +658,6 @@ static void *listener_thread(apr_thread_t *thd, void * dummy) } if (csd != NULL) { ap_queue_push(worker_queue, csd, 
ptrans); - ap_block_on_queue(worker_queue); } } else { @@ -673,13 +672,15 @@ static void *listener_thread(apr_thread_t *thd, void * dummy) } } - apr_pool_destroy(tpool); ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL); dying = 1; ap_scoreboard_image->parent[process_slot].quiescing = 1; kill(ap_my_pid, SIGTERM); +/* this is uncommented when we make a pool-pool + apr_thread_exit(thd, APR_SUCCESS); +*/ return NULL; } @@ -688,30 +689,35 @@ static void *worker_thread(apr_thread_t *thd, void * dummy) proc_info * ti = dummy; int process_slot = ti->pid; int thread_slot = ti->tid; - apr_pool_t *tpool = ti->tpool; apr_socket_t *csd = NULL; apr_pool_t *ptrans; /* Pool for per-transaction stuff */ + apr_status_t rv; free(ti); + /* apr_pool_create(&ptrans, tpool); */ + while (!workers_may_exit) { - ap_queue_pop(worker_queue, &csd, &ptrans); - if (!csd) { + rv = ap_queue_pop(worker_queue, &csd, &ptrans); + /* We get FD_QUEUE_EINTR whenever ap_queue_pop() has been interrupted + * from an explicit call to ap_queue_interrupt_all(). This allows + * us to unblock threads stuck in ap_queue_pop() when a shutdown + * is pending. */ + if (rv == FD_QUEUE_EINTR || !csd) { continue; } - ap_increase_blanks(worker_queue); process_socket(ptrans, csd, process_slot, thread_slot); - requests_this_child--; - apr_pool_clear(ptrans); + requests_this_child--; /* FIXME: should be synchronized - aaron */ + apr_pool_destroy(ptrans); } - apr_pool_destroy(tpool); ap_update_child_status(process_slot, thread_slot, (dying) ? 
SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL); apr_lock_acquire(worker_thread_count_mutex); worker_thread_count--; apr_lock_release(worker_thread_count_mutex); + apr_thread_exit(thd, APR_SUCCESS); return NULL; } @@ -738,14 +744,20 @@ static void *start_threads(apr_thread_t *thd, void * dummy) int threads_created = 0; apr_thread_t *listener; + /* We must create the fd queue before we start up the listener + * and worker threads. */ + worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue)); + ap_queue_init(worker_queue, ap_threads_per_child, pchild); + my_info = (proc_info *)malloc(sizeof(proc_info)); my_info->pid = my_child_num; my_info->tid = i; my_info->sd = 0; - apr_pool_create(&my_info->tpool, pchild); apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild); while (1) { - for (i=1; i < ap_threads_per_child; i++) { + /* Does ap_threads_per_child include the listener thread? + * Why does this forloop start at 1? -aaron */ + for (i = 1; i < ap_threads_per_child; i++) { int status = ap_scoreboard_image->servers[child_num_arg][i].status; if (status != SERVER_GRACEFUL && status != SERVER_DEAD) { @@ -761,7 +773,6 @@ static void *start_threads(apr_thread_t *thd, void * dummy) my_info->pid = my_child_num; my_info->tid = i; my_info->sd = 0; - apr_pool_create(&my_info->tpool, pchild); /* We are creating threads right now */ (void) ap_update_child_status(my_child_num, i, SERVER_STARTING, @@ -794,6 +805,7 @@ static void *start_threads(apr_thread_t *thd, void * dummy) * "life_status" is almost right, but it's in the worker's structure, and * the name could be clearer. 
gla */ + apr_thread_exit(thd, APR_SUCCESS); return NULL; } @@ -870,7 +882,7 @@ static void child_main(int child_num_arg) apr_lock_create(&pipe_of_death_mutex, APR_MUTEX, APR_INTRAPROCESS, NULL, pchild); - ts = apr_palloc(pchild, sizeof(*ts)); + ts = (thread_starter *)apr_palloc(pchild, sizeof(*ts)); apr_threadattr_create(&thread_attr, pchild); apr_threadattr_detach_set(thread_attr, 0); /* 0 means PTHREAD_CREATE_JOINABLE */ -- 2.50.1