]> granicus.if.org Git - icinga2/commitdiff
Improve thread spawning behavior
authorGunnar Beutner <gunnar.beutner@netways.de>
Thu, 11 Sep 2014 09:45:21 +0000 (11:45 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Thu, 11 Sep 2014 09:45:21 +0000 (11:45 +0200)
fixes #7186

components/livestatus/livestatuslistener.cpp
lib/base/threadpool.cpp
lib/base/threadpool.hpp
lib/base/utility.cpp
lib/base/utility.hpp
lib/remote/apilistener.cpp

index 7ff0be00500ac8844c7f8d6b96d7a286d149d12b..5c67c35ce2bd22930574a28811f6aab994688c94 100644 (file)
@@ -134,7 +134,7 @@ void LivestatusListener::ServerThreadProc(const Socket::Ptr& server)
                try {
                        Socket::Ptr client = server->Accept();
                        Log(LogNotice, "LivestatusListener", "Client connected");
-                       Utility::QueueAsyncCallback(boost::bind(&LivestatusListener::ClientHandler, this, client));
+                       Utility::QueueAsyncCallback(boost::bind(&LivestatusListener::ClientHandler, this, client), LowLatencyScheduler);
                } catch (std::exception&) {
                        Log(LogCritical, "ListenerListener", "Cannot accept new connection.");
                }
index 92fe9f4a16b76eaf7211a0660a4ca34c088f3cfa..5197bf7bc411d6fb25d3a97a191295f2809d7aea 100644 (file)
@@ -193,9 +193,10 @@ void ThreadPool::WorkerThread::ThreadProc(Queue& queue)
  * Appends a work item to the work queue. Work items will be processed in FIFO order.
  *
  * @param callback The callback function for the work item.
+ * @param policy The scheduling policy
  * @returns true if the item was queued, false otherwise.
  */
-bool ThreadPool::Post(const ThreadPool::WorkFunction& callback)
+bool ThreadPool::Post(const ThreadPool::WorkFunction& callback, SchedulerPolicy policy)
 {
        WorkItem wi;
        wi.Callback = callback;
@@ -209,6 +210,9 @@ bool ThreadPool::Post(const ThreadPool::WorkFunction& callback)
                if (queue.Stopped)
                        return false;
 
+               if (policy == LowLatencyScheduler)
+                       queue.SpawnWorker(m_ThreadGroup);
+
                queue.Items.push_back(wi);
                queue.CV.notify_one();
        }
@@ -233,7 +237,7 @@ void ThreadPool::ManagerThreadProc(void)
                        boost::mutex::scoped_lock lock(m_MgmtMutex);
 
                        if (!m_Stopped)
-                               m_MgmtCV.timed_wait(lock, boost::posix_time::seconds(5));
+                               m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(500));
 
                        if (m_Stopped)
                                break;
@@ -273,7 +277,7 @@ void ThreadPool::ManagerThreadProc(void)
                                int tthreads = wthreads - alive;
 
                                /* Make sure there is at least one thread per CPU */
-                               int ncput = std::max(boost::thread::hardware_concurrency() / QUEUECOUNT, 1U);
+                               int ncput = std::max(boost::thread::hardware_concurrency() / QUEUECOUNT, 4U);
                                if (alive + tthreads < ncput)
                                        tthreads = ncput - alive;
 
index 518d7a14b2dbf5ae1e5c37f0faca9d7a2badf891..c34fd80a8c68ce7660c552e2729e1b907082f087 100644 (file)
@@ -32,6 +32,12 @@ namespace icinga
 
 #define QUEUECOUNT 4
 
+enum SchedulerPolicy
+{
+       DefaultScheduler,
+       LowLatencyScheduler
+};
+
 /**
  * A thread pool.
  *
@@ -49,7 +55,7 @@ public:
        void Stop(void);
        void Join(bool wait_for_stop = false);
 
-       bool Post(const WorkFunction& callback);
+       bool Post(const WorkFunction& callback, SchedulerPolicy policy = DefaultScheduler);
 
 private:
        enum ThreadState
index 951389421a2486d547085da33bf1d5a35bd0e124..fc348bceae8e9e15b59899f028c3ebd872836537 100644 (file)
@@ -684,9 +684,9 @@ void Utility::SetNonBlockingSocket(SOCKET s)
 #endif /* _WIN32 */
 }
 
-void Utility::QueueAsyncCallback(const boost::function<void (void)>& callback)
+void Utility::QueueAsyncCallback(const boost::function<void (void)>& callback, SchedulerPolicy policy)
 {
-       Application::GetTP().Post(callback);
+       Application::GetTP().Post(callback, policy);
 }
 
 String Utility::NaturalJoin(const std::vector<String>& tokens)
index f9fa9eebb6771d415a8b03fe7b4e0913137056c3..e6e133a7621b57c6aa8adcb80b1f9d935a139b7f 100644 (file)
@@ -26,6 +26,7 @@
 #include <boost/function.hpp>
 #include <boost/thread/tss.hpp>
 #include <vector>
+#include "base/threadpool.hpp"
 
 namespace icinga
 {
@@ -91,7 +92,7 @@ public:
        static bool MkDir(const String& path, int flags);
        static bool MkDirP(const String& path, int flags);
 
-       static void QueueAsyncCallback(const boost::function<void (void)>& callback);
+       static void QueueAsyncCallback(const boost::function<void (void)>& callback, SchedulerPolicy policy = DefaultScheduler);
 
        static String NaturalJoin(const std::vector<String>& tokens);
 
index 64474390ef099c20721a4e03f93ffe76bff8b5d6..d2e19a284e90767393128a991a6dd69e57ff3cf5 100644 (file)
@@ -204,7 +204,7 @@ void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
        for (;;) {
                try {
                        Socket::Ptr client = server->Accept();
-                       Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleServer));
+                       Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleServer), LowLatencyScheduler);
                } catch (const std::exception&) {
                        Log(LogCritical, "ApiListener", "Cannot accept new connection.");
                }