]> granicus.if.org Git - icinga2/commitdiff
Get rid off threadpool for nagios checks.
authorGunnar Beutner <gunnar@beutner.name>
Sun, 24 Jun 2012 14:30:16 +0000 (16:30 +0200)
committerGunnar Beutner <gunnar@beutner.name>
Sun, 24 Jun 2012 14:30:16 +0000 (16:30 +0200)
components/checker/checkercomponent.cpp
icinga/checktask.cpp
icinga/checktask.h
icinga/nagioschecktask.cpp
icinga/nagioschecktask.h

index 9c493430431ed0efeb5edbde80f9a5c4ace2c784..48b8ae30408b48939eee4625cd024afcd4f89c65 100644 (file)
@@ -41,7 +41,7 @@ void CheckerComponent::Start(void)
        m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
        m_CheckTimer->Start();
 
-       CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue, NagiosCheckTask::GetFinishedTasks);
+       NagiosCheckTask::Register();
 
        m_ResultTimer = boost::make_shared<Timer>();
        m_ResultTimer->SetInterval(5);
index 84ac1606e76259aa96c1ee9760cf0d08682084e8..dc7b53f51dae58f472583514ae444a67b297567f 100644 (file)
@@ -3,6 +3,8 @@
 using namespace icinga;
 
 map<string, CheckTaskType> CheckTask::m_Types;
+vector<CheckTask::Ptr> CheckTask::m_FinishedTasks;
+mutex CheckTask::m_FinishedTasksMutex;
 
 CheckTask::CheckTask(const Service& service)
        : m_Service(service)
@@ -13,12 +15,11 @@ Service CheckTask::GetService(void) const
        return m_Service;
 }
 
-void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher, FinishedTasksGetter qtasksgetter)
+void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher)
 {
        CheckTaskType ctt;
        ctt.Factory = factory;
        ctt.QueueFlusher = qflusher;
-       ctt.FinishedTasksGetter = qtasksgetter;
 
        m_Types[type] = ctt;
 }
@@ -49,12 +50,16 @@ void CheckTask::FlushQueue(void)
 
 vector<CheckTask::Ptr> CheckTask::GetFinishedTasks(void)
 {
-       vector<CheckTask::Ptr> tasks;
+       mutex::scoped_lock lock(m_FinishedTasksMutex);
 
-       map<string, CheckTaskType>::iterator it;
-       for (it = m_Types.begin(); it != m_Types.end(); it++)
-               it->second.FinishedTasksGetter(tasks);
+       vector<CheckTask::Ptr> result = m_FinishedTasks;
+       m_FinishedTasks.clear();
 
-       return tasks;
+       return result;
 }
 
+void CheckTask::FinishTask(const CheckTask::Ptr& task)
+{
+       mutex::scoped_lock lock(m_FinishedTasksMutex);
+       m_FinishedTasks.push_back(task);
+}
index f8de86e2dcf7a4d035a60ca15ffd4909c1315302..46cd9baa10c67727bb384444d2093dcb875083a6 100644 (file)
@@ -34,17 +34,18 @@ public:
 
        typedef function<CheckTask::Ptr(const Service&)> Factory;
        typedef function<void()> QueueFlusher;
-       typedef function<void (vector<CheckTask::Ptr>& tasks)> FinishedTasksGetter;
 
        Service GetService(void) const;
 
        virtual void Enqueue(void) = 0;
        virtual CheckResult GetResult(void) = 0;
 
-       static void RegisterType(string type, Factory factory, QueueFlusher qflusher, FinishedTasksGetter qtasksgetter);
+       static void RegisterType(string type, Factory factory, QueueFlusher qflusher);
        static CheckTask::Ptr CreateTask(const Service& service);
        static void Enqueue(const CheckTask::Ptr& task);
        static void FlushQueue(void);
+
+       static void FinishTask(const CheckTask::Ptr& task);
        static vector<CheckTask::Ptr> GetFinishedTasks(void);
 
 protected:
@@ -54,13 +55,15 @@ private:
        Service m_Service;
 
        static map<string, CheckTaskType> m_Types;
+
+       static vector<CheckTask::Ptr> m_FinishedTasks;
+       static mutex m_FinishedTasksMutex;
 };
 
 struct CheckTaskType
 {
        CheckTask::Factory Factory;
        CheckTask::QueueFlusher QueueFlusher;
-       CheckTask::FinishedTasksGetter FinishedTasksGetter;
 };
 
 }
index c6957c6ae272852097b0fb5b12dcf9b76c9807e7..f2901b9be9a5937aee4ca51210f08cdb0ad7af69 100644 (file)
@@ -5,13 +5,12 @@
 
 using namespace icinga;
 
-list<ThreadPoolTask::Ptr> NagiosCheckTask::m_QueuedTasks;
-
-boost::mutex NagiosCheckTask::m_FinishedTasksMutex;
-vector<CheckTask::Ptr> NagiosCheckTask::m_FinishedTasks;
+boost::mutex NagiosCheckTask::m_Mutex;
+deque<NagiosCheckTask::Ptr> NagiosCheckTask::m_Tasks;
+condition_variable NagiosCheckTask::m_TasksCV;
 
 NagiosCheckTask::NagiosCheckTask(const Service& service)
-       : CheckTask(service)
+       : CheckTask(service), m_FP(NULL)
 {
        string checkCommand = service.GetCheckCommand();
        m_Command = MacroProcessor::ResolveMacros(checkCommand, service.GetMacros()); // + " 2>&1";
@@ -20,21 +19,16 @@ NagiosCheckTask::NagiosCheckTask(const Service& service)
 void NagiosCheckTask::Enqueue(void)
 {
        time(&m_Result.StartTime);
-       m_QueuedTasks.push_back(static_pointer_cast<ThreadPoolTask>(static_cast<NagiosCheckTask::Ptr>(GetSelf())));
-//     m_QueuedTasks.push_back(new ThreadPool:Task(bind(&NagiosCheckTask::Execute, static_cast<NagiosCheckTask::Ptr>(GetSelf()))));
-}
 
-void NagiosCheckTask::FlushQueue(void)
-{
-       ThreadPool::GetDefaultPool()->EnqueueTasks(m_QueuedTasks);
-       m_QueuedTasks.clear();
+       {
+                       mutex::scoped_lock lock(m_Mutex);
+                       m_Tasks.push_back(GetSelf());
+       }
 }
 
-void NagiosCheckTask::GetFinishedTasks(vector<CheckTask::Ptr>& tasks)
+void NagiosCheckTask::FlushQueue(void)
 {
-       mutex::scoped_lock lock(m_FinishedTasksMutex);
-       std::copy(m_FinishedTasks.begin(), m_FinishedTasks.end(), back_inserter(tasks));
-       m_FinishedTasks.clear();
+       m_TasksCV.notify_all();
 }
 
 CheckResult NagiosCheckTask::GetResult(void)
@@ -42,56 +36,113 @@ CheckResult NagiosCheckTask::GetResult(void)
        return m_Result;
 }
 
-void NagiosCheckTask::Execute(void)
+void NagiosCheckTask::CheckThreadProc(void)
 {
-       RunCheck();
+       mutex::scoped_lock lock(m_Mutex);
 
-       {
-               mutex::scoped_lock lock(m_FinishedTasksMutex);
-               m_FinishedTasks.push_back(GetSelf());
+       map<int, NagiosCheckTask::Ptr> tasks;
+       const int maxTasks = 16;
+
+       for (;;) {
+               while (m_Tasks.empty() || tasks.size() >= maxTasks) {
+                       lock.unlock();
+
+                       map<int, NagiosCheckTask::Ptr>::iterator it, prev;
+
+#ifndef _MSC_VER
+                       fd_set readfds;
+                       int nfds = 0;
+                       
+                       FD_ZERO(&readfds);
+
+                       for (it = tasks.begin(); it != tasks.end(); it++) {
+                               if (it->first > nfds)
+                                       nfds = it->first;
+
+                               FD_SET(it->first, &readfds);
+                       }
+
+                       timeval tv;
+                       tv.tv_sec = 1;
+                       tv.tv_usec = 0;
+                       select(nfds + 1, &readfds, NULL, NULL, &tv);
+#else /* _MSC_VER */
+                       Sleep(1000);
+#endif /* _MSC_VER */
+
+                       for (it = tasks.begin(); it != tasks.end(); ) {
+#ifndef _MSC_VER
+                               if (!FD_ISSET(it->first, &readfds)) {
+                                       it++;
+                                       continue;
+                               }
+#endif /* _MSC_VER */
+
+                               if (!it->second->RunTask()) {
+                                       CheckTask::FinishTask(it->second);
+                                       it = tasks.erase(it);
+                               } else {
+                                       it++;
+                               }
+                       }
+
+                       lock.lock();
+               }
+
+               while (!m_Tasks.empty() && tasks.size() < maxTasks) {
+                       NagiosCheckTask::Ptr task = m_Tasks.front();
+                       m_Tasks.pop_front();
+                       if (!task->InitTask()) {
+                               CheckTask::FinishTask(task);
+                       } else {
+                               int fd = task->GetFD();
+                               if (fd >= 0)
+                                       tasks[fd] = task;
+                       }
+               }
        }
 }
 
-void NagiosCheckTask::RunCheck(void)
+bool NagiosCheckTask::InitTask(void)
 {
-       FILE *fp;
-
 #ifdef _MSC_VER
-       fp = _popen(m_Command.c_str(), "r");
+       m_FP = _popen(m_Command.c_str(), "r");
 #else /* _MSC_VER */
        bool use_libc_popen = false;
 
        popen_noshell_pass_to_pclose pclose_arg;
 
        if (!use_libc_popen) {
-               fp = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg);
+               m_FP = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg);
 
-               if (fp == NULL) // TODO: add check for valgrind
+               if (m_FP == NULL) // TODO: add check for valgrind
                        use_libc_popen = true;
        }
 
        if (use_libc_popen)
-               fp = popen(m_Command.c_str(), "r");
+               m_FP = popen(m_Command.c_str(), "r");
 #endif /* _MSC_VER */
 
-       stringstream outputbuf;
+       return (m_FP != NULL);
+}
 
-       while (!feof(fp)) {
-               char buffer[512];
-               size_t read = fread(buffer, 1, sizeof(buffer), fp);
+bool NagiosCheckTask::RunTask(void)
+{
+       char buffer[512];
+       size_t read = fread(buffer, 1, sizeof(buffer), m_FP);
 
-               if (read == 0)
-                       break;
+       if (read > 0)
+               m_OutputStream.write(buffer, read);
 
-               outputbuf.write(buffer, read);
-       }
+       if (!feof(m_FP))
+               return true;
 
-       m_Result.Output = outputbuf.str();
+       m_Result.Output = m_OutputStream.str();
        boost::algorithm::trim(m_Result.Output);
 
        int status, exitcode;
 #ifdef _MSC_VER
-       status = _pclose(fp);
+       status = _pclose(m_FP);
 #else /* _MSC_VER */
        if (use_libc_popen)
                status = pclose(fp);
@@ -128,9 +179,26 @@ void NagiosCheckTask::RunCheck(void)
 #endif /* _MSC_VER */
 
        time(&m_Result.EndTime);
+
+       return false;
+}
+
+int NagiosCheckTask::GetFD(void) const
+{
+       return fileno(m_FP);
 }
 
 CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service)
 {
        return boost::make_shared<NagiosCheckTask>(service);
 }
+
+void NagiosCheckTask::Register(void)
+{
+       CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue);
+
+       for (int i = 0; i < 1; i++) {
+               thread t(&NagiosCheckTask::CheckThreadProc);
+               t.detach();
+       }
+}
\ No newline at end of file
index e835b505b56c4b37c9589c8303d116c39cdad308..d850ad8cfd4417b0b64f65f0655c798144ab7a14 100644 (file)
@@ -4,7 +4,7 @@
 namespace icinga
 {
 
-class I2_ICINGA_API NagiosCheckTask : public CheckTask, public ThreadPoolTask
+class I2_ICINGA_API NagiosCheckTask : public CheckTask
 {
 public:
        typedef shared_ptr<NagiosCheckTask> Ptr;
@@ -17,19 +17,25 @@ public:
 
        static CheckTask::Ptr CreateTask(const Service& service);
        static void FlushQueue(void);
-       static void GetFinishedTasks(vector<CheckTask::Ptr>& tasks);
+
+       static void Register(void);
 
 private:
        string m_Command;
        CheckResult m_Result;
 
-       static list<ThreadPoolTask::Ptr> m_QueuedTasks;
+       FILE *m_FP;
+       stringstream m_OutputStream;
+
+       static boost::mutex m_Mutex;
+       static deque<NagiosCheckTask::Ptr> m_Tasks;
+       static condition_variable m_TasksCV;
 
-       static boost::mutex m_FinishedTasksMutex;
-       static vector<CheckTask::Ptr> m_FinishedTasks;
+       static void CheckThreadProc(void);
 
-       virtual void Execute(void);
-       void RunCheck(void);
+       bool InitTask(void);
+       bool RunTask(void);
+       int GetFD(void) const;
 };
 
 }