]> granicus.if.org Git - icinga2/commitdiff
Bug fix for ThreadPool::KillWorker().
authorGunnar Beutner <gunnar.beutner@netways.de>
Tue, 27 Aug 2013 13:57:00 +0000 (15:57 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Tue, 27 Aug 2013 13:57:00 +0000 (15:57 +0200)
components/cluster/clustercomponent.cpp
lib/base/threadpool.cpp
lib/base/threadpool.h
lib/base/utility.h

index 2bd7d298d740a9952299acb54a610b8b92062a15..790e9d3407595b8c69dd4208c6fc00f16d69f72d 100644 (file)
@@ -286,6 +286,11 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& endpoint, const Dicti
                        return;
 
                service->ProcessCheckResult(cr);
+
+               /* Reschedule the next check. The side effect of this is that for as long
+                * as we receive results for a service we won't execute any
+                * active checks. */
+               service->SetNextCheck(Utility::GetTime() + service->GetCheckInterval());
        }
 }
 
index 45455dafa9c9798e2741c334988881f6c417c8b6..4ef83f9ea375a469d7da893179c289ec7d9f4a69 100644 (file)
@@ -30,7 +30,7 @@
 using namespace icinga;
 
 ThreadPool::ThreadPool(void)
-       : m_ThreadDeaths(0), m_WaitTime(0), m_ServiceTime(0),
+       : m_WaitTime(0), m_ServiceTime(0),
          m_TaskCount(0), m_Stopped(false)
 {
        for (int i = 0; i < 2; i++)
@@ -106,13 +106,11 @@ void ThreadPool::QueueThreadProc(int tid)
 
                        UpdateThreadUtilization(tid, ThreadIdle);
 
-                       while (m_WorkItems.empty() && !m_Stopped && m_ThreadDeaths == 0)
+                       while (m_WorkItems.empty() && !m_Stopped && !m_ThreadStats[tid].Zombie)
                                m_WorkCV.wait(lock);
 
-                       if (m_ThreadDeaths > 0) {
-                               m_ThreadDeaths--;
+                       if (m_ThreadStats[tid].Zombie)
                                break;
-                       }
 
                        if (m_WorkItems.empty() && m_Stopped)
                                break;
@@ -191,6 +189,7 @@ void ThreadPool::QueueThreadProc(int tid)
        }
 
        UpdateThreadUtilization(tid, ThreadDead);
+       m_ThreadStats[tid].Zombie = false;
 }
 
 /**
@@ -240,7 +239,7 @@ void ThreadPool::ManagerThreadProc(void)
                        alive = 0;
 
                        for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) {
-                               if (m_ThreadStats[i].State != ThreadDead) {
+                               if (m_ThreadStats[i].State != ThreadDead && !m_ThreadStats[i].Zombie) {
                                        alive++;
                                        utilization += m_ThreadStats[i].Utilization * 100;
                                }
@@ -254,7 +253,12 @@ void ThreadPool::ManagerThreadProc(void)
                                avg_latency = 0;
 
                        if (utilization < 60 || utilization > 80 || alive < 2) {
-                               int tthreads = ceil((utilization * alive) / 80.0) - alive;
+                               double wthreads = ceil((utilization * alive) / 80.0);
+
+                               if (!finite(wthreads))
+                                       wthreads = 0;
+
+                               int tthreads = wthreads - alive;
 
                                /* Don't ever kill the last 2 threads. */
                                if (alive + tthreads < 2)
@@ -264,6 +268,10 @@ void ThreadPool::ManagerThreadProc(void)
                                if (tthreads > 0 && pending > 0)
                                        tthreads = 8;
 
+                               std::ostringstream msgbuf;
+                               msgbuf << "Thread pool; current: " << alive << "; adjustment: " << tthreads;
+                               Log(LogDebug, "base", msgbuf.str());
+
                                for (int i = 0; i < -tthreads; i++)
                                        KillWorker();
 
@@ -312,9 +320,16 @@ void ThreadPool::SpawnWorker(void)
  */
 void ThreadPool::KillWorker(void)
 {
-       Log(LogDebug, "base", "Killing worker thread.");
+       for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) {
+               if (m_ThreadStats[i].State == ThreadIdle && !m_ThreadStats[i].Zombie) {
+                       Log(LogDebug, "base", "Killing worker thread.");
 
-       m_ThreadDeaths++;
+                       m_ThreadStats[i].Zombie = true;
+                       m_WorkCV.notify_all();
+
+                       break;
+               }
+       }
 }
 
 void ThreadPool::StatsThreadProc(void)
index b6eb76f0fb9e81d9a3c7360afed62cf8d41ef841..8caf83c37814678015b6275fbdaa4dc0c498c320 100644 (file)
@@ -60,16 +60,16 @@ private:
        struct ThreadStats
        {
                ThreadState State;
+               bool Zombie;
                double Utilization;
                double LastUpdate;
 
                ThreadStats(ThreadState state = ThreadDead)
-                       : State(state), Utilization(0), LastUpdate(0)
+                       : State(state), Zombie(false), Utilization(0), LastUpdate(0)
                { }
        };
 
        ThreadStats m_ThreadStats[512];
-       int m_ThreadDeaths;
 
        boost::thread m_ManagerThread;
        boost::thread m_StatsThread;
index 7288f61fd70c1ed7bef4afeb751f219593b365c3..b3f7d8e0b050c42ac29cc80754308e6678cb7fe6 100644 (file)
@@ -93,7 +93,7 @@ private:
 #      include <cassert>
 #      define ASSERT(expr) assert(expr)
 #else /* _DEBUG */
-#      define ASSERT(expr)
+#      define ASSERT(expr) __builtin_unreachable()
 #endif /* _DEBUG */
 
 #endif /* UTILITY_H */