From: Gunnar Beutner Date: Sun, 24 Jun 2012 14:30:16 +0000 (+0200) Subject: Get rid off threadpool for nagios checks. X-Git-Tag: v0.0.1~367 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=833ede8b3a6daa46be2741af206d8e065f51f002;p=icinga2 Get rid off threadpool for nagios checks. --- diff --git a/components/checker/checkercomponent.cpp b/components/checker/checkercomponent.cpp index 9c4934304..48b8ae304 100644 --- a/components/checker/checkercomponent.cpp +++ b/components/checker/checkercomponent.cpp @@ -41,7 +41,7 @@ void CheckerComponent::Start(void) m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this)); m_CheckTimer->Start(); - CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue, NagiosCheckTask::GetFinishedTasks); + NagiosCheckTask::Register(); m_ResultTimer = boost::make_shared(); m_ResultTimer->SetInterval(5); diff --git a/icinga/checktask.cpp b/icinga/checktask.cpp index 84ac1606e..dc7b53f51 100644 --- a/icinga/checktask.cpp +++ b/icinga/checktask.cpp @@ -3,6 +3,8 @@ using namespace icinga; map CheckTask::m_Types; +vector CheckTask::m_FinishedTasks; +mutex CheckTask::m_FinishedTasksMutex; CheckTask::CheckTask(const Service& service) : m_Service(service) @@ -13,12 +15,11 @@ Service CheckTask::GetService(void) const return m_Service; } -void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher, FinishedTasksGetter qtasksgetter) +void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher) { CheckTaskType ctt; ctt.Factory = factory; ctt.QueueFlusher = qflusher; - ctt.FinishedTasksGetter = qtasksgetter; m_Types[type] = ctt; } @@ -49,12 +50,16 @@ void CheckTask::FlushQueue(void) vector CheckTask::GetFinishedTasks(void) { - vector tasks; + mutex::scoped_lock lock(m_FinishedTasksMutex); - map::iterator it; - for (it = m_Types.begin(); it != m_Types.end(); it++) - it->second.FinishedTasksGetter(tasks); + vector result = m_FinishedTasks; + m_FinishedTasks.clear(); - return tasks; + return result; } +void CheckTask::FinishTask(const CheckTask::Ptr& task) +{ + mutex::scoped_lock lock(m_FinishedTasksMutex); + m_FinishedTasks.push_back(task); +} diff --git a/icinga/checktask.h b/icinga/checktask.h index f8de86e2d..46cd9baa1 100644 --- a/icinga/checktask.h +++ b/icinga/checktask.h @@ -34,17 +34,18 @@ public: typedef function Factory; typedef function QueueFlusher; - typedef function& tasks)> FinishedTasksGetter; Service GetService(void) const; virtual void Enqueue(void) = 0; virtual CheckResult GetResult(void) = 0; - static void RegisterType(string type, Factory factory, QueueFlusher qflusher, FinishedTasksGetter qtasksgetter); + static void RegisterType(string type, Factory factory, QueueFlusher qflusher); static CheckTask::Ptr CreateTask(const Service& service); static void Enqueue(const CheckTask::Ptr& task); static void FlushQueue(void); + + static void FinishTask(const CheckTask::Ptr& task); static vector GetFinishedTasks(void); protected: @@ -54,13 +55,15 @@ private: Service m_Service; static map m_Types; + + static vector m_FinishedTasks; + static mutex m_FinishedTasksMutex; }; struct CheckTaskType { CheckTask::Factory Factory; CheckTask::QueueFlusher QueueFlusher; - CheckTask::FinishedTasksGetter FinishedTasksGetter; }; } diff --git a/icinga/nagioschecktask.cpp b/icinga/nagioschecktask.cpp index c6957c6ae..f2901b9be 100644 --- a/icinga/nagioschecktask.cpp +++ b/icinga/nagioschecktask.cpp @@ -5,13 +5,12 @@ using namespace icinga; -list NagiosCheckTask::m_QueuedTasks; - -boost::mutex NagiosCheckTask::m_FinishedTasksMutex; -vector NagiosCheckTask::m_FinishedTasks; +boost::mutex NagiosCheckTask::m_Mutex; +deque NagiosCheckTask::m_Tasks; +condition_variable NagiosCheckTask::m_TasksCV; NagiosCheckTask::NagiosCheckTask(const Service& service) - : CheckTask(service) + : CheckTask(service), m_FP(NULL) { string checkCommand = service.GetCheckCommand(); m_Command = MacroProcessor::ResolveMacros(checkCommand, service.GetMacros()); // + " 2>&1"; @@ -20,21 +19,16 @@ NagiosCheckTask::NagiosCheckTask(const Service& service) void NagiosCheckTask::Enqueue(void) { time(&m_Result.StartTime); - m_QueuedTasks.push_back(static_pointer_cast(static_cast(GetSelf()))); -// m_QueuedTasks.push_back(new ThreadPool:Task(bind(&NagiosCheckTask::Execute, static_cast(GetSelf())))); -} -void NagiosCheckTask::FlushQueue(void) -{ - ThreadPool::GetDefaultPool()->EnqueueTasks(m_QueuedTasks); - m_QueuedTasks.clear(); + { + mutex::scoped_lock lock(m_Mutex); + m_Tasks.push_back(GetSelf()); + } } -void NagiosCheckTask::GetFinishedTasks(vector& tasks) +void NagiosCheckTask::FlushQueue(void) { - mutex::scoped_lock lock(m_FinishedTasksMutex); - std::copy(m_FinishedTasks.begin(), m_FinishedTasks.end(), back_inserter(tasks)); - m_FinishedTasks.clear(); + m_TasksCV.notify_all(); } CheckResult NagiosCheckTask::GetResult(void) @@ -42,56 +36,113 @@ CheckResult NagiosCheckTask::GetResult(void) return m_Result; } -void NagiosCheckTask::Execute(void) +void NagiosCheckTask::CheckThreadProc(void) { - RunCheck(); + mutex::scoped_lock lock(m_Mutex); - { - mutex::scoped_lock lock(m_FinishedTasksMutex); - m_FinishedTasks.push_back(GetSelf()); + map tasks; + const int maxTasks = 16; + + for (;;) { + while (m_Tasks.empty() || tasks.size() >= maxTasks) { + lock.unlock(); + + map::iterator it, prev; + +#ifndef _MSC_VER + fd_set readfds; + int nfds = 0; + + FD_ZERO(&readfds); + + for (it = tasks.begin(); it != tasks.end(); it++) { + if (it->first > nfds) + nfds = it->first; + + FD_SET(it->first, &readfds); + } + + timeval tv; + tv.tv_sec = 1; + tv.tv_usec = 0; + select(nfds + 1, &readfds, NULL, NULL, &tv); +#else /* _MSC_VER */ + Sleep(1000); +#endif /* _MSC_VER */ + + for (it = tasks.begin(); it != tasks.end(); ) { +#ifndef _MSC_VER + if (!FD_ISSET(it->first, &readfds)) { + it++; + continue; + } +#endif /* _MSC_VER */ + + if (!it->second->RunTask()) { + CheckTask::FinishTask(it->second); + it = tasks.erase(it); + } else { + it++; + } + } + + lock.lock(); + } + + while (!m_Tasks.empty() && tasks.size() < maxTasks) { + NagiosCheckTask::Ptr task = m_Tasks.front(); + m_Tasks.pop_front(); + if (!task->InitTask()) { + CheckTask::FinishTask(task); + } else { + int fd = task->GetFD(); + if (fd >= 0) + tasks[fd] = task; + } + } } } -void NagiosCheckTask::RunCheck(void) +bool NagiosCheckTask::InitTask(void) { - FILE *fp; - #ifdef _MSC_VER - fp = _popen(m_Command.c_str(), "r"); + m_FP = _popen(m_Command.c_str(), "r"); #else /* _MSC_VER */ bool use_libc_popen = false; popen_noshell_pass_to_pclose pclose_arg; if (!use_libc_popen) { - fp = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg); + m_FP = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg); - if (fp == NULL) // TODO: add check for valgrind + if (m_FP == NULL) // TODO: add check for valgrind use_libc_popen = true; } if (use_libc_popen) - fp = popen(m_Command.c_str(), "r"); + m_FP = popen(m_Command.c_str(), "r"); #endif /* _MSC_VER */ - stringstream outputbuf; + return (m_FP != NULL); +} - while (!feof(fp)) { - char buffer[512]; - size_t read = fread(buffer, 1, sizeof(buffer), fp); +bool NagiosCheckTask::RunTask(void) +{ + char buffer[512]; + size_t read = fread(buffer, 1, sizeof(buffer), m_FP); - if (read == 0) - break; + if (read > 0) + m_OutputStream.write(buffer, read); - outputbuf.write(buffer, read); - } + if (!feof(m_FP)) + return true; - m_Result.Output = outputbuf.str(); + m_Result.Output = m_OutputStream.str(); boost::algorithm::trim(m_Result.Output); int status, exitcode; #ifdef _MSC_VER - status = _pclose(fp); + status = _pclose(m_FP); #else /* _MSC_VER */ if (use_libc_popen) status = pclose(fp); @@ -128,9 +179,26 @@ void NagiosCheckTask::RunCheck(void) #endif /* _MSC_VER */ time(&m_Result.EndTime); + + return false; +} + +int NagiosCheckTask::GetFD(void) const +{ + return fileno(m_FP); } CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service) { return boost::make_shared(service); } + +void NagiosCheckTask::Register(void) +{ + CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue); + + for (int i = 0; i < 1; i++) { + thread t(&NagiosCheckTask::CheckThreadProc); + t.detach(); + } +} \ No newline at end of file diff --git a/icinga/nagioschecktask.h b/icinga/nagioschecktask.h index e835b505b..d850ad8cf 100644 --- a/icinga/nagioschecktask.h +++ b/icinga/nagioschecktask.h @@ -4,7 +4,7 @@ namespace icinga { -class I2_ICINGA_API NagiosCheckTask : public CheckTask, public ThreadPoolTask +class I2_ICINGA_API NagiosCheckTask : public CheckTask { public: typedef shared_ptr Ptr; @@ -17,19 +17,25 @@ public: static CheckTask::Ptr CreateTask(const Service& service); static void FlushQueue(void); - static void GetFinishedTasks(vector& tasks); + + static void Register(void); private: string m_Command; CheckResult m_Result; - static list m_QueuedTasks; + FILE *m_FP; + stringstream m_OutputStream; + + static boost::mutex m_Mutex; + static deque m_Tasks; + static condition_variable m_TasksCV; - static boost::mutex m_FinishedTasksMutex; - static vector m_FinishedTasks; + static void CheckThreadProc(void); - virtual void Execute(void); - void RunCheck(void); + bool InitTask(void); + bool RunTask(void); + int GetFD(void) const; }; }