]> granicus.if.org Git - apache/commitdiff
Make the worker MPM shutdown and restart cleanly. This also
authorRyan Bloom <rbb@apache.org>
Fri, 24 Aug 2001 16:49:39 +0000 (16:49 +0000)
committerRyan Bloom <rbb@apache.org>
Fri, 24 Aug 2001 16:49:39 +0000 (16:49 +0000)
cleans up some race conditions, and gets the worker using
pools more cleanly.

Submitted by: [Aaron Bannert <aaron@clove.org>]

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@90635 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
server/mpm/worker/fdqueue.c
server/mpm/worker/fdqueue.h
server/mpm/worker/worker.c

diff --git a/CHANGES b/CHANGES
index 820ce199941c50ef9244e6a103b7b38e58187934..52f721a95484429fd705c10acdb51f6fb6714c9e 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,11 @@
 Changes with Apache 2.0.25-dev
-  *)  Implement CRYPTO_set_locking_callback() in terms of apr_lock
-      for mod_ssl
+
+  *) Make the worker MPM shutdown and restart cleanly.  This also
+     cleans up some race conditions, and gets the worker using
+     pools more cleanly.  [Aaron Bannert <aaron@clove.org>]
+
+  *) Implement CRYPTO_set_locking_callback() in terms of apr_lock
+     for mod_ssl
      [Madhusudan Mathihalli <madhusudan_mathihalli@hp.com>]
 
   *) Fix for mod_include. Ryan's patch to check error
index ad7a4752f0edc3eb4c62741d1e0d3b962f1f657a..8f6fe86d657d4c61dfe04adc9c9d021ebe8fb49e 100644 (file)
 
 /* Assumption: increment and decrement are atomic on int */
 
-int ap_increase_blanks(FDQueue *queue) 
+/**
+ * Threadsafe way to increment the number of empty slots ("blanks")
+ * in the resource queue.
+ */
+int ap_increase_blanks(fd_queue_t *queue) 
 {
     if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
@@ -69,61 +73,129 @@ int ap_increase_blanks(FDQueue *queue)
     if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
+
     return FD_QUEUE_SUCCESS;
 }
 
+/**
+ * Detects when the fd_queue_t is full. This utility function is expected
+ * to be called from within critical sections, and is not threadsafe.
+ */
+static int ap_queue_full(fd_queue_t *queue)
+{
+    return (queue->blanks <= 0);
+}
+
+/**
+ * Detects when the fd_queue_t is empty. This utility function is expected
+ * to be called from within critical sections, and is not threadsafe.
+ */
+static int ap_queue_empty(fd_queue_t *queue)
+{
+    /*return (queue->head == queue->tail);*/
+    return (queue->blanks >= queue->bounds - 1);
+}
+
+/**
+ * Callback routine that is called to destroy this
+ * fd_queue_t when it's pool is destroyed.
+ */
 static apr_status_t ap_queue_destroy(void *data) 
 {
-    FDQueue *queue = data;
-    /* Ignore errors here, we can't do anything about them anyway */
-    pthread_cond_destroy(&(queue->not_empty));
-    pthread_cond_destroy(&(queue->not_full));
+    fd_queue_t *queue = data;
+
+    /* Ignore errors here, we can't do anything about them anyway.
+     * XXX: We should at least try to signal an error here, it is
+     * indicative of a programmer error. -aaron */
+    pthread_cond_destroy(&queue->not_empty);
+    pthread_cond_destroy(&queue->not_full);
     pthread_mutex_destroy(&queue->one_big_mutex);
+
     return FD_QUEUE_SUCCESS;
 }
 
-int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) 
+/**
+ * Initialize the fd_queue_t.
+ */
+int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a) 
 {
     int i;
-    int bounds = queue_capacity + 1;
-    pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
-    pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
-    queue->not_empty = not_empty;
-    queue->not_full = not_full;
-    pthread_mutex_init(&queue->one_big_mutex, NULL);
+    int bounds;
+
+    if (pthread_mutex_init(&queue->one_big_mutex, NULL) != 0)
+        return FD_QUEUE_FAILURE;
+    if (pthread_cond_init(&queue->not_empty, NULL) != 0)
+        return FD_QUEUE_FAILURE;
+    if (pthread_cond_init(&queue->not_full, NULL) != 0)
+        return FD_QUEUE_FAILURE;
+
+    bounds = queue_capacity + 1;
     queue->head = queue->tail = 0;
-    queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement));
+    queue->data = apr_palloc(a, bounds * sizeof(fd_queue_elem_t));
     queue->bounds = bounds;
-    queue->blanks = 0;
-    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
-    for (i=0; i < bounds; ++i)
+    queue->blanks = queue_capacity;
+
+    /* Set all the sockets in the queue to NULL */
+    for (i = 0; i < bounds; ++i)
         queue->data[i].sd = NULL;
+
+    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
+
     return FD_QUEUE_SUCCESS;
 }
 
-int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) 
+/**
+ * Push a new socket onto the queue. Blocks if the queue is full. Once
+ * the push operation has completed, it signals other threads waiting
+ * in apr_queue_pop() that they may continue consuming sockets.
+ */
+int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p) 
 {
     if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
+
+    /* Keep waiting until we wake up and find that the queue is not full. */
+    while (ap_queue_full(queue)) {
+        pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
+    }
+
     queue->data[queue->tail].sd = sd;
     queue->data[queue->tail].p = p;
     queue->tail = (queue->tail + 1) % queue->bounds;
     queue->blanks--;
+
+    pthread_cond_signal(&queue->not_empty);
+
     if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
-    pthread_cond_signal(&(queue->not_empty));
+
     return FD_QUEUE_SUCCESS;
 }
 
-apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p) 
+/**
+ * Retrieves the next available socket from the queue. If there are no
+ * sockets available, it will block until one becomes available.
+ * Once retrieved, the socket is placed into the address specified by
+ * 'sd'.
+ */
+apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p) 
 {
     if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
-    if (queue->head == queue->tail) {
-        pthread_cond_wait(&(queue->not_empty), &queue->one_big_mutex);
+
+    /* Keep waiting until we wake up and find that the queue is not empty. */
+    if (ap_queue_empty(queue)) {
+        pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
+        /* If we wake up and it's still empty, then we were interrupted */
+        if (ap_queue_empty(queue)) {
+            if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
+                return FD_QUEUE_FAILURE;
+            }
+            return FD_QUEUE_EINTR;
+        }
     } 
     
     *sd = queue->data[queue->head].sd;
@@ -133,40 +205,27 @@ apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p)
     if (sd != NULL) {
         queue->head = (queue->head + 1) % queue->bounds;
     }
+    queue->blanks++;
+
+    /* we just consumed a slot, so we're no longer full */
+    pthread_cond_signal(&queue->not_full);
+
     if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
-    if (queue->blanks > 0) {
-        pthread_cond_signal(&(queue->not_full));
-    }
-    return APR_SUCCESS;
-}
-
-int ap_queue_size(FDQueue *queue) 
-{
-    return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
-}
 
-int ap_queue_full(FDQueue *queue) 
-{
-    return(queue->blanks <= 0);
+    return APR_SUCCESS;
 }
 
-int ap_block_on_queue(FDQueue *queue) 
+apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
 {
     if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
-    if (ap_queue_full(queue)) {
-        pthread_cond_wait(&(queue->not_full), &queue->one_big_mutex);
-    }
+    pthread_cond_broadcast(&queue->not_empty);
     if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
     return FD_QUEUE_SUCCESS;
 }
 
-void ap_queue_signal_all_wakeup(FDQueue *queue)
-{
-    pthread_cond_broadcast(&(queue->not_empty));
-}
index c59ea254ebc8dced99d2426f1f0bdb561ffef0cc..e73181497c91833c2a8f21aef080aae746a82882 100644 (file)
 #include <pthread.h>
 #include <sys/types.h>
 #include <sys/socket.h>
+#include <apr_errno.h>
 
 #define FD_QUEUE_SUCCESS 0
 #define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because
                                of queue_pop semantics */
+#define FD_QUEUE_EINTR APR_EINTR
 
-typedef struct fd_queue_elem {
-    apr_socket_t *sd;
-    apr_pool_t *p;
-} FDQueueElement;
+struct fd_queue_elem_t {
+    apr_socket_t      *sd;
+    apr_pool_t        *p;
+};
+typedef struct fd_queue_elem_t fd_queue_elem_t;
 
-typedef struct fd_queue {
-    int head;
-    int tail;
-    FDQueueElement *data;
-    int bounds;
-    int blanks;
-    pthread_mutex_t one_big_mutex;
-    pthread_cond_t not_empty;
-    pthread_cond_t not_full;
-} FDQueue;
+struct fd_queue_t {
+    int                head;
+    int                tail;
+    fd_queue_elem_t   *data;
+    int                bounds;
+    int                blanks;
+    pthread_mutex_t    one_big_mutex;
+    pthread_cond_t     not_empty;
+    pthread_cond_t     not_full;
+    int                cancel_state;
+};
+typedef struct fd_queue_t fd_queue_t;
 
-int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a);
-int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p);
-apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p);
-int ap_queue_size(FDQueue *queue);
-int ap_queue_full(FDQueue *queue);
-int ap_block_on_queue(FDQueue *queue);
-void ap_queue_signal_all_wakeup(FDQueue *queue);
-int ap_increase_blanks(FDQueue *queue);
+int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
+int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p);
+apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p);
+apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);
 
 #endif /* FDQUEUE_H */
index 76465963d483f65c88f1449f7cc1c35e3dcf3cb7..a56b1f9fff40d7f2f7be562a030983b4d5a5d4e9 100644 (file)
@@ -124,14 +124,13 @@ static int workers_may_exit = 0;
 static int requests_this_child;
 static int num_listensocks = 0;
 static apr_socket_t **listensocks;
-static FDQueue *worker_queue;
+static fd_queue_t *worker_queue;
 
 /* The structure used to pass unique initialization info to each thread */
 typedef struct {
     int pid;
     int tid;
     int sd;
-    apr_pool_t *tpool; /* "pthread" would be confusing */
 } proc_info;
 
 /* Structure used to pass information to the thread responsible for 
@@ -201,7 +200,9 @@ static const char *lock_fname;
 static void signal_workers(void)
 {
     workers_may_exit = 1;
-    ap_queue_signal_all_wakeup(worker_queue);
+    /* XXX: This will happen naturally on a graceful, and we don't care otherwise.
+    ap_queue_signal_all_wakeup(worker_queue); */
+    ap_queue_interrupt_all(worker_queue);
 }
 
 AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result)
@@ -553,7 +554,7 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)
     proc_info * ti = dummy;
     int process_slot = ti->pid;
     int thread_slot = ti->tid;
-    apr_pool_t *tpool = ti->tpool;
+    apr_pool_t *tpool = apr_thread_pool_get(thd);
     apr_socket_t *csd = NULL;
     apr_pool_t *ptrans;                /* Pool for per-transaction stuff */
     apr_socket_t *sd = NULL;
@@ -564,8 +565,6 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)
 
     free(ti);
 
-    apr_pool_create(&ptrans, tpool);
-
     apr_lock_acquire(worker_thread_count_mutex);
     worker_thread_count++;
     apr_lock_release(worker_thread_count_mutex);
@@ -574,12 +573,10 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)
     for(n=0 ; n <= num_listensocks ; ++n)
        apr_poll_socket_add(pollset, listensocks[n], APR_POLLIN);
 
-    worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
-    ap_queue_init(worker_queue, ap_threads_per_child, pchild);
-
     /* TODO: Switch to a system where threads reuse the results from earlier
        poll calls - manoj */
     while (1) {
+        /* TODO: requests_this_child should be synchronized - aaron */
         if (requests_this_child <= 0) {
             check_infinite_requests();
         }
@@ -644,6 +641,9 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)
         }
     got_fd:
         if (!workers_may_exit) {
+            /* create a new transaction pool for each accepted socket */
+            apr_pool_create(&ptrans, tpool);
+
             if ((rv = apr_accept(&csd, sd, ptrans)) != APR_SUCCESS) {
                 csd = NULL;
                 ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, 
@@ -658,7 +658,6 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)
             }
             if (csd != NULL) {
                 ap_queue_push(worker_queue, csd, ptrans);
-                ap_block_on_queue(worker_queue);
             }
         }
         else {
@@ -673,13 +672,15 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)
         }
     }
 
-    apr_pool_destroy(tpool);
     ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
         (request_rec *) NULL);
     dying = 1;
     ap_scoreboard_image->parent[process_slot].quiescing = 1;
     kill(ap_my_pid, SIGTERM);
 
+/* this is uncommented when we make a pool-pool
+    apr_thread_exit(thd, APR_SUCCESS);
+*/
     return NULL;
 }
 
@@ -688,30 +689,35 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
     proc_info * ti = dummy;
     int process_slot = ti->pid;
     int thread_slot = ti->tid;
-    apr_pool_t *tpool = ti->tpool;
     apr_socket_t *csd = NULL;
     apr_pool_t *ptrans;                /* Pool for per-transaction stuff */
+    apr_status_t rv;
 
     free(ti);
 
+    /* apr_pool_create(&ptrans, tpool); */
+
     while (!workers_may_exit) {
-        ap_queue_pop(worker_queue, &csd, &ptrans);
-        if (!csd) {
+        rv = ap_queue_pop(worker_queue, &csd, &ptrans);
+        /* We get FD_QUEUE_EINTR whenever ap_queue_pop() has been interrupted
+         * from an explicit call to ap_queue_interrupt_all(). This allows
+         * us to unblock threads stuck in ap_queue_pop() when a shutdown
+         * is pending. */
+        if (rv == FD_QUEUE_EINTR || !csd) {
             continue;
         }
-        ap_increase_blanks(worker_queue);
         process_socket(ptrans, csd, process_slot, thread_slot);
-        requests_this_child--;
-        apr_pool_clear(ptrans);
+        requests_this_child--; /* FIXME: should be synchronized - aaron */
+        apr_pool_destroy(ptrans);
     }
 
-    apr_pool_destroy(tpool);
     ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
         (request_rec *) NULL);
     apr_lock_acquire(worker_thread_count_mutex);
     worker_thread_count--;
     apr_lock_release(worker_thread_count_mutex);
 
+    apr_thread_exit(thd, APR_SUCCESS);
     return NULL;
 }
 
@@ -738,14 +744,20 @@ static void *start_threads(apr_thread_t *thd, void * dummy)
     int threads_created = 0;
     apr_thread_t *listener;
 
+    /* We must create the fd queue before we start up the listener
+     * and worker threads. */
+    worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
+    ap_queue_init(worker_queue, ap_threads_per_child, pchild);
+
     my_info = (proc_info *)malloc(sizeof(proc_info));
     my_info->pid = my_child_num;
     my_info->tid = i;
     my_info->sd = 0;
-    apr_pool_create(&my_info->tpool, pchild);
     apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
     while (1) {
-        for (i=1; i < ap_threads_per_child; i++) {
+        /* Does ap_threads_per_child include the listener thread?
+         * Why does this forloop start at 1? -aaron */
+        for (i = 1; i < ap_threads_per_child; i++) {
             int status = ap_scoreboard_image->servers[child_num_arg][i].status;
 
             if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {
@@ -761,7 +773,6 @@ static void *start_threads(apr_thread_t *thd, void * dummy)
            my_info->pid = my_child_num;
             my_info->tid = i;
            my_info->sd = 0;
-           apr_pool_create(&my_info->tpool, pchild);
        
            /* We are creating threads right now */
            (void) ap_update_child_status(my_child_num, i, SERVER_STARTING, 
@@ -794,6 +805,7 @@ static void *start_threads(apr_thread_t *thd, void * dummy)
      *  "life_status" is almost right, but it's in the worker's structure, and 
      *  the name could be clearer.   gla
      */
+    apr_thread_exit(thd, APR_SUCCESS);
     return NULL;
 }
 
@@ -870,7 +882,7 @@ static void child_main(int child_num_arg)
     apr_lock_create(&pipe_of_death_mutex, APR_MUTEX, APR_INTRAPROCESS, 
                     NULL, pchild);
 
-    ts = apr_palloc(pchild, sizeof(*ts));
+    ts = (thread_starter *)apr_palloc(pchild, sizeof(*ts));
 
     apr_threadattr_create(&thread_attr, pchild);
     apr_threadattr_detach_set(thread_attr, 0);    /* 0 means PTHREAD_CREATE_JOINABLE */