* Detects when the fd_queue_t is full. This utility function is expected
* to be called from within critical sections, and is not threadsafe.
*/
-static int ap_queue_full(fd_queue_t *queue)
-{
- return (queue->blanks <= 0);
-}
+#define ap_queue_full(queue) ((queue)->tail == (queue)->bounds)
/**
* Detects when the fd_queue_t is empty. This utility function is expected
* to be called from within critical sections, and is not threadsafe.
*/
-static int ap_queue_empty(fd_queue_t *queue)
-{
- return (queue->blanks >= queue->bounds - 1);
-}
+#define ap_queue_empty(queue) ((queue)->tail == 0)
/**
* Callback routine that is called to destroy this
* XXX: We should at least try to signal an error here, it is
* indicative of a programmer error. -aaron */
pthread_cond_destroy(&queue->not_empty);
+ pthread_cond_destroy(&queue->not_full);
pthread_mutex_destroy(&queue->one_big_mutex);
return FD_QUEUE_SUCCESS;
int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a)
{
int i;
- int bounds;
if (pthread_mutex_init(&queue->one_big_mutex, NULL) != 0)
return FD_QUEUE_FAILURE;
if (pthread_cond_init(&queue->not_empty, NULL) != 0)
return FD_QUEUE_FAILURE;
+ if (pthread_cond_init(&queue->not_full, NULL) != 0)
+ return FD_QUEUE_FAILURE;
- bounds = queue_capacity + 1;
queue->tail = 0;
- queue->data = apr_palloc(a, bounds * sizeof(fd_queue_elem_t));
- queue->bounds = bounds;
- queue->blanks = queue_capacity;
+ queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
+ queue->bounds = queue_capacity;
/* Set all the sockets in the queue to NULL */
- for (i = 0; i < bounds; ++i)
+ for (i = 0; i < queue_capacity; ++i)
queue->data[i].sd = NULL;
apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
*/
int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p)
{
+ fd_queue_elem_t *elem;
+
if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
}
- /* If the caller didn't allocate enough slots and tries to push
- * too many, too bad. */
- if (ap_queue_full(queue)) {
- if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
- return FD_QUEUE_FAILURE;
- }
- return FD_QUEUE_OVERFLOW;
+ while (ap_queue_full(queue)) {
+ pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
}
- queue->data[queue->tail].sd = sd;
- queue->data[queue->tail].p = p;
- queue->tail++;
- queue->blanks--;
+ elem = &queue->data[queue->tail++];
+ elem->sd = sd;
+ elem->p = p;
pthread_cond_signal(&queue->not_empty);
}
}
- queue->tail--;
- elem = &queue->data[queue->tail];
+ elem = &queue->data[--queue->tail];
*sd = elem->sd;
*p = elem->p;
elem->sd = NULL;
elem->p = NULL;
- queue->blanks++;
+
+ /* signal not_full if we were full before this pop */
+ if (queue->tail == queue->bounds - 1) {
+ pthread_cond_signal(&queue->not_full);
+ }
if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
return FD_QUEUE_FAILURE;
}
pthread_cond_broadcast(&queue->not_empty);
+ /* We shouldn't have multiple threads sitting in not_full, but
+ * broadcast just in case. */
+ pthread_cond_broadcast(&queue->not_full);
if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
}