From 643dfa0c949ff64e864f23e126ffeeeea60ff830 Mon Sep 17 00:00:00 2001 From: Ryan Bloom Date: Sun, 5 Aug 2001 18:41:38 +0000 Subject: [PATCH] Get the worker MPM working again. This should fix the serialization problems, and it makes up initialize the queue only once. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@89930 13f79535-47bb-0310-9956-ffa450edef68 --- server/mpm/worker/fdqueue.c | 122 +++++++++++++++++++----------------- server/mpm/worker/fdqueue.h | 3 +- server/mpm/worker/worker.c | 20 +++--- 3 files changed, 75 insertions(+), 70 deletions(-) diff --git a/server/mpm/worker/fdqueue.c b/server/mpm/worker/fdqueue.c index 6868f5072c..ad7a4752f0 100644 --- a/server/mpm/worker/fdqueue.c +++ b/server/mpm/worker/fdqueue.c @@ -57,112 +57,116 @@ */ #include "fdqueue.h" -#include "apr_pools.h" -/* Assumption: queue itself is allocated by the user */ /* Assumption: increment and decrement are atomic on int */ -int ap_queue_size(FDQueue *queue) { - return ((queue->tail - queue->head + queue->bounds) % queue->bounds); -} - -int ap_queue_full(FDQueue *queue) { - return(queue->blanks <= 0); -} - -int ap_block_on_queue(FDQueue *queue) { -#if 0 +int ap_increase_blanks(FDQueue *queue) +{ if (pthread_mutex_lock(&queue->one_big_mutex) != 0) { return FD_QUEUE_FAILURE; } -#endif - if (ap_queue_full(queue)) { - pthread_cond_wait(&queue->not_full, &queue->one_big_mutex); - } -#if 0 + queue->blanks++; if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) { return FD_QUEUE_FAILURE; } -#endif - return FD_QUEUE_SUCCESS; -} - -static int increase_blanks(FDQueue *queue) { - queue->blanks++; return FD_QUEUE_SUCCESS; } -static apr_status_t ap_queue_destroy(void *data) { +static apr_status_t ap_queue_destroy(void *data) +{ FDQueue *queue = data; /* Ignore errors here, we can't do anything about them anyway */ - pthread_cond_destroy(&queue->not_empty); - pthread_cond_destroy(&queue->not_full); + pthread_cond_destroy(&(queue->not_empty)); + pthread_cond_destroy(&(queue->not_full)); pthread_mutex_destroy(&queue->one_big_mutex); return FD_QUEUE_SUCCESS; } -int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) { +int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) +{ int i; int bounds = queue_capacity + 1; + pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER; + pthread_cond_t not_full = PTHREAD_COND_INITIALIZER; + queue->not_empty = not_empty; + queue->not_full = not_full; pthread_mutex_init(&queue->one_big_mutex, NULL); - pthread_cond_init(&queue->not_empty, NULL); - pthread_cond_init(&queue->not_full, NULL); queue->head = queue->tail = 0; queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement)); queue->bounds = bounds; - queue->blanks = queue_capacity; + queue->blanks = 0; apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null); for (i=0; i < bounds; ++i) queue->data[i].sd = NULL; return FD_QUEUE_SUCCESS; } -int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) { +int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) +{ + if (pthread_mutex_lock(&queue->one_big_mutex) != 0) { + return FD_QUEUE_FAILURE; + } queue->data[queue->tail].sd = sd; - queue->data[queue->tail].p = p; + queue->data[queue->tail].p = p; queue->tail = (queue->tail + 1) % queue->bounds; queue->blanks--; - pthread_cond_signal(&queue->not_empty); -#if 0 - if (queue->head == (queue->tail + 1) % queue->bounds) { -#endif - if (ap_queue_full(queue)) { - pthread_cond_wait(&queue->not_full, &queue->one_big_mutex); + if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) { + return FD_QUEUE_FAILURE; } + pthread_cond_signal(&(queue->not_empty)); return FD_QUEUE_SUCCESS; } -apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty) { - increase_blanks(queue); - /* We have just removed one from the queue. By definition, it is - * no longer full. We can ALWAYS signal the listener thread at - * this point. However, the original code didn't do it this way, - * so I am leaving the original code in, just commented out. BTW, - * originally, the increase_blanks wasn't in this function either. - * - if (queue->blanks > 0) { - */ - pthread_cond_signal(&queue->not_full); - - /* } */ +apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p) +{ + if (pthread_mutex_lock(&queue->one_big_mutex) != 0) { + return FD_QUEUE_FAILURE; + } if (queue->head == queue->tail) { - if (block_if_empty) { - pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex); -fprintf(stderr, "Found a non-empty queue :-)\n"); - } + pthread_cond_wait(&(queue->not_empty), &queue->one_big_mutex); } *sd = queue->data[queue->head].sd; - *p = queue->data[queue->head].p; + *p = queue->data[queue->head].p; queue->data[queue->head].sd = NULL; - if (*sd != NULL) { + queue->data[queue->head].p = NULL; + if (sd != NULL) { queue->head = (queue->head + 1) % queue->bounds; } + if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) { + return FD_QUEUE_FAILURE; + } + if (queue->blanks > 0) { + pthread_cond_signal(&(queue->not_full)); + } return APR_SUCCESS; } +int ap_queue_size(FDQueue *queue) +{ + return ((queue->tail - queue->head + queue->bounds) % queue->bounds); +} + +int ap_queue_full(FDQueue *queue) +{ + return(queue->blanks <= 0); +} + +int ap_block_on_queue(FDQueue *queue) +{ + if (pthread_mutex_lock(&queue->one_big_mutex) != 0) { + return FD_QUEUE_FAILURE; + } + if (ap_queue_full(queue)) { + pthread_cond_wait(&(queue->not_full), &queue->one_big_mutex); + } + if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) { + return FD_QUEUE_FAILURE; + } + return FD_QUEUE_SUCCESS; +} + void ap_queue_signal_all_wakeup(FDQueue *queue) { -fprintf(stderr, "trying to broadcast to all workers\n"); - pthread_cond_broadcast(&queue->not_empty); + pthread_cond_broadcast(&(queue->not_empty)); } diff --git a/server/mpm/worker/fdqueue.h b/server/mpm/worker/fdqueue.h index 669e4f4309..a253bbfb28 100644 --- a/server/mpm/worker/fdqueue.h +++ b/server/mpm/worker/fdqueue.h @@ -87,10 +87,11 @@ typedef struct fd_queue { int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a); int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p); -apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty); +apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p); int ap_queue_size(FDQueue *queue); int ap_queue_full(FDQueue *queue); int ap_block_on_queue(FDQueue *queue); void ap_queue_signal_all_wakeup(FDQueue *queue); +int ap_increase_blanks(FDQueue *queue); #endif /* FDQUEUE_H */ diff --git a/server/mpm/worker/worker.c b/server/mpm/worker/worker.c index b272342293..9503611a7c 100644 --- a/server/mpm/worker/worker.c +++ b/server/mpm/worker/worker.c @@ -510,7 +510,6 @@ static void check_infinite_requests(void) /* Sets workers_may_exit if we received a character on the pipe_of_death */ static void check_pipe_of_death(void) { -fprintf(stderr, "looking at pipe of death\n"); apr_lock_acquire(pipe_of_death_mutex); if (!workers_may_exit) { apr_status_t ret; @@ -684,7 +683,8 @@ static void *worker_thread(apr_thread_t *thd, void * dummy) free(ti); while (!workers_may_exit) { - ap_queue_pop(worker_queue, &csd, &ptrans, 1); + ap_queue_pop(worker_queue, &csd, &ptrans); + ap_increase_blanks(worker_queue); process_socket(ptrans, csd, process_slot, thread_slot); requests_this_child--; apr_pool_clear(ptrans); @@ -729,21 +729,21 @@ static void *start_threads(apr_thread_t *thd, void * dummy) apr_thread_t **threads = ts->threads; apr_threadattr_t *thread_attr = ts->threadattr; int child_num_arg = ts->child_num_arg; - int i; int my_child_num = child_num_arg; proc_info *my_info = NULL; apr_status_t rv; + int i = 0; int threads_created = 0; apr_thread_t *listener; + my_info = (proc_info *)malloc(sizeof(proc_info)); + my_info->pid = my_child_num; + my_info->tid = i; + my_info->sd = 0; + apr_pool_create(&my_info->tpool, pchild); + apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild); while (1) { - my_info = (proc_info *)malloc(sizeof(proc_info)); - my_info->pid = my_child_num; - my_info->tid = i; - my_info->sd = 0; - apr_pool_create(&my_info->tpool, pchild); - apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild); - for (i=0; i < ap_threads_per_child; i++) { + for (i=1; i < ap_threads_per_child; i++) { int status = ap_scoreboard_image->servers[child_num_arg][i].status; if (status != SERVER_GRACEFUL && status != SERVER_DEAD) { -- 2.40.0