]> granicus.if.org Git - apache/commitdiff
mpm_worker: follow up to r1821624.
authorYann Ylavic <ylavic@apache.org>
Fri, 19 Jan 2018 12:29:18 +0000 (12:29 +0000)
committerYann Ylavic <ylavic@apache.org>
Fri, 19 Jan 2018 12:29:18 +0000 (12:29 +0000)
Use common [mpm_]fdqueue.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1821639 13f79535-47bb-0310-9956-ffa450edef68

server/mpm/worker/config3.m4
server/mpm/worker/fdqueue.c [deleted file]
server/mpm/worker/fdqueue.h [deleted file]
server/mpm/worker/worker.c

index 68d861f49470f9a243c3b749b86be72b7189cf24..f21817a4a8d1363377fe925dc12b885700118969 100644 (file)
@@ -1,7 +1,7 @@
 APACHE_MPMPATH_INIT(worker)
 
 dnl ## XXX - Need a more thorough check of the proper flags to use
-APACHE_MPM_MODULE(worker, $enable_mpm_worker, worker.lo fdqueue.lo,[
+APACHE_MPM_MODULE(worker, $enable_mpm_worker, worker.lo,[
     AC_CHECK_FUNCS(pthread_kill)
 ])
 
diff --git a/server/mpm/worker/fdqueue.c b/server/mpm/worker/fdqueue.c
deleted file mode 100644 (file)
index 3cb1310..0000000
+++ /dev/null
@@ -1,401 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "fdqueue.h"
-#include "apr_atomic.h"
-
-typedef struct recycled_pool {
-    apr_pool_t *pool;
-    struct recycled_pool *next;
-} recycled_pool;
-
-struct fd_queue_info_t {
-    volatile apr_uint32_t idlers;
-    apr_thread_mutex_t *idlers_mutex;
-    apr_thread_cond_t *wait_for_idler;
-    int terminated;
-    int max_idlers;
-    recycled_pool  *recycled_pools;
-};
-
-static apr_status_t queue_info_cleanup(void *data_)
-{
-    fd_queue_info_t *qi = data_;
-    apr_thread_cond_destroy(qi->wait_for_idler);
-    apr_thread_mutex_destroy(qi->idlers_mutex);
-
-    /* Clean up any pools in the recycled list */
-    for (;;) {
-        struct recycled_pool *first_pool = qi->recycled_pools;
-        if (first_pool == NULL) {
-            break;
-        }
-        if (apr_atomic_casptr((void*)&(qi->recycled_pools), first_pool->next,
-                              first_pool) == first_pool) {
-            apr_pool_destroy(first_pool->pool);
-        }
-    }
-
-    return APR_SUCCESS;
-}
-
-apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info,
-                                  apr_pool_t *pool, int max_idlers)
-{
-    apr_status_t rv;
-    fd_queue_info_t *qi;
-
-    qi = apr_pcalloc(pool, sizeof(*qi));
-
-    rv = apr_thread_mutex_create(&qi->idlers_mutex, APR_THREAD_MUTEX_DEFAULT,
-                                 pool);
-    if (rv != APR_SUCCESS) {
-        return rv;
-    }
-    rv = apr_thread_cond_create(&qi->wait_for_idler, pool);
-    if (rv != APR_SUCCESS) {
-        return rv;
-    }
-    qi->recycled_pools = NULL;
-    qi->max_idlers = max_idlers;
-    apr_pool_cleanup_register(pool, qi, queue_info_cleanup,
-                              apr_pool_cleanup_null);
-
-    *queue_info = qi;
-
-    return APR_SUCCESS;
-}
-
-apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info,
-                                    apr_pool_t *pool_to_recycle)
-{
-    apr_status_t rv;
-
-    /* If we have been given a pool to recycle, atomically link
-     * it into the queue_info's list of recycled pools
-     */
-    if (pool_to_recycle) {
-        struct recycled_pool *new_recycle;
-        new_recycle = (struct recycled_pool *)apr_palloc(pool_to_recycle,
-                                                         sizeof(*new_recycle));
-        new_recycle->pool = pool_to_recycle;
-        for (;;) {
-            /* Save queue_info->recycled_pool in local variable next because
-             * new_recycle->next can be changed after apr_atomic_casptr
-             * function call. For gory details see PR 44402.
-             */
-            struct recycled_pool *next = queue_info->recycled_pools;
-            new_recycle->next = next;
-            if (apr_atomic_casptr((void*)&(queue_info->recycled_pools),
-                                  new_recycle, next) == next) {
-                break;
-            }
-        }
-    }
-
-    /* If this thread makes the idle worker count nonzero,
-     * wake up the listener. */
-    if (apr_atomic_inc32(&queue_info->idlers) == 0) {
-        rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
-        if (rv != APR_SUCCESS) {
-            return rv;
-        }
-        rv = apr_thread_cond_signal(queue_info->wait_for_idler);
-        if (rv != APR_SUCCESS) {
-            apr_thread_mutex_unlock(queue_info->idlers_mutex);
-            return rv;
-        }
-        rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
-        if (rv != APR_SUCCESS) {
-            return rv;
-        }
-    }
-
-    return APR_SUCCESS;
-}
-
-apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info,
-                                          apr_pool_t **recycled_pool)
-{
-    apr_status_t rv;
-
-    *recycled_pool = NULL;
-
-    /* Block if the count of idle workers is zero */
-    if (queue_info->idlers == 0) {
-        rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
-        if (rv != APR_SUCCESS) {
-            return rv;
-        }
-        /* Re-check the idle worker count to guard against a
-         * race condition.  Now that we're in the mutex-protected
-         * region, one of two things may have happened:
-         *   - If the idle worker count is still zero, the
-         *     workers are all still busy, so it's safe to
-         *     block on a condition variable, BUT
-         *     we need to check for idle worker count again
-         *     when we are signaled since it can happen that
-         *     we are signaled by a worker thread that went idle
-         *     but received a context switch before it could
-         *     tell us. If it does signal us later once it is on
-         *     CPU again there might be no idle worker left.
-         *     See
-         *     https://issues.apache.org/bugzilla/show_bug.cgi?id=45605#c4
-         *   - If the idle worker count is nonzero, then a
-         *     worker has become idle since the first check
-         *     of queue_info->idlers above.  It's possible
-         *     that the worker has also signaled the condition
-         *     variable--and if so, the listener missed it
-         *     because it wasn't yet blocked on the condition
-         *     variable.  But if the idle worker count is
-         *     now nonzero, it's safe for this function to
-         *     return immediately.
-         */
-        while (queue_info->idlers == 0) {
-            rv = apr_thread_cond_wait(queue_info->wait_for_idler,
-                                  queue_info->idlers_mutex);
-            if (rv != APR_SUCCESS) {
-                apr_status_t rv2;
-                rv2 = apr_thread_mutex_unlock(queue_info->idlers_mutex);
-                if (rv2 != APR_SUCCESS) {
-                    return rv2;
-                }
-                return rv;
-            }
-        }
-        rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
-        if (rv != APR_SUCCESS) {
-            return rv;
-        }
-    }
-
-    /* Atomically decrement the idle worker count */
-    apr_atomic_dec32(&(queue_info->idlers));
-
-    /* Atomically pop a pool from the recycled list */
-
-    /* This function is safe only as long as it is single threaded because
-     * it reaches into the queue and accesses "next" which can change.
-     * We are OK today because it is only called from the listener thread.
-     * cas-based pushes do not have the same limitation - any number can
-     * happen concurrently with a single cas-based pop.
-     */
-
-    for (;;) {
-        struct recycled_pool *first_pool = queue_info->recycled_pools;
-        if (first_pool == NULL) {
-            break;
-        }
-        if (apr_atomic_casptr((void*)&(queue_info->recycled_pools), first_pool->next,
-                              first_pool) == first_pool) {
-            *recycled_pool = first_pool->pool;
-            break;
-        }
-    }
-
-    if (queue_info->terminated) {
-        return APR_EOF;
-    }
-    else {
-        return APR_SUCCESS;
-    }
-}
-
-apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info)
-{
-    apr_status_t rv;
-    rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
-    if (rv != APR_SUCCESS) {
-        return rv;
-    }
-    queue_info->terminated = 1;
-    apr_thread_cond_broadcast(queue_info->wait_for_idler);
-    return apr_thread_mutex_unlock(queue_info->idlers_mutex);
-}
-
-/**
- * Detects when the fd_queue_t is full. This utility function is expected
- * to be called from within critical sections, and is not threadsafe.
- */
-#define ap_queue_full(queue) ((queue)->nelts == (queue)->bounds)
-
-/**
- * Detects when the fd_queue_t is empty. This utility function is expected
- * to be called from within critical sections, and is not threadsafe.
- */
-#define ap_queue_empty(queue) ((queue)->nelts == 0)
-
-/**
- * Callback routine that is called to destroy this
- * fd_queue_t when its pool is destroyed.
- */
-static apr_status_t ap_queue_destroy(void *data)
-{
-    fd_queue_t *queue = data;
-
-    /* Ignore errors here, we can't do anything about them anyway.
-     * XXX: We should at least try to signal an error here, it is
-     * indicative of a programmer error. -aaron */
-    apr_thread_cond_destroy(queue->not_empty);
-    apr_thread_mutex_destroy(queue->one_big_mutex);
-
-    return APR_SUCCESS;
-}
-
-/**
- * Initialize the fd_queue_t.
- */
-apr_status_t ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a)
-{
-    int i;
-    apr_status_t rv;
-
-    if ((rv = apr_thread_mutex_create(&queue->one_big_mutex,
-                                      APR_THREAD_MUTEX_DEFAULT, a)) != APR_SUCCESS) {
-        return rv;
-    }
-    if ((rv = apr_thread_cond_create(&queue->not_empty, a)) != APR_SUCCESS) {
-        return rv;
-    }
-
-    queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
-    queue->bounds = queue_capacity;
-    queue->nelts = 0;
-    queue->in = 0;
-    queue->out = 0;
-
-    /* Set all the sockets in the queue to NULL */
-    for (i = 0; i < queue_capacity; ++i)
-        queue->data[i].sd = NULL;
-
-    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
-
-    return APR_SUCCESS;
-}
-
-/**
- * Push a new socket onto the queue.
- *
- * precondition: ap_queue_info_wait_for_idler has already been called
- *               to reserve an idle worker thread
- */
-apr_status_t ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p)
-{
-    fd_queue_elem_t *elem;
-    apr_status_t rv;
-
-    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
-        return rv;
-    }
-
-    AP_DEBUG_ASSERT(!queue->terminated);
-    AP_DEBUG_ASSERT(!ap_queue_full(queue));
-
-    elem = &queue->data[queue->in];
-    queue->in++;
-    if (queue->in >= queue->bounds)
-        queue->in -= queue->bounds;
-    elem->sd = sd;
-    elem->p = p;
-    queue->nelts++;
-
-    apr_thread_cond_signal(queue->not_empty);
-
-    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
-        return rv;
-    }
-
-    return APR_SUCCESS;
-}
-
-/**
- * Retrieves the next available socket from the queue. If there are no
- * sockets available, it will block until one becomes available.
- * Once retrieved, the socket is placed into the address specified by
- * 'sd'.
- */
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p)
-{
-    fd_queue_elem_t *elem;
-    apr_status_t rv;
-
-    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
-        return rv;
-    }
-
-    /* Keep waiting until we wake up and find that the queue is not empty. */
-    if (ap_queue_empty(queue)) {
-        if (!queue->terminated) {
-            apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
-        }
-        /* If we wake up and it's still empty, then we were interrupted */
-        if (ap_queue_empty(queue)) {
-            rv = apr_thread_mutex_unlock(queue->one_big_mutex);
-            if (rv != APR_SUCCESS) {
-                return rv;
-            }
-            if (queue->terminated) {
-                return APR_EOF; /* no more elements ever again */
-            }
-            else {
-                return APR_EINTR;
-            }
-        }
-    }
-
-    elem = &queue->data[queue->out];
-    queue->out++;
-    if (queue->out >= queue->bounds)
-        queue->out -= queue->bounds;
-    queue->nelts--;
-    *sd = elem->sd;
-    *p = elem->p;
-#ifdef AP_DEBUG
-    elem->sd = NULL;
-    elem->p = NULL;
-#endif /* AP_DEBUG */
-
-    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
-    return rv;
-}
-
-static apr_status_t queue_interrupt_all(fd_queue_t *queue, int term)
-{
-    apr_status_t rv;
-
-    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
-        return rv;
-    }
-    /* we must hold one_big_mutex when setting this... otherwise,
-     * we could end up setting it and waking everybody up just after a
-     * would-be popper checks it but right before they block
-     */
-    if (term) {
-        queue->terminated = 1;
-    }
-    apr_thread_cond_broadcast(queue->not_empty);
-    return apr_thread_mutex_unlock(queue->one_big_mutex);
-}
-
-apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
-{
-    return queue_interrupt_all(queue, 0);
-}
-
-apr_status_t ap_queue_term(fd_queue_t *queue)
-{
-    return queue_interrupt_all(queue, 1);
-}
diff --git a/server/mpm/worker/fdqueue.h b/server/mpm/worker/fdqueue.h
deleted file mode 100644 (file)
index 1d48a1a..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * @file  worker/fdqueue.h
- * @brief fd queue declarations
- *
- * @addtogroup APACHE_MPM_WORKER
- * @{
- */
-
-#ifndef FDQUEUE_H
-#define FDQUEUE_H
-#include "httpd.h"
-#include <stdlib.h>
-#if APR_HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-#include <apr_thread_mutex.h>
-#include <apr_thread_cond.h>
-#include <sys/types.h>
-#if APR_HAVE_SYS_SOCKET_H
-#include <sys/socket.h>
-#endif
-#include <apr_errno.h>
-
-typedef struct fd_queue_info_t fd_queue_info_t;
-
-apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info,
-                                  apr_pool_t *pool, int max_idlers);
-apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info,
-                                    apr_pool_t *pool_to_recycle);
-apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info,
-                                          apr_pool_t **recycled_pool);
-apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info);
-
-struct fd_queue_elem_t {
-    apr_socket_t      *sd;
-    apr_pool_t        *p;
-};
-typedef struct fd_queue_elem_t fd_queue_elem_t;
-
-struct fd_queue_t {
-    fd_queue_elem_t    *data;
-    unsigned int       nelts;
-    unsigned int       bounds;
-    unsigned int       in;
-    unsigned int       out;
-    apr_thread_mutex_t *one_big_mutex;
-    apr_thread_cond_t  *not_empty;
-    int                 terminated;
-};
-typedef struct fd_queue_t fd_queue_t;
-
-apr_status_t ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
-apr_status_t ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p);
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p);
-apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);
-apr_status_t ap_queue_term(fd_queue_t *queue);
-
-#endif /* FDQUEUE_H */
-/** @} */
index 9d72f90a467d8cff8911f45cbd71b9c3c1d2a1fa..3cef5d7fcdc23f1f2f04c3af62d42ad69836c03f 100644 (file)
@@ -64,7 +64,7 @@
 #include "mpm_common.h"
 #include "ap_listen.h"
 #include "scoreboard.h"
-#include "fdqueue.h"
+#include "mpm_fdqueue.h"
 #include "mpm_default.h"
 #include "util_mutex.h"
 #include "unixd.h"
@@ -597,11 +597,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t *thd, void * dummy)
         if (listener_may_exit) break;
 
         if (!have_idle_worker) {
-            /* the following pops a recycled ptrans pool off a stack
-             * if there is one, in addition to reserving a worker thread
-             */
-            rv = ap_queue_info_wait_for_idler(worker_queue_info,
-                                              &ptrans);
+            rv = ap_queue_info_wait_for_idler(worker_queue_info, NULL);
             if (APR_STATUS_IS_EOF(rv)) {
                 break; /* we've been signaled to die now */
             }
@@ -679,6 +675,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t *thd, void * dummy)
         } /* if/else */
 
         if (!listener_may_exit) {
+            /* the following pops a recycled ptrans pool off a stack */
+            ap_pop_pool(&ptrans, worker_queue_info);
             if (ptrans == NULL) {
                 /* we can't use a recycled transaction pool this time.
                  * create a new transaction pool */
@@ -688,8 +686,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t *thd, void * dummy)
                 apr_allocator_max_free_set(allocator, ap_max_mem_free);
                 apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
                 apr_allocator_owner_set(allocator, ptrans);
+                apr_pool_tag(ptrans, "transaction");
             }
-            apr_pool_tag(ptrans, "transaction");
             rv = lr->accept_func(&csd, lr, ptrans);
             /* later we trash rv and rely on csd to indicate success/failure */
             AP_DEBUG_ASSERT(rv == APR_SUCCESS || !csd);
@@ -712,7 +710,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t *thd, void * dummy)
                 accept_mutex_error("unlock", rv, process_slot);
             }
             if (csd != NULL) {
-                rv = ap_queue_push(worker_queue, csd, ptrans);
+                rv = ap_queue_push(worker_queue, csd, NULL, ptrans);
                 if (rv) {
                     /* trash the connection; we couldn't queue the connected
                      * socket to a worker
@@ -918,7 +916,7 @@ static void * APR_THREAD_FUNC start_threads(apr_thread_t *thd, void *dummy)
     }
 
     rv = ap_queue_info_create(&worker_queue_info, pchild,
-                              threads_per_child);
+                              threads_per_child, -1);
     if (rv != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf, APLOGNO(03141)
                      "ap_queue_info_create() failed");