]> granicus.if.org Git - icinga2/commitdiff
Bugfixes for the nagios checker/thread pool.
authorGunnar Beutner <gunnar.beutner@netways.de>
Mon, 18 Jun 2012 15:23:48 +0000 (17:23 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Mon, 18 Jun 2012 15:23:48 +0000 (17:23 +0200)
base/threadpool.cpp
base/threadpool.h
base/timer.cpp
components/checker/checkercomponent.cpp
configure.ac
icinga/checktask.cpp
icinga/checktask.h
icinga/nagioschecktask.cpp
icinga/nagioschecktask.h

index 2b602b1c5474c957d19c1fef5a74ca16ce5a9acc..f4911dc6c4903aff065d0e1fe58a45b4d6ede623 100644 (file)
@@ -5,8 +5,26 @@ using namespace icinga;
 ThreadPool::ThreadPool(long numThreads)
        : m_Alive(true)
 {
-       for (long i = 0; i < numThreads; i++)
-               m_Threads.create_thread(boost::bind(&ThreadPool::WorkerThreadProc, this));
+       for (long i = 0; i < numThreads; i++) {
+               thread *thr = m_Threads.create_thread(boost::bind(&ThreadPool::WorkerThreadProc, this));
+#ifdef _WIN32
+               HANDLE handle = thr->native_handle();
+               SetPriorityClass(handle, BELOW_NORMAL_PRIORITY_CLASS);
+#else /* _WIN32 */
+               pthread_t handle = thr->native_handle();
+
+               int policy;
+               sched_param param;
+
+               if (pthread_getschedparam(handle, &policy, &param) < 0)
+                       throw PosixException("pthread_getschedparam failed", errno);
+
+               param.sched_priority = 0;
+
+               if (pthread_setschedparam(handle, SCHED_IDLE, &param) < 0)
+                       throw PosixException("pthread_setschedparam failed", errno);
+#endif /* _WIN32 */
+       }
 }
 
 ThreadPool::~ThreadPool(void)
@@ -24,6 +42,14 @@ ThreadPool::~ThreadPool(void)
        m_Threads.join_all();
 }
 
+void ThreadPool::EnqueueTasks(const vector<Task>& tasks)
+{
+       unique_lock<mutex> lock(m_Lock);
+
+       std::copy(tasks.begin(), tasks.end(), std::back_inserter(m_Tasks));
+       m_CV.notify_all();
+}
+
 void ThreadPool::EnqueueTask(Task task)
 {
        unique_lock<mutex> lock(m_Lock);
index 11643e144374863df8e66e61e4919aed65b43a76..d36c1bec933ec767c62984831e56882cf6b5091d 100644 (file)
@@ -17,6 +17,7 @@ public:
 
        static ThreadPool::Ptr GetDefaultPool(void);
 
+       void EnqueueTasks(const vector<Task>& tasks);
        void EnqueueTask(Task task);
        void WaitForTasks(void);
 
index 33f59f9f1241fb391151442c73582b0f95d87ac8..90ff47dd0831cbf88b4ebc88b049d5a9dbfc2788 100644 (file)
@@ -99,7 +99,19 @@ void Timer::CallExpiredTimers(void)
  */
 void Timer::Call(void)
 {
+       time_t st;
+       time(&st);
+
        OnTimerExpired(GetSelf());
+
+       time_t et;
+       time(&et);
+
+       if (et - st > 5) {
+               stringstream msgbuf;
+               msgbuf << "Timer call took " << et - st << " seconds.";
+               Application::Log(LogDebug, "base", msgbuf.str());
+       }
 }
 
 /**
index 5c530c2eb1d2d1439f8f160b67b9428d9eddaf4a..90981b05dbe9a6499b9ae6a120c11cd6284e078e 100644 (file)
@@ -43,7 +43,7 @@ void CheckerComponent::Start(void)
        m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
        m_CheckTimer->Start();
 
-       CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask);
+       CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue);
 
        m_ResultTimer = boost::make_shared<Timer>();
        m_ResultTimer->SetInterval(5);
@@ -64,6 +64,10 @@ void CheckerComponent::CheckTimerHandler(void)
        time_t now;
        time(&now);
 
+       Application::Log(LogDebug, "checker", "CheckTimerHandler entered.");
+
+       long tasks = 0;
+
        for (;;) {
                if (m_Services.empty())
                        break;
@@ -76,22 +80,36 @@ void CheckerComponent::CheckTimerHandler(void)
                m_Services.pop();
                service.SetPendingCheck(true);
 
-               Application::Log(LogInformation, "checker", "Executing service check for '" + service.GetName() + "'");
+//             Application::Log(LogInformation, "checker", "Executing service check for '" + service.GetName() + "'");
 
                CheckTask::Ptr task = CheckTask::CreateTask(service);
-               task->Execute();
+               task->Enqueue();
                m_PendingTasks.push_back(task);
 
                service.SetNextCheck(now + service.GetCheckInterval());
+
+               tasks++;
        }
 
+       Application::Log(LogDebug, "checker", "CheckTimerHandler: past loop.");
+
+       CheckTask::FlushQueue();
+
        AdjustCheckTimer();
+
+       stringstream msgbuf;
+       msgbuf << "CheckTimerHandler: created " << tasks << " tasks";
+       Application::Log(LogDebug, "checker", msgbuf.str());
 }
 
 void CheckerComponent::ResultTimerHandler(void)
 {
        vector<CheckTask::Ptr> unfinishedTasks;
 
+       Application::Log(LogDebug, "checker", "ResultTimerHandler entered.");
+
+       long results = 0;
+
        for (vector<CheckTask::Ptr>::iterator it = m_PendingTasks.begin(); it != m_PendingTasks.end(); it++) {
                CheckTask::Ptr task = *it;
 
@@ -104,11 +122,17 @@ void CheckerComponent::ResultTimerHandler(void)
                service.SetPendingCheck(false);
 
                CheckResult result = task->GetResult();
-               Application::Log(LogInformation, "checker", "Got result! Plugin output: " + result.Output);
+//             Application::Log(LogInformation, "checker", "Got result! Plugin output: " + result.Output);
+
+               results++;
 
                m_Services.push(service);
        }
 
+       stringstream msgbuf;
+       msgbuf << "ResultTimerHandler: " << results << " results; " << unfinishedTasks.size() << " unfinished";
+       Application::Log(LogDebug, "checker", msgbuf.str());
+
        m_PendingTasks = unfinishedTasks;
 
        AdjustCheckTimer();
index 468193bd85427ea9adf4b22cfc6959dd6b75f1dc..e5d9e25848178e8a4ebd0016df318392e37db74c 100644 (file)
@@ -50,6 +50,7 @@ AM_PROG_LEX
 AC_PROG_YACC
 AC_PROG_LIBTOOL
 AX_CXX_GCC_ABI_DEMANGLE
+AX_PTHREAD
 AX_BOOST_BASE
 AX_BOOST_SIGNALS
 AX_BOOST_THREAD
index a82ffd7fd63f3a159f189fa5ef6956c16440a451..694d1f2a654160f227872e29c7b568c97290f5da 100644 (file)
@@ -2,7 +2,7 @@
 
 using namespace icinga;
 
-map<string, CheckTask::Factory> CheckTask::m_Types;
+map<string, CheckTaskType> CheckTask::m_Types;
 
 CheckTask::CheckTask(const Service& service)
        : m_Service(service)
@@ -13,19 +13,36 @@ Service CheckTask::GetService(void) const
        return m_Service;
 }
 
-void CheckTask::RegisterType(string type, Factory factory)
+void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher)
 {
-       m_Types[type] = factory;
+       CheckTaskType ctt;
+       ctt.Factory = factory;
+       ctt.QueueFlusher = qflusher;
+
+       m_Types[type] = ctt;
 }
 
 CheckTask::Ptr CheckTask::CreateTask(const Service& service)
 {
-       map<string, CheckTask::Factory>::iterator it;
+       map<string, CheckTaskType>::iterator it;
 
        it = m_Types.find(service.GetCheckType());
 
        if (it == m_Types.end())
                throw runtime_error("Invalid check type specified for service '" + service.GetName() + "'");
 
-       return it->second(service);
+       return it->second.Factory(service);
+}
+
+void CheckTask::Enqueue(const CheckTask::Ptr& task)
+{
+       task->Enqueue();
+}
+
+void CheckTask::FlushQueue(void)
+{
+       map<string, CheckTaskType>::iterator it;
+
+       for (it = m_Types.begin(); it != m_Types.end(); it++)
+               it->second.QueueFlusher();
 }
index 045378c14f666f3a9ac636091840736a1240e47e..38c6587434a8fb8238d6c54a97fc0cc60c14efc7 100644 (file)
@@ -24,6 +24,8 @@ struct CheckResult
        Dictionary::Ptr PerformanceData;
 };
 
+struct CheckTaskType;
+
 class I2_ICINGA_API CheckTask : public Object
 {
 public:
@@ -31,15 +33,18 @@ public:
        typedef weak_ptr<CheckTask> WeakPtr;
 
        typedef function<CheckTask::Ptr(const Service&)> Factory;
+       typedef function<void()> QueueFlusher;
 
        Service GetService(void) const;
 
-       virtual void Execute(void) = 0;
+       virtual void Enqueue(void) = 0;
        virtual bool IsFinished(void) const = 0;
        virtual CheckResult GetResult(void) = 0;
 
-       static void RegisterType(string type, Factory factory);
+       static void RegisterType(string type, Factory factory, QueueFlusher qflusher);
        static CheckTask::Ptr CreateTask(const Service& service);
+       static void Enqueue(const CheckTask::Ptr& task);
+       static void FlushQueue(void);
 
 protected:
        CheckTask(const Service& service);
@@ -47,7 +52,13 @@ protected:
 private:
        Service m_Service;
 
-       static map<string, Factory> m_Types;
+       static map<string, CheckTaskType> m_Types;
+};
+
+struct CheckTaskType
+{
+       CheckTask::Factory Factory;
+       CheckTask::QueueFlusher QueueFlusher;
 };
 
 }
index 4dd6f7d2ddd2a46908796139f44128f29018969e..66d8e7a8d2f9284f69ab33f74d30fcb77a5cd030 100644 (file)
@@ -2,6 +2,8 @@
 
 using namespace icinga;
 
+vector<ThreadPool::Task> NagiosCheckTask::m_QueuedTasks;
+
 NagiosCheckTask::NagiosCheckTask(const Service& service)
        : CheckTask(service)
 {
@@ -12,21 +14,20 @@ NagiosCheckTask::NagiosCheckTask(const Service& service)
        m_Result = m_Task.get_future();
 }
 
-void NagiosCheckTask::Execute(void)
+void NagiosCheckTask::Enqueue(void)
 {
-       Application::Log(LogDebug, "icinga", "Nagios check command: " + m_Command);
-
-       ThreadPool::GetDefaultPool()->EnqueueTask(boost::bind(&NagiosCheckTask::InternalExecute, this));
+       m_QueuedTasks.push_back(bind(&NagiosCheckTask::Execute, this));
 }
 
-void NagiosCheckTask::InternalExecute(void)
+void NagiosCheckTask::FlushQueue(void)
 {
-       m_Task();
+       ThreadPool::GetDefaultPool()->EnqueueTasks(m_QueuedTasks);
+       m_QueuedTasks.clear();
 }
 
 bool NagiosCheckTask::IsFinished(void) const
 {
-       return m_Result.is_ready();
+       return m_Result.has_value();
 }
 
 CheckResult NagiosCheckTask::GetResult(void)
@@ -34,6 +35,11 @@ CheckResult NagiosCheckTask::GetResult(void)
        return m_Result.get();
 }
 
+void NagiosCheckTask::Execute(void)
+{
+       m_Task();
+}
+
 CheckResult NagiosCheckTask::RunCheck(void) const
 {
        CheckResult cr;
index e400f23d73f0a70dfe67e554ce7fda20d84ac4ea..b45aa1a5867285d29d4b5de6ebeef4a0de32d928 100644 (file)
@@ -9,18 +9,21 @@ class I2_ICINGA_API NagiosCheckTask : public CheckTask
 public:
        NagiosCheckTask(const Service& service);
 
-       virtual void Execute(void);
+       virtual void Enqueue(void);
        virtual bool IsFinished(void) const;
        virtual CheckResult GetResult(void);
 
        static CheckTask::Ptr CreateTask(const Service& service);
+       static void FlushQueue(void);
 
 private:
        string m_Command;
        packaged_task<CheckResult> m_Task;
        unique_future<CheckResult> m_Result;
 
-       void InternalExecute(void);
+       static vector<ThreadPool::Task> m_QueuedTasks;
+
+       void Execute(void);
        CheckResult RunCheck(void) const;
 };