Changes with Apache 2.0.25-dev
- *) Implement CRYPTO_set_locking_callback() in terms of apr_lock
- for mod_ssl
+
+ *) Make the worker MPM shutdown and restart cleanly. This also
+ cleans up some race conditions, and gets the worker using
+ pools more cleanly. [Aaron Bannert <aaron@clove.org>]
+
+ *) Implement CRYPTO_set_locking_callback() in terms of apr_lock
+ for mod_ssl
[Madhusudan Mathihalli <madhusudan_mathihalli@hp.com>]
*) Fix for mod_include. Ryan's patch to check error
/* Assumption: increment and decrement are atomic on int */
-int ap_increase_blanks(FDQueue *queue)
+/**
+ * Threadsafe way to increment the number of empty slots ("blanks")
+ * in the resource queue.
+ */
+int ap_increase_blanks(fd_queue_t *queue)
{
if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
}
+
return FD_QUEUE_SUCCESS;
}
+/**
+ * Detects when the fd_queue_t is full. This utility function is expected
+ * to be called from within critical sections, and is not threadsafe.
+ */
+static int ap_queue_full(fd_queue_t *queue)
+{
+ return (queue->blanks <= 0);
+}
+
+/**
+ * Detects when the fd_queue_t is empty. This utility function is expected
+ * to be called from within critical sections, and is not threadsafe.
+ */
+static int ap_queue_empty(fd_queue_t *queue)
+{
+ /*return (queue->head == queue->tail);*/
+ return (queue->blanks >= queue->bounds - 1);
+}
+
+/**
+ * Callback routine that is called to destroy this
+ * fd_queue_t when it's pool is destroyed.
+ */
static apr_status_t ap_queue_destroy(void *data)
{
- FDQueue *queue = data;
- /* Ignore errors here, we can't do anything about them anyway */
- pthread_cond_destroy(&(queue->not_empty));
- pthread_cond_destroy(&(queue->not_full));
+ fd_queue_t *queue = data;
+
+ /* Ignore errors here, we can't do anything about them anyway.
+ * XXX: We should at least try to signal an error here, it is
+ * indicative of a programmer error. -aaron */
+ pthread_cond_destroy(&queue->not_empty);
+ pthread_cond_destroy(&queue->not_full);
pthread_mutex_destroy(&queue->one_big_mutex);
+
return FD_QUEUE_SUCCESS;
}
-int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a)
+/**
+ * Initialize the fd_queue_t.
+ */
+int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a)
{
int i;
- int bounds = queue_capacity + 1;
- pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
- pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
- queue->not_empty = not_empty;
- queue->not_full = not_full;
- pthread_mutex_init(&queue->one_big_mutex, NULL);
+ int bounds;
+
+ if (pthread_mutex_init(&queue->one_big_mutex, NULL) != 0)
+ return FD_QUEUE_FAILURE;
+ if (pthread_cond_init(&queue->not_empty, NULL) != 0)
+ return FD_QUEUE_FAILURE;
+ if (pthread_cond_init(&queue->not_full, NULL) != 0)
+ return FD_QUEUE_FAILURE;
+
+ bounds = queue_capacity + 1;
queue->head = queue->tail = 0;
- queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement));
+ queue->data = apr_palloc(a, bounds * sizeof(fd_queue_elem_t));
queue->bounds = bounds;
- queue->blanks = 0;
- apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
- for (i=0; i < bounds; ++i)
+ queue->blanks = queue_capacity;
+
+ /* Set all the sockets in the queue to NULL */
+ for (i = 0; i < bounds; ++i)
queue->data[i].sd = NULL;
+
+ apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
+
return FD_QUEUE_SUCCESS;
}
-int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p)
+/**
+ * Push a new socket onto the queue. Blocks if the queue is full. Once
+ * the push operation has completed, it signals other threads waiting
+ * in apr_queue_pop() that they may continue consuming sockets.
+ */
+int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p)
{
if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
}
+
+ /* Keep waiting until we wake up and find that the queue is not full. */
+ while (ap_queue_full(queue)) {
+ pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
+ }
+
queue->data[queue->tail].sd = sd;
queue->data[queue->tail].p = p;
queue->tail = (queue->tail + 1) % queue->bounds;
queue->blanks--;
+
+ pthread_cond_signal(&queue->not_empty);
+
if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
}
- pthread_cond_signal(&(queue->not_empty));
+
return FD_QUEUE_SUCCESS;
}
-apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p)
+/**
+ * Retrieves the next available socket from the queue. If there are no
+ * sockets available, it will block until one becomes available.
+ * Once retrieved, the socket is placed into the address specified by
+ * 'sd'.
+ */
+apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p)
{
if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
}
- if (queue->head == queue->tail) {
- pthread_cond_wait(&(queue->not_empty), &queue->one_big_mutex);
+
+ /* Keep waiting until we wake up and find that the queue is not empty. */
+ if (ap_queue_empty(queue)) {
+ pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
+ /* If we wake up and it's still empty, then we were interrupted */
+ if (ap_queue_empty(queue)) {
+ if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
+ return FD_QUEUE_FAILURE;
+ }
+ return FD_QUEUE_EINTR;
+ }
}
*sd = queue->data[queue->head].sd;
if (sd != NULL) {
queue->head = (queue->head + 1) % queue->bounds;
}
+ queue->blanks++;
+
+ /* we just consumed a slot, so we're no longer full */
+ pthread_cond_signal(&queue->not_full);
+
if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
}
- if (queue->blanks > 0) {
- pthread_cond_signal(&(queue->not_full));
- }
- return APR_SUCCESS;
-}
-
-int ap_queue_size(FDQueue *queue)
-{
- return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
-}
-int ap_queue_full(FDQueue *queue)
-{
- return(queue->blanks <= 0);
+ return APR_SUCCESS;
}
-int ap_block_on_queue(FDQueue *queue)
+apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
{
if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
}
- if (ap_queue_full(queue)) {
- pthread_cond_wait(&(queue->not_full), &queue->one_big_mutex);
- }
+ pthread_cond_broadcast(&queue->not_empty);
if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
}
return FD_QUEUE_SUCCESS;
}
-void ap_queue_signal_all_wakeup(FDQueue *queue)
-{
- pthread_cond_broadcast(&(queue->not_empty));
-}
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
+#include <apr_errno.h>
#define FD_QUEUE_SUCCESS 0
#define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because
of queue_pop semantics */
+#define FD_QUEUE_EINTR APR_EINTR
-typedef struct fd_queue_elem {
- apr_socket_t *sd;
- apr_pool_t *p;
-} FDQueueElement;
+struct fd_queue_elem_t {
+ apr_socket_t *sd;
+ apr_pool_t *p;
+};
+typedef struct fd_queue_elem_t fd_queue_elem_t;
-typedef struct fd_queue {
- int head;
- int tail;
- FDQueueElement *data;
- int bounds;
- int blanks;
- pthread_mutex_t one_big_mutex;
- pthread_cond_t not_empty;
- pthread_cond_t not_full;
-} FDQueue;
+struct fd_queue_t {
+ int head;
+ int tail;
+ fd_queue_elem_t *data;
+ int bounds;
+ int blanks;
+ pthread_mutex_t one_big_mutex;
+ pthread_cond_t not_empty;
+ pthread_cond_t not_full;
+ int cancel_state;
+};
+typedef struct fd_queue_t fd_queue_t;
-int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a);
-int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p);
-apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p);
-int ap_queue_size(FDQueue *queue);
-int ap_queue_full(FDQueue *queue);
-int ap_block_on_queue(FDQueue *queue);
-void ap_queue_signal_all_wakeup(FDQueue *queue);
-int ap_increase_blanks(FDQueue *queue);
+int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
+int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p);
+apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p);
+apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);
#endif /* FDQUEUE_H */
static int requests_this_child;
static int num_listensocks = 0;
static apr_socket_t **listensocks;
-static FDQueue *worker_queue;
+static fd_queue_t *worker_queue;
/* The structure used to pass unique initialization info to each thread */
typedef struct {
int pid;
int tid;
int sd;
- apr_pool_t *tpool; /* "pthread" would be confusing */
} proc_info;
/* Structure used to pass information to the thread responsible for
static void signal_workers(void)
{
workers_may_exit = 1;
- ap_queue_signal_all_wakeup(worker_queue);
+ /* XXX: This will happen naturally on a graceful, and we don't care otherwise.
+ ap_queue_signal_all_wakeup(worker_queue); */
+ ap_queue_interrupt_all(worker_queue);
}
AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result)
proc_info * ti = dummy;
int process_slot = ti->pid;
int thread_slot = ti->tid;
- apr_pool_t *tpool = ti->tpool;
+ apr_pool_t *tpool = apr_thread_pool_get(thd);
apr_socket_t *csd = NULL;
apr_pool_t *ptrans; /* Pool for per-transaction stuff */
apr_socket_t *sd = NULL;
free(ti);
- apr_pool_create(&ptrans, tpool);
-
apr_lock_acquire(worker_thread_count_mutex);
worker_thread_count++;
apr_lock_release(worker_thread_count_mutex);
for(n=0 ; n <= num_listensocks ; ++n)
apr_poll_socket_add(pollset, listensocks[n], APR_POLLIN);
- worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
- ap_queue_init(worker_queue, ap_threads_per_child, pchild);
-
/* TODO: Switch to a system where threads reuse the results from earlier
poll calls - manoj */
while (1) {
+ /* TODO: requests_this_child should be synchronized - aaron */
if (requests_this_child <= 0) {
check_infinite_requests();
}
}
got_fd:
if (!workers_may_exit) {
+ /* create a new transaction pool for each accepted socket */
+ apr_pool_create(&ptrans, tpool);
+
if ((rv = apr_accept(&csd, sd, ptrans)) != APR_SUCCESS) {
csd = NULL;
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
}
if (csd != NULL) {
ap_queue_push(worker_queue, csd, ptrans);
- ap_block_on_queue(worker_queue);
}
}
else {
}
}
- apr_pool_destroy(tpool);
ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
(request_rec *) NULL);
dying = 1;
ap_scoreboard_image->parent[process_slot].quiescing = 1;
kill(ap_my_pid, SIGTERM);
+/* this is uncommented when we make a pool-pool
+ apr_thread_exit(thd, APR_SUCCESS);
+*/
return NULL;
}
proc_info * ti = dummy;
int process_slot = ti->pid;
int thread_slot = ti->tid;
- apr_pool_t *tpool = ti->tpool;
apr_socket_t *csd = NULL;
apr_pool_t *ptrans; /* Pool for per-transaction stuff */
+ apr_status_t rv;
free(ti);
+ /* apr_pool_create(&ptrans, tpool); */
+
while (!workers_may_exit) {
- ap_queue_pop(worker_queue, &csd, &ptrans);
- if (!csd) {
+ rv = ap_queue_pop(worker_queue, &csd, &ptrans);
+ /* We get FD_QUEUE_EINTR whenever ap_queue_pop() has been interrupted
+ * from an explicit call to ap_queue_interrupt_all(). This allows
+ * us to unblock threads stuck in ap_queue_pop() when a shutdown
+ * is pending. */
+ if (rv == FD_QUEUE_EINTR || !csd) {
continue;
}
- ap_increase_blanks(worker_queue);
process_socket(ptrans, csd, process_slot, thread_slot);
- requests_this_child--;
- apr_pool_clear(ptrans);
+ requests_this_child--; /* FIXME: should be synchronized - aaron */
+ apr_pool_destroy(ptrans);
}
- apr_pool_destroy(tpool);
ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
(request_rec *) NULL);
apr_lock_acquire(worker_thread_count_mutex);
worker_thread_count--;
apr_lock_release(worker_thread_count_mutex);
+ apr_thread_exit(thd, APR_SUCCESS);
return NULL;
}
int threads_created = 0;
apr_thread_t *listener;
+ /* We must create the fd queue before we start up the listener
+ * and worker threads. */
+ worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
+ ap_queue_init(worker_queue, ap_threads_per_child, pchild);
+
my_info = (proc_info *)malloc(sizeof(proc_info));
my_info->pid = my_child_num;
my_info->tid = i;
my_info->sd = 0;
- apr_pool_create(&my_info->tpool, pchild);
apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
while (1) {
- for (i=1; i < ap_threads_per_child; i++) {
+ /* Does ap_threads_per_child include the listener thread?
+ * Why does this forloop start at 1? -aaron */
+ for (i = 1; i < ap_threads_per_child; i++) {
int status = ap_scoreboard_image->servers[child_num_arg][i].status;
if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {
my_info->pid = my_child_num;
my_info->tid = i;
my_info->sd = 0;
- apr_pool_create(&my_info->tpool, pchild);
/* We are creating threads right now */
(void) ap_update_child_status(my_child_num, i, SERVER_STARTING,
* "life_status" is almost right, but it's in the worker's structure, and
* the name could be clearer. gla
*/
+ apr_thread_exit(thd, APR_SUCCESS);
return NULL;
}
apr_lock_create(&pipe_of_death_mutex, APR_MUTEX, APR_INTRAPROCESS,
NULL, pchild);
- ts = apr_palloc(pchild, sizeof(*ts));
+ ts = (thread_starter *)apr_palloc(pchild, sizeof(*ts));
apr_threadattr_create(&thread_attr, pchild);
apr_threadattr_detach_set(thread_attr, 0); /* 0 means PTHREAD_CREATE_JOINABLE */