]> granicus.if.org Git - apache/commitdiff
Get the worker MPM working again. This should fix the serialization
authorRyan Bloom <rbb@apache.org>
Sun, 5 Aug 2001 18:41:38 +0000 (18:41 +0000)
committerRyan Bloom <rbb@apache.org>
Sun, 5 Aug 2001 18:41:38 +0000 (18:41 +0000)
problems, and it makes up initialize the queue only once.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@89930 13f79535-47bb-0310-9956-ffa450edef68

server/mpm/worker/fdqueue.c
server/mpm/worker/fdqueue.h
server/mpm/worker/worker.c

index 6868f5072c20425781743268158f692ed7b61ff4..ad7a4752f0edc3eb4c62741d1e0d3b962f1f657a 100644 (file)
  */
 
 #include "fdqueue.h"
-#include "apr_pools.h"
 
-/* Assumption: queue itself is allocated by the user */
 /* Assumption: increment and decrement are atomic on int */
 
-int ap_queue_size(FDQueue *queue) {
-    return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
-}
-
-int ap_queue_full(FDQueue *queue) {
-    return(queue->blanks <= 0);
-}
-
-int ap_block_on_queue(FDQueue *queue) {
-#if 0
+int ap_increase_blanks(FDQueue *queue) 
+{
     if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
-#endif
-    if (ap_queue_full(queue)) {
-        pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
-    }
-#if 0
+    queue->blanks++;
     if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
-#endif
-    return FD_QUEUE_SUCCESS;
-}
-
-static int increase_blanks(FDQueue *queue) {
-    queue->blanks++;
     return FD_QUEUE_SUCCESS;
 }
 
-static apr_status_t ap_queue_destroy(void *data) {
+static apr_status_t ap_queue_destroy(void *data) 
+{
     FDQueue *queue = data;
     /* Ignore errors here, we can't do anything about them anyway */
-    pthread_cond_destroy(&queue->not_empty);
-    pthread_cond_destroy(&queue->not_full);
+    pthread_cond_destroy(&(queue->not_empty));
+    pthread_cond_destroy(&(queue->not_full));
     pthread_mutex_destroy(&queue->one_big_mutex);
     return FD_QUEUE_SUCCESS;
 }
 
-int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) {
+int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) 
+{
     int i;
     int bounds = queue_capacity + 1;
+    pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
+    pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
+    queue->not_empty = not_empty;
+    queue->not_full = not_full;
     pthread_mutex_init(&queue->one_big_mutex, NULL);
-    pthread_cond_init(&queue->not_empty, NULL);
-    pthread_cond_init(&queue->not_full, NULL);
     queue->head = queue->tail = 0;
     queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement));
     queue->bounds = bounds;
-    queue->blanks = queue_capacity;
+    queue->blanks = 0;
     apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
     for (i=0; i < bounds; ++i)
         queue->data[i].sd = NULL;
     return FD_QUEUE_SUCCESS;
 }
 
-int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) {
+int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) 
+{
+    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
+        return FD_QUEUE_FAILURE;
+    }
     queue->data[queue->tail].sd = sd;
-    queue->data[queue->tail].p  = p;
+    queue->data[queue->tail].p = p;
     queue->tail = (queue->tail + 1) % queue->bounds;
     queue->blanks--;
-    pthread_cond_signal(&queue->not_empty);
-#if 0
-    if (queue->head == (queue->tail + 1) % queue->bounds) {
-#endif
-    if (ap_queue_full(queue)) {
-        pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
+    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
+        return FD_QUEUE_FAILURE;
     }
+    pthread_cond_signal(&(queue->not_empty));
     return FD_QUEUE_SUCCESS;
 }
 
-apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty) {
-    increase_blanks(queue);
-    /* We have just removed one from the queue.  By definition, it is
-     * no longer full.  We can ALWAYS signal the listener thread at
-     * this point.  However, the original code didn't do it this way,
-     * so I am leaving the original code in, just commented out.  BTW,
-     * originally, the increase_blanks wasn't in this function either.
-     *
-     if (queue->blanks > 0) {
-     */
-    pthread_cond_signal(&queue->not_full);
-
-    /*    }    */
+apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p) 
+{
+    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
+        return FD_QUEUE_FAILURE;
+    }
     if (queue->head == queue->tail) {
-        if (block_if_empty) {
-            pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
-fprintf(stderr, "Found a non-empty queue  :-)\n");
-        }
+        pthread_cond_wait(&(queue->not_empty), &queue->one_big_mutex);
     } 
     
     *sd = queue->data[queue->head].sd;
-    *p  = queue->data[queue->head].p;
+    *p = queue->data[queue->head].p;
     queue->data[queue->head].sd = NULL;
-    if (*sd != NULL) {
+    queue->data[queue->head].p = NULL;
+    if (sd != NULL) {
         queue->head = (queue->head + 1) % queue->bounds;
     }
+    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
+        return FD_QUEUE_FAILURE;
+    }
+    if (queue->blanks > 0) {
+        pthread_cond_signal(&(queue->not_full));
+    }
     return APR_SUCCESS;
 }
 
+int ap_queue_size(FDQueue *queue) 
+{
+    return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
+}
+
+int ap_queue_full(FDQueue *queue) 
+{
+    return(queue->blanks <= 0);
+}
+
+int ap_block_on_queue(FDQueue *queue) 
+{
+    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
+        return FD_QUEUE_FAILURE;
+    }
+    if (ap_queue_full(queue)) {
+        pthread_cond_wait(&(queue->not_full), &queue->one_big_mutex);
+    }
+    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
+        return FD_QUEUE_FAILURE;
+    }
+    return FD_QUEUE_SUCCESS;
+}
+
 void ap_queue_signal_all_wakeup(FDQueue *queue)
 {
-fprintf(stderr, "trying to broadcast to all workers\n");
-    pthread_cond_broadcast(&queue->not_empty);
+    pthread_cond_broadcast(&(queue->not_empty));
 }
index 669e4f430995b3919704419c874ee5b16e82f31d..a253bbfb287ac90cfab6f8c1445afeb874b80532 100644 (file)
@@ -87,10 +87,11 @@ typedef struct fd_queue {
 
 int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a);
 int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p);
-apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty);
+apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p);
 int ap_queue_size(FDQueue *queue);
 int ap_queue_full(FDQueue *queue);
 int ap_block_on_queue(FDQueue *queue);
 void ap_queue_signal_all_wakeup(FDQueue *queue);
+int ap_increase_blanks(FDQueue *queue);
 
 #endif /* FDQUEUE_H */
index b272342293739f43811b9f19475404ae67b70158..9503611a7c05113d3abd85c1aaba5e7641cbc614 100644 (file)
@@ -510,7 +510,6 @@ static void check_infinite_requests(void)
 /* Sets workers_may_exit if we received a character on the pipe_of_death */
 static void check_pipe_of_death(void)
 {
-fprintf(stderr, "looking at pipe of death\n");
     apr_lock_acquire(pipe_of_death_mutex);
     if (!workers_may_exit) {
         apr_status_t ret;
@@ -684,7 +683,8 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
     free(ti);
 
     while (!workers_may_exit) {
-        ap_queue_pop(worker_queue, &csd, &ptrans, 1);
+        ap_queue_pop(worker_queue, &csd, &ptrans);
+        ap_increase_blanks(worker_queue);
         process_socket(ptrans, csd, process_slot, thread_slot);
         requests_this_child--;
         apr_pool_clear(ptrans);
@@ -729,21 +729,21 @@ static void *start_threads(apr_thread_t *thd, void * dummy)
     apr_thread_t **threads = ts->threads;
     apr_threadattr_t *thread_attr = ts->threadattr;
     int child_num_arg = ts->child_num_arg;
-    int i;
     int my_child_num = child_num_arg;
     proc_info *my_info = NULL;
     apr_status_t rv;
+    int i = 0;
     int threads_created = 0;
     apr_thread_t *listener;
 
+    my_info = (proc_info *)malloc(sizeof(proc_info));
+    my_info->pid = my_child_num;
+    my_info->tid = i;
+    my_info->sd = 0;
+    apr_pool_create(&my_info->tpool, pchild);
+    apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
     while (1) {
-        my_info = (proc_info *)malloc(sizeof(proc_info));
-        my_info->pid = my_child_num;
-        my_info->tid = i;
-        my_info->sd = 0;
-        apr_pool_create(&my_info->tpool, pchild);
-       apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
-        for (i=0; i < ap_threads_per_child; i++) {
+        for (i=1; i < ap_threads_per_child; i++) {
             int status = ap_scoreboard_image->servers[child_num_arg][i].status;
 
             if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {