From: Gunnar Beutner Date: Mon, 18 Jun 2012 15:23:48 +0000 (+0200) Subject: Bugfixes for the nagios checker/thread pool. X-Git-Tag: v0.0.1~400 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=1db56a5995c8f79094ca03b7cb4079b128197469;p=icinga2 Bugfixes for the nagios checker/thread pool. --- diff --git a/base/threadpool.cpp b/base/threadpool.cpp index 2b602b1c5..f4911dc6c 100644 --- a/base/threadpool.cpp +++ b/base/threadpool.cpp @@ -5,8 +5,26 @@ using namespace icinga; ThreadPool::ThreadPool(long numThreads) : m_Alive(true) { - for (long i = 0; i < numThreads; i++) - m_Threads.create_thread(boost::bind(&ThreadPool::WorkerThreadProc, this)); + for (long i = 0; i < numThreads; i++) { + thread *thr = m_Threads.create_thread(boost::bind(&ThreadPool::WorkerThreadProc, this)); +#ifdef _WIN32 + HANDLE handle = thr->native_handle(); + SetPriorityClass(handle, BELOW_NORMAL_PRIORITY_CLASS); +#else /* _WIN32 */ + pthread_t handle = thr->native_handle(); + + int policy; + sched_param param; + + if (pthread_getschedparam(handle, &policy, ¶m) < 0) + throw PosixException("pthread_getschedparam failed", errno); + + param.sched_priority = 0; + + if (pthread_setschedparam(handle, SCHED_IDLE, ¶m) < 0) + throw PosixException("pthread_setschedparam failed", errno); +#endif /* _WIN32 */ + } } ThreadPool::~ThreadPool(void) @@ -24,6 +42,14 @@ ThreadPool::~ThreadPool(void) m_Threads.join_all(); } +void ThreadPool::EnqueueTasks(const vector& tasks) +{ + unique_lock lock(m_Lock); + + std::copy(tasks.begin(), tasks.end(), std::back_inserter(m_Tasks)); + m_CV.notify_all(); +} + void ThreadPool::EnqueueTask(Task task) { unique_lock lock(m_Lock); diff --git a/base/threadpool.h b/base/threadpool.h index 11643e144..d36c1bec9 100644 --- a/base/threadpool.h +++ b/base/threadpool.h @@ -17,6 +17,7 @@ public: static ThreadPool::Ptr GetDefaultPool(void); + void EnqueueTasks(const vector& tasks); void EnqueueTask(Task task); void WaitForTasks(void); diff --git a/base/timer.cpp b/base/timer.cpp index 33f59f9f1..90ff47dd0 100644 --- a/base/timer.cpp +++ b/base/timer.cpp @@ -99,7 +99,19 @@ void Timer::CallExpiredTimers(void) */ void Timer::Call(void) { + time_t st; + time(&st); + OnTimerExpired(GetSelf()); + + time_t et; + time(&et); + + if (et - st > 5) { + stringstream msgbuf; + msgbuf << "Timer call took " << et - st << " seconds."; + Application::Log(LogDebug, "base", msgbuf.str()); + } } /** diff --git a/components/checker/checkercomponent.cpp b/components/checker/checkercomponent.cpp index 5c530c2eb..90981b05d 100644 --- a/components/checker/checkercomponent.cpp +++ b/components/checker/checkercomponent.cpp @@ -43,7 +43,7 @@ void CheckerComponent::Start(void) m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this)); m_CheckTimer->Start(); - CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask); + CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue); m_ResultTimer = boost::make_shared(); m_ResultTimer->SetInterval(5); @@ -64,6 +64,10 @@ void CheckerComponent::CheckTimerHandler(void) time_t now; time(&now); + Application::Log(LogDebug, "checker", "CheckTimerHandler entered."); + + long tasks = 0; + for (;;) { if (m_Services.empty()) break; @@ -76,22 +80,36 @@ void CheckerComponent::CheckTimerHandler(void) m_Services.pop(); service.SetPendingCheck(true); - Application::Log(LogInformation, "checker", "Executing service check for '" + service.GetName() + "'"); +// Application::Log(LogInformation, "checker", "Executing service check for '" + service.GetName() + "'"); CheckTask::Ptr task = CheckTask::CreateTask(service); - task->Execute(); + task->Enqueue(); m_PendingTasks.push_back(task); service.SetNextCheck(now + service.GetCheckInterval()); + + tasks++; } + Application::Log(LogDebug, "checker", "CheckTimerHandler: past loop."); + + CheckTask::FlushQueue(); + AdjustCheckTimer(); + + stringstream msgbuf; + msgbuf << "CheckTimerHandler: created " << tasks << " tasks"; + Application::Log(LogDebug, "checker", msgbuf.str()); } void CheckerComponent::ResultTimerHandler(void) { vector unfinishedTasks; + Application::Log(LogDebug, "checker", "ResultTimerHandler entered."); + + long results = 0; + for (vector::iterator it = m_PendingTasks.begin(); it != m_PendingTasks.end(); it++) { CheckTask::Ptr task = *it; @@ -104,11 +122,17 @@ void CheckerComponent::ResultTimerHandler(void) service.SetPendingCheck(false); CheckResult result = task->GetResult(); - Application::Log(LogInformation, "checker", "Got result! Plugin output: " + result.Output); +// Application::Log(LogInformation, "checker", "Got result! Plugin output: " + result.Output); + + results++; m_Services.push(service); } + stringstream msgbuf; + msgbuf << "ResultTimerHandler: " << results << " results; " << unfinishedTasks.size() << " unfinished"; + Application::Log(LogDebug, "checker", msgbuf.str()); + m_PendingTasks = unfinishedTasks; AdjustCheckTimer(); diff --git a/configure.ac b/configure.ac index 468193bd8..e5d9e2584 100644 --- a/configure.ac +++ b/configure.ac @@ -50,6 +50,7 @@ AM_PROG_LEX AC_PROG_YACC AC_PROG_LIBTOOL AX_CXX_GCC_ABI_DEMANGLE +AX_PTHREAD AX_BOOST_BASE AX_BOOST_SIGNALS AX_BOOST_THREAD diff --git a/icinga/checktask.cpp b/icinga/checktask.cpp index a82ffd7fd..694d1f2a6 100644 --- a/icinga/checktask.cpp +++ b/icinga/checktask.cpp @@ -2,7 +2,7 @@ using namespace icinga; -map CheckTask::m_Types; +map CheckTask::m_Types; CheckTask::CheckTask(const Service& service) : m_Service(service) @@ -13,19 +13,36 @@ Service CheckTask::GetService(void) const return m_Service; } -void CheckTask::RegisterType(string type, Factory factory) +void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher) { - m_Types[type] = factory; + CheckTaskType ctt; + ctt.Factory = factory; + ctt.QueueFlusher = qflusher; + + m_Types[type] = ctt; } CheckTask::Ptr CheckTask::CreateTask(const Service& service) { - map::iterator it; + map::iterator it; it = m_Types.find(service.GetCheckType()); if (it == m_Types.end()) throw runtime_error("Invalid check type specified for service '" + service.GetName() + "'"); - return it->second(service); + return it->second.Factory(service); +} + +void CheckTask::Enqueue(const CheckTask::Ptr& task) +{ + task->Enqueue(); +} + +void CheckTask::FlushQueue(void) +{ + map::iterator it; + + for (it = m_Types.begin(); it != m_Types.end(); it++) + it->second.QueueFlusher(); } diff --git a/icinga/checktask.h b/icinga/checktask.h index 045378c14..38c658743 100644 --- a/icinga/checktask.h +++ b/icinga/checktask.h @@ -24,6 +24,8 @@ struct CheckResult Dictionary::Ptr PerformanceData; }; +struct CheckTaskType; + class I2_ICINGA_API CheckTask : public Object { public: @@ -31,15 +33,18 @@ public: typedef weak_ptr WeakPtr; typedef function Factory; + typedef function QueueFlusher; Service GetService(void) const; - virtual void Execute(void) = 0; + virtual void Enqueue(void) = 0; virtual bool IsFinished(void) const = 0; virtual CheckResult GetResult(void) = 0; - static void RegisterType(string type, Factory factory); + static void RegisterType(string type, Factory factory, QueueFlusher qflusher); static CheckTask::Ptr CreateTask(const Service& service); + static void Enqueue(const CheckTask::Ptr& task); + static void FlushQueue(void); protected: CheckTask(const Service& service); @@ -47,7 +52,13 @@ protected: private: Service m_Service; - static map m_Types; + static map m_Types; +}; + +struct CheckTaskType +{ + CheckTask::Factory Factory; + CheckTask::QueueFlusher QueueFlusher; }; } diff --git a/icinga/nagioschecktask.cpp b/icinga/nagioschecktask.cpp index 4dd6f7d2d..66d8e7a8d 100644 --- a/icinga/nagioschecktask.cpp +++ b/icinga/nagioschecktask.cpp @@ -2,6 +2,8 @@ using namespace icinga; +vector NagiosCheckTask::m_QueuedTasks; + NagiosCheckTask::NagiosCheckTask(const Service& service) : CheckTask(service) { @@ -12,21 +14,20 @@ NagiosCheckTask::NagiosCheckTask(const Service& service) m_Result = m_Task.get_future(); } -void NagiosCheckTask::Execute(void) +void NagiosCheckTask::Enqueue(void) { - Application::Log(LogDebug, "icinga", "Nagios check command: " + m_Command); - - ThreadPool::GetDefaultPool()->EnqueueTask(boost::bind(&NagiosCheckTask::InternalExecute, this)); + m_QueuedTasks.push_back(bind(&NagiosCheckTask::Execute, this)); } -void NagiosCheckTask::InternalExecute(void) +void NagiosCheckTask::FlushQueue(void) { - m_Task(); + ThreadPool::GetDefaultPool()->EnqueueTasks(m_QueuedTasks); + m_QueuedTasks.clear(); } bool NagiosCheckTask::IsFinished(void) const { - return m_Result.is_ready(); + return m_Result.has_value(); } CheckResult NagiosCheckTask::GetResult(void) @@ -34,6 +35,11 @@ CheckResult NagiosCheckTask::GetResult(void) return m_Result.get(); } +void NagiosCheckTask::Execute(void) +{ + m_Task(); +} + CheckResult NagiosCheckTask::RunCheck(void) const { CheckResult cr; diff --git a/icinga/nagioschecktask.h b/icinga/nagioschecktask.h index e400f23d7..b45aa1a58 100644 --- a/icinga/nagioschecktask.h +++ b/icinga/nagioschecktask.h @@ -9,18 +9,21 @@ class I2_ICINGA_API NagiosCheckTask : public CheckTask public: NagiosCheckTask(const Service& service); - virtual void Execute(void); + virtual void Enqueue(void); virtual bool IsFinished(void) const; virtual CheckResult GetResult(void); static CheckTask::Ptr CreateTask(const Service& service); + static void FlushQueue(void); private: string m_Command; packaged_task m_Task; unique_future m_Result; - void InternalExecute(void); + static vector m_QueuedTasks; + + void Execute(void); CheckResult RunCheck(void) const; };