]> granicus.if.org Git - icinga2/commitdiff
Implemented Process class, cleaned up NagiosCheckTask.
authorGunnar Beutner <gunnar.beutner@netways.de>
Fri, 13 Jul 2012 19:00:54 +0000 (21:00 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Fri, 13 Jul 2012 19:00:54 +0000 (21:00 +0200)
15 files changed:
base/Makefile.am
base/asynctask.h [new file with mode: 0644]
base/base.vcxproj
base/base.vcxproj.filters
base/event.h
base/i2-base.h
base/process.cpp [new file with mode: 0644]
base/process.h [new file with mode: 0644]
cib/checktask.cpp
cib/checktask.h
cib/nagioschecktask.cpp
cib/nagioschecktask.h
components/checker/checkercomponent.cpp
components/checker/checkercomponent.h
icinga-app/icinga-standalone.conf

index 56580af66f5c02ac833696e9de6de7f978cbaf51..24d7699cc02f097a0194055d051f26e76ac2b582 100644 (file)
@@ -7,6 +7,7 @@ pkglib_LTLIBRARIES =  \
 libbase_la_SOURCES =  \
        application.cpp \
        application.h \
+       asynctask.h \
        component.cpp \
        component.h \
        configobject.cpp \
@@ -28,6 +29,8 @@ libbase_la_SOURCES =  \
        objectset.h \
        objectmap.cpp \
        objectmap.h \
+       process.cpp \
+       process.h \
        ringbuffer.cpp \
        ringbuffer.h \
        socket.cpp \
diff --git a/base/asynctask.h b/base/asynctask.h
new file mode 100644 (file)
index 0000000..df16144
--- /dev/null
@@ -0,0 +1,53 @@
+#ifndef ASYNCTASK_H
+#define ASYNCTASK_H 
+
+namespace icinga
+{
+
+template<typename T>
+class AsyncTask : public Object
+{
+public:
+       typedef shared_ptr<AsyncTask<T> > Ptr;
+       typedef weak_ptr<AsyncTask<T> > WeakPtr;
+
+       AsyncTask(void)
+               : m_Finished(false)
+       { }
+
+       ~AsyncTask(void)
+       {
+               assert(m_Finished);
+       }
+
+       void Start(void)
+       {
+               assert(Application::IsMainThread());
+
+               Run();
+       }
+
+       boost::signal<void (const shared_ptr<T>&)> OnTaskCompleted;
+
+protected:
+       virtual void Run(void) = 0;
+
+       void Finish(void)
+       {
+               Event::Ptr ev = boost::make_shared<Event>();
+               ev->OnEventDelivered.connect(boost::bind(&T::FinishForwarder, static_cast<shared_ptr<T> >(GetSelf())));
+               Event::Post(ev);
+       }
+
+       bool m_Finished;
+
+private:
+       static void FinishForwarder(typename const shared_ptr<T>& task)
+       {
+               task->OnTaskCompleted(task);
+       }
+};
+
+}
+
+#endif /* ASYNCTASK_H */
index bc3bbccb81f6025fa4d67e670f7d9f9e2f7db8fd..ad0ca617683fabfe8fb34c300a398673187298c2 100644 (file)
@@ -22,6 +22,7 @@
     <ClCompile Include="object.cpp" />
     <ClCompile Include="objectmap.cpp" />
     <ClCompile Include="objectset.cpp" />
+    <ClCompile Include="process.cpp" />
     <ClCompile Include="ringbuffer.cpp" />
     <ClCompile Include="socket.cpp" />
     <ClCompile Include="streamlogger.cpp" />
@@ -39,6 +40,7 @@
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="application.h" />
+    <ClInclude Include="asynctask.h" />
     <ClInclude Include="component.h" />
     <ClInclude Include="configobject.h" />
     <ClInclude Include="dictionary.h" />
@@ -50,6 +52,7 @@
     <ClInclude Include="fifo.h" />
     <ClInclude Include="i2-base.h" />
     <ClInclude Include="object.h" />
+    <ClInclude Include="process.h" />
     <ClInclude Include="ringbuffer.h" />
     <ClInclude Include="socket.h" />
     <ClInclude Include="streamlogger.h" />
index 47b1ba4fde5af6592bba1080b5324dc9c70315f9..58d2e0a6ab068e98929d67e23cf3f9929834dfdb 100644 (file)
@@ -76,6 +76,9 @@
     <ClCompile Include="sysloglogger.cpp">
       <Filter>Quelldateien</Filter>
     </ClCompile>
+    <ClCompile Include="process.cpp">
+      <Filter>Quelldateien</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="application.h">
     <ClInclude Include="sysloglogger.h">
       <Filter>Headerdateien</Filter>
     </ClInclude>
+    <ClInclude Include="asynctask.h">
+      <Filter>Headerdateien</Filter>
+    </ClInclude>
+    <ClInclude Include="process.h">
+      <Filter>Headerdateien</Filter>
+    </ClInclude>
   </ItemGroup>
   <ItemGroup>
-    <Filter Include="Headerdateien">
-      <UniqueIdentifier>{7bbee99c-5763-4063-836c-ddbcc8966ae3}</UniqueIdentifier>
-    </Filter>
     <Filter Include="Quelldateien">
       <UniqueIdentifier>{229e6896-1a39-4b0e-b5e4-a1291b825200}</UniqueIdentifier>
     </Filter>
+    <Filter Include="Headerdateien">
+      <UniqueIdentifier>{7bbee99c-5763-4063-836c-ddbcc8966ae3}</UniqueIdentifier>
+    </Filter>
   </ItemGroup>
 </Project>
\ No newline at end of file
index bb586ef93d345f6946e1e41b5c3f9acd95001710..5919f4e6592d95a28a07861ebe368e5290deeee3 100644 (file)
@@ -23,7 +23,7 @@
 namespace icinga
 {
 
-class Event : public Object
+class I2_BASE_API Event : public Object
 {
 public:
        typedef shared_ptr<Event> Ptr;
index 1f05082685c2ebd1bbb99a68ec02d1cf614cc60f..a0b1663fabd8c1ecd18cd55d939c98394df3e555 100644 (file)
@@ -175,5 +175,7 @@ using boost::system_time;
 #include "logger.h"
 #include "streamlogger.h"
 #include "sysloglogger.h"
+#include "asynctask.h"
+#include "process.h"
 
 #endif /* I2BASE_H */
diff --git a/base/process.cpp b/base/process.cpp
new file mode 100644 (file)
index 0000000..439ad34
--- /dev/null
@@ -0,0 +1,199 @@
+#include "i2-base.h"
+
+using namespace icinga;
+
+bool Process::m_ThreadsCreated = false;
+boost::mutex Process::m_Mutex;
+deque<Process::Ptr> Process::m_Tasks;
+condition_variable Process::m_TasksCV;
+
+Process::Process(const string& command)
+       : m_Command(command)
+{
+       if (!m_ThreadsCreated) {
+               int numThreads = boost::thread::hardware_concurrency();
+
+               if (numThreads < 4)
+                       numThreads = 4;
+
+               for (int i = 0; i < numThreads; i++) {
+                       thread t(&Process::WorkerThreadProc);
+                       t.detach();
+               }
+
+               m_ThreadsCreated = true;
+       }
+}
+
+void Process::Run(void)
+{
+       mutex::scoped_lock lock(m_Mutex);
+       m_Tasks.push_back(GetSelf());
+       m_TasksCV.notify_one();
+}
+
+void Process::WorkerThreadProc(void)
+{
+       mutex::scoped_lock lock(m_Mutex);
+
+       map<int, Process::Ptr> tasks;
+
+       for (;;) {
+               while (m_Tasks.empty() || tasks.size() >= MaxTasksPerThread) {
+                       lock.unlock();
+
+                       map<int, Process::Ptr>::iterator it, prev;
+
+#ifndef _MSC_VER
+                       fd_set readfds;
+                       int nfds = 0;
+                       
+                       FD_ZERO(&readfds);
+
+                       for (it = tasks.begin(); it != tasks.end(); it++) {
+                               if (it->first > nfds)
+                                       nfds = it->first;
+
+                               FD_SET(it->first, &readfds);
+                       }
+
+                       timeval tv;
+                       tv.tv_sec = 1;
+                       tv.tv_usec = 0;
+                       select(nfds + 1, &readfds, NULL, NULL, &tv);
+#else /* _MSC_VER */
+                       Sleep(1000);
+#endif /* _MSC_VER */
+
+                       for (it = tasks.begin(); it != tasks.end(); ) {
+                               int fd = it->first;
+                               Process::Ptr task = it->second;
+
+#ifndef _MSC_VER
+                               if (!FD_ISSET(fd, &readfds)) {
+                                       it++;
+                                       continue;
+                               }
+#endif /* _MSC_VER */
+
+                               if (!task->RunTask()) {
+                                       prev = it;
+                                       it++;
+                                       tasks.erase(prev);
+
+                                       task->Finish();
+                               } else {
+                                       it++;
+                               }
+                       }
+
+                       lock.lock();
+               }
+
+               while (!m_Tasks.empty() && tasks.size() < MaxTasksPerThread) {
+                       Process::Ptr task = m_Tasks.front();
+                       m_Tasks.pop_front();
+                       if (!task->InitTask()) {
+                               task->Finish();
+                       } else {
+                               int fd = task->GetFD();
+                               if (fd >= 0)
+                                       tasks[fd] = task;
+                       }
+               }
+       }
+}
+
+bool Process::InitTask(void)
+{
+#ifdef _MSC_VER
+       m_FP = _popen(m_Command.c_str(), "r");
+#else /* _MSC_VER */
+       if (!m_UsePopen) {
+               m_PCloseArg = new popen_noshell_pass_to_pclose;
+
+               m_FP = popen_noshell_compat(m_Command.c_str(), "r",
+                   (popen_noshell_pass_to_pclose *)m_PCloseArg);
+
+               if (m_FP == NULL) // TODO: add check for valgrind
+                       m_UsePopen = true;
+       }
+
+       if (m_UsePopen)
+               m_FP = popen(m_Command.c_str(), "r");
+#endif /* _MSC_VER */
+
+       if (m_FP == NULL) {
+               return false;
+       }
+
+       return true;
+}
+
+bool Process::RunTask(void)
+{
+       char buffer[512];
+       size_t read = fread(buffer, 1, sizeof(buffer), m_FP);
+
+       if (read > 0)
+               m_OutputStream.write(buffer, read);
+
+       if (!feof(m_FP))
+               return true;
+
+       string output = m_OutputStream.str();
+
+       int status, exitcode;
+#ifdef _MSC_VER
+       status = _pclose(m_FP);
+#else /* _MSC_VER */
+       if (m_UsePopen) {
+               status = pclose(m_FP);
+       } else {
+               status = pclose_noshell((popen_noshell_pass_to_pclose *)m_PCloseArg);
+               delete (popen_noshell_pass_to_pclose *)m_PCloseArg;
+       }
+#endif /* _MSC_VER */
+
+#ifndef _MSC_VER
+       if (WIFEXITED(status)) {
+               exitcode = WEXITSTATUS(status);
+#else /* _MSC_VER */
+               exitcode = status;
+
+               /* cmd.exe returns error code 1 (warning) when the plugin
+                * could not be executed - change the exit status to "unknown"
+                * when we have no plugin output. */
+               if (output.empty())
+                       exitcode = 128;
+#endif /* _MSC_VER */
+
+#ifndef _MSC_VER
+       } else if (WIFSIGNALED(status)) {
+               stringstream outputbuf;
+               outputbuf << "Process was terminated by signal " << WTERMSIG(status);
+               output = outputbuf.str();
+               exitcode = 128;
+       }
+#endif /* _MSC_VER */
+
+       m_ExitStatus = exitcode;
+       m_Output = output;
+
+       return false;
+}
+
+int Process::GetFD(void) const
+{
+       return fileno(m_FP);
+}
+
+long Process::GetExitStatus(void) const
+{
+       return m_ExitStatus;
+}
+
+string Process::GetOutput(void) const
+{
+       return m_Output;
+}
diff --git a/base/process.h b/base/process.h
new file mode 100644 (file)
index 0000000..f7b45cb
--- /dev/null
@@ -0,0 +1,51 @@
+#ifndef PROCESS_H
+#define PROCESS_H
+
+namespace icinga
+{
+
+class I2_BASE_API Process : public AsyncTask<Process>
+{
+public:
+       typedef shared_ptr<Process> Ptr;
+       typedef weak_ptr<Process> WeakPtr;
+
+       static const int MaxTasksPerThread = 128;
+
+       Process(const string& command);
+
+       long GetExitStatus(void) const;
+       string GetOutput(void) const;
+
+private:
+       static bool m_ThreadsCreated;
+
+       string m_Command;
+
+       FILE *m_FP;
+       stringstream m_OutputStream;
+       bool m_UsePopen;
+#ifndef _MSC_VER
+       void *m_PCloseArg;
+#endif /* _MSC_VER */
+
+       long m_ExitStatus;
+       string m_Output;
+
+       virtual void Run(void);
+
+       static boost::mutex m_Mutex;
+       static deque<Process::Ptr> m_Tasks;
+       static condition_variable m_TasksCV;
+
+       static void WorkerThreadProc(void);
+
+       bool InitTask(void);
+       bool RunTask(void);
+
+       int GetFD(void) const;
+};
+
+}
+
+#endif /* PROCESS_H */
index 207c22eb774a44d63f9cde4e8bb8d3388392a740..91afd23be69861efe6b7522a8f7b86749bc36634 100644 (file)
@@ -22,8 +22,6 @@
 using namespace icinga;
 
 map<string, CheckTaskType> CheckTask::m_Types;
-vector<CheckTask::Ptr> CheckTask::m_FinishedTasks;
-mutex CheckTask::m_FinishedTasksMutex;
 
 CheckTask::CheckTask(const Service& service)
        : m_Service(service)
@@ -39,11 +37,10 @@ CheckResult& CheckTask::GetResult(void)
        return m_Result;
 }
 
-void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher)
+void CheckTask::RegisterType(string type, Factory factory)
 {
        CheckTaskType ctt;
        ctt.Factory = factory;
-       ctt.QueueFlusher = qflusher;
 
        m_Types[type] = ctt;
 }
@@ -59,32 +56,3 @@ CheckTask::Ptr CheckTask::CreateTask(const Service& service)
 
        return it->second.Factory(service);
 }
-
-void CheckTask::Enqueue(const CheckTask::Ptr& task)
-{
-       task->Enqueue();
-}
-
-void CheckTask::FlushQueue(void)
-{
-       map<string, CheckTaskType>::iterator it;
-       for (it = m_Types.begin(); it != m_Types.end(); it++)
-               it->second.QueueFlusher();
-}
-
-vector<CheckTask::Ptr> CheckTask::GetFinishedTasks(void)
-{
-       mutex::scoped_lock lock(m_FinishedTasksMutex);
-
-       vector<CheckTask::Ptr> result = m_FinishedTasks;
-       m_FinishedTasks.clear();
-
-       return result;
-}
-
-void CheckTask::FinishTask(const CheckTask::Ptr& task)
-{
-       mutex::scoped_lock lock(m_FinishedTasksMutex);
-       m_FinishedTasks.push_back(task);
-}
-
index 7bfe5d4412c385d30195ffad1d46815a80dc30d1..321300158ff74ce427d8f7cfb50d3909667dc0ef 100644 (file)
@@ -25,46 +25,37 @@ namespace icinga
 
 struct CheckTaskType;
 
-class I2_CIB_API CheckTask : public Object
+class I2_CIB_API CheckTask : public AsyncTask<CheckTask>
 {
 public:
        typedef shared_ptr<CheckTask> Ptr;
        typedef weak_ptr<CheckTask> WeakPtr;
 
        typedef function<CheckTask::Ptr(const Service&)> Factory;
-       typedef function<void()> QueueFlusher;
 
        Service& GetService(void);
        CheckResult& GetResult(void);
 
-       virtual void Enqueue(void) = 0;
-
-       static void RegisterType(string type, Factory factory, QueueFlusher qflusher);
+       static void RegisterType(string type, Factory factory);
        static CheckTask::Ptr CreateTask(const Service& service);
-       static void Enqueue(const CheckTask::Ptr& task);
-       static void FlushQueue(void);
 
        static int GetTaskHistogramSlots(void);
-       static void FinishTask(const CheckTask::Ptr& task);
-       static vector<CheckTask::Ptr> GetFinishedTasks(void);
 
 protected:
        CheckTask(const Service& service);
 
+       virtual void Run(void) = 0;
+
 private:
        Service m_Service;
        CheckResult m_Result;
 
        static map<string, CheckTaskType> m_Types;
-
-       static vector<CheckTask::Ptr> m_FinishedTasks;
-       static mutex m_FinishedTasksMutex;
 };
 
 struct CheckTaskType
 {
        CheckTask::Factory Factory;
-       CheckTask::QueueFlusher QueueFlusher;
 };
 
 }
index ae96f95944d3117967ff27b8d8b936cd71f7f3e6..334c1ffe87b57f5a8a4b93db21b51a6fe33db7ee 100644 (file)
 
 using namespace icinga;
 
-boost::mutex NagiosCheckTask::m_Mutex;
-vector<NagiosCheckTask::Ptr> NagiosCheckTask::m_PendingTasks;
-deque<NagiosCheckTask::Ptr> NagiosCheckTask::m_Tasks;
-condition_variable NagiosCheckTask::m_TasksCV;
-
 NagiosCheckTask::NagiosCheckTask(const Service& service)
-       : CheckTask(service), m_FP(NULL), m_UsePopen(false)
+       : CheckTask(service)
 {
        string checkCommand = service.GetCheckCommand();
 
@@ -39,138 +34,52 @@ NagiosCheckTask::NagiosCheckTask(const Service& service)
        macroDicts.push_back(service.GetMacros());
        macroDicts.push_back(service.GetHost().GetMacros());
        macroDicts.push_back(IcingaApplication::GetInstance()->GetMacros());
-       m_Command = MacroProcessor::ResolveMacros(checkCommand, macroDicts);
+       string command = MacroProcessor::ResolveMacros(checkCommand, macroDicts);
+       m_Process = boost::make_shared<Process>(command);
 }
 
-void NagiosCheckTask::Enqueue(void)
+void NagiosCheckTask::Run(void)
 {
        time_t now;
        time(&now);
        GetResult().SetScheduleStart(now);
 
-       m_PendingTasks.push_back(GetSelf());
-}
-
-void NagiosCheckTask::FlushQueue(void)
-{
-       {
-               mutex::scoped_lock lock(m_Mutex);
-               std::copy(m_PendingTasks.begin(), m_PendingTasks.end(), back_inserter(m_Tasks));
-               m_PendingTasks.clear();
-               m_TasksCV.notify_all();
-       }
+       m_Process->OnTaskCompleted.connect(boost::bind(&NagiosCheckTask::ProcessFinishedHandler, static_cast<NagiosCheckTask::Ptr>(GetSelf())));
+       m_Process->Start();
 }
 
-void NagiosCheckTask::CheckThreadProc(void)
+void NagiosCheckTask::ProcessFinishedHandler(void)
 {
-       mutex::scoped_lock lock(m_Mutex);
-
-       map<int, NagiosCheckTask::Ptr> tasks;
-
-       for (;;) {
-               while (m_Tasks.empty() || tasks.size() >= MaxChecksPerThread) {
-                       lock.unlock();
-
-                       map<int, NagiosCheckTask::Ptr>::iterator it, prev;
-
-#ifndef _MSC_VER
-                       fd_set readfds;
-                       int nfds = 0;
-                       
-                       FD_ZERO(&readfds);
-
-                       for (it = tasks.begin(); it != tasks.end(); it++) {
-                               if (it->first > nfds)
-                                       nfds = it->first;
-
-                               FD_SET(it->first, &readfds);
-                       }
-
-                       timeval tv;
-                       tv.tv_sec = 1;
-                       tv.tv_usec = 0;
-                       select(nfds + 1, &readfds, NULL, NULL, &tv);
-#else /* _MSC_VER */
-                       Sleep(1000);
-#endif /* _MSC_VER */
-
-                       for (it = tasks.begin(); it != tasks.end(); ) {
-                               int fd = it->first;
-                               NagiosCheckTask::Ptr task = it->second;
-
-#ifndef _MSC_VER
-                               if (!FD_ISSET(fd, &readfds)) {
-                                       it++;
-                                       continue;
-                               }
-#endif /* _MSC_VER */
-
-                               if (!task->RunTask()) {
-                                       time_t now;
-                                       time(&now);
-                                       task->GetResult().SetScheduleEnd(now);
-
-                                       CheckTask::FinishTask(task);
-                                       prev = it;
-                                       it++;
-                                       tasks.erase(prev);
-                               } else {
-                                       it++;
-                               }
-                       }
-
-                       lock.lock();
-               }
-
-               while (!m_Tasks.empty() && tasks.size() < MaxChecksPerThread) {
-                       NagiosCheckTask::Ptr task = m_Tasks.front();
-                       m_Tasks.pop_front();
-                       if (!task->InitTask()) {
-                               time_t now;
-                               time(&now);
-                               task->GetResult().SetScheduleEnd(now);
+       string output = m_Process->GetOutput();
+       boost::algorithm::trim(output);
+       ProcessCheckOutput(output);
 
-                               CheckTask::FinishTask(task);
-                       } else {
-                               int fd = task->GetFD();
-                               if (fd >= 0)
-                                       tasks[fd] = task;
-                       }
-               }
+       long exitcode = m_Process->GetExitStatus();
+
+       ServiceState state;
+
+       switch (exitcode) {
+               case 0:
+                       state = StateOK;
+                       break;
+               case 1:
+                       state = StateWarning;
+                       break;
+               case 2:
+                       state = StateCritical;
+                       break;
+               default:
+                       state = StateUnknown;
+                       break;
        }
-}
 
-bool NagiosCheckTask::InitTask(void)
-{
+       GetResult().SetState(state);
+
        time_t now;
        time(&now);
-       GetResult().SetExecutionStart(now);
-
-#ifdef _MSC_VER
-       m_FP = _popen(m_Command.c_str(), "r");
-#else /* _MSC_VER */
-       if (!m_UsePopen) {
-               m_PCloseArg = new popen_noshell_pass_to_pclose;
-
-               m_FP = popen_noshell_compat(m_Command.c_str(), "r", (popen_noshell_pass_to_pclose *)m_PCloseArg);
-
-               if (m_FP == NULL) // TODO: add check for valgrind
-                       m_UsePopen = true;
-       }
-
-       if (m_UsePopen)
-               m_FP = popen(m_Command.c_str(), "r");
-#endif /* _MSC_VER */
-
-       if (m_FP == NULL) {
-               time_t now;
-               time(&now);
-               GetResult().SetExecutionEnd(now);
-
-               return false;
-       }
+       GetResult().SetExecutionEnd(now);
 
-       return true;
+       Finish();
 }
 
 void NagiosCheckTask::ProcessCheckOutput(const string& output)
@@ -206,85 +115,6 @@ void NagiosCheckTask::ProcessCheckOutput(const string& output)
        GetResult().SetPerformanceDataRaw(perfdata);
 }
 
-bool NagiosCheckTask::RunTask(void)
-{
-       char buffer[512];
-       size_t read = fread(buffer, 1, sizeof(buffer), m_FP);
-
-       if (read > 0)
-               m_OutputStream.write(buffer, read);
-
-       if (!feof(m_FP))
-               return true;
-
-       string output = m_OutputStream.str();
-       boost::algorithm::trim(output);
-       ProcessCheckOutput(output);
-
-       int status, exitcode;
-#ifdef _MSC_VER
-       status = _pclose(m_FP);
-#else /* _MSC_VER */
-       if (m_UsePopen) {
-               status = pclose(m_FP);
-       } else {
-               status = pclose_noshell((popen_noshell_pass_to_pclose *)m_PCloseArg);
-               delete (popen_noshell_pass_to_pclose *)m_PCloseArg;
-       }
-#endif /* _MSC_VER */
-
-#ifndef _MSC_VER
-       if (WIFEXITED(status)) {
-               exitcode = WEXITSTATUS(status);
-#else /* _MSC_VER */
-               exitcode = status;
-
-               /* cmd.exe returns error code 1 (warning) when the plugin
-                * could not be executed - change the exit status to "unknown"
-                * when we have no plugin output. */
-               if (output.empty())
-                       exitcode = 128;
-#endif /* _MSC_VER */
-
-               ServiceState state;
-
-               switch (exitcode) {
-                       case 0:
-                               state = StateOK;
-                               break;
-                       case 1:
-                               state = StateWarning;
-                               break;
-                       case 2:
-                               state = StateCritical;
-                               break;
-                       default:
-                               state = StateUnknown;
-                               break;
-               }
-
-               GetResult().SetState(state);
-#ifndef _MSC_VER
-       } else if (WIFSIGNALED(status)) {
-               stringstream outputbuf;
-               outputbuf << "Process was terminated by signal " << WTERMSIG(status);
-               GetResult().SetOutput(outputbuf.str());
-               GetResult().SetState(StateUnknown);
-       }
-#endif /* _MSC_VER */
-
-       time_t now;
-       time(&now);
-       GetResult().SetExecutionEnd(now);
-
-       return false;
-}
-
-int NagiosCheckTask::GetFD(void) const
-{
-       return fileno(m_FP);
-}
-
 CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service)
 {
        return boost::make_shared<NagiosCheckTask>(service);
@@ -292,15 +122,5 @@ CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service)
 
 void NagiosCheckTask::Register(void)
 {
-       CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue);
-
-       int numThreads = boost::thread::hardware_concurrency();
-
-       if (numThreads < 4)
-               numThreads = 4;
-
-       for (int i = 0; i < numThreads; i++) {
-               thread t(&NagiosCheckTask::CheckThreadProc);
-               t.detach();
-       }
+       CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask);
 }
index 5811ab24b4c08259edd383c97df97ca753047357..0375324d031690f8fea4d74293ce452de1e5ca6e 100644 (file)
@@ -29,38 +29,18 @@ public:
        typedef shared_ptr<NagiosCheckTask> Ptr;
        typedef weak_ptr<NagiosCheckTask> WeakPtr;
 
-       static const int MaxChecksPerThread = 128;
-
        NagiosCheckTask(const Service& service);
 
-       virtual void Enqueue(void);
-
        static CheckTask::Ptr CreateTask(const Service& service);
-       static void FlushQueue(void);
 
        static void Register(void);
 
 private:
-       string m_Command;
-
-       FILE *m_FP;
-       stringstream m_OutputStream;
-       bool m_UsePopen;
-#ifndef _MSC_VER
-       void *m_PCloseArg;
-#endif /* _MSC_VER */
-
-       static boost::mutex m_Mutex;
-       static deque<NagiosCheckTask::Ptr> m_Tasks;
-       static vector<NagiosCheckTask::Ptr> m_PendingTasks;
-       static condition_variable m_TasksCV;
-
-       static void CheckThreadProc(void);
+       Process::Ptr m_Process;
 
-       bool InitTask(void);
+       virtual void Run(void);
+       void ProcessFinishedHandler(void);
        void ProcessCheckOutput(const string& output);
-       bool RunTask(void);
-       int GetFD(void) const;
 };
 
 }
index e6cc0abddd41ae6a622fc754a413d03557710874..89ae89d350927426ce5b5cb1062676d1e6d0ebee 100644 (file)
@@ -79,102 +79,67 @@ void CheckerComponent::CheckTimerHandler(void)
                m_PendingServices.insert(service.GetConfigObject());
 
                CheckTask::Ptr task = CheckTask::CreateTask(service);
-               task->Enqueue();
+               task->OnTaskCompleted.connect(boost::bind(&CheckerComponent::CheckCompletedHandler, this, _1));
+               task->Start();
 
                tasks++;
        }
 
        Logger::Write(LogDebug, "checker", "CheckTimerHandler: past loop.");
 
-       CheckTask::FlushQueue();
-
        stringstream msgbuf;
        msgbuf << "CheckTimerHandler: created " << tasks << " tasks";
        Logger::Write(LogInformation, "checker", msgbuf.str());
 }
 
-void CheckerComponent::ResultTimerHandler(void)
+void CheckerComponent::CheckCompletedHandler(const CheckTask::Ptr& task)
 {
-       Logger::Write(LogDebug, "checker", "ResultTimerHandler entered.");
-
-       time_t now;
-       time(&now);
-
-       long min_latency = -1, max_latency = 0, avg_latency = 0, results = 0, failed = 0;
-
-       vector<CheckTask::Ptr> finishedTasks = CheckTask::GetFinishedTasks();
-
-       for (vector<CheckTask::Ptr>::iterator it = finishedTasks.begin(); it != finishedTasks.end(); it++) {
-               CheckTask::Ptr task = *it;
-
-               Service service = task->GetService();
-
-               /* if the service isn't in the set of pending services
-                * it was removed and we need to ignore this check result. */
-               if (m_PendingServices.find(service.GetConfigObject()) == m_PendingServices.end())
-                       continue;
-
-               CheckResult result = task->GetResult();
-               Logger::Write(LogDebug, "checker", "Got result for service '" + service.GetName() + "'");
-
-               long execution_time = result.GetExecutionEnd() - result.GetExecutionStart();
-               long latency = (result.GetScheduleEnd() - result.GetScheduleStart()) - execution_time;
-               avg_latency += latency;
-
-               if (min_latency == -1 || latency < min_latency)
-                       min_latency = latency;
+       Service service = task->GetService();
 
-               if (latency > max_latency)
-                       max_latency = latency;
-
-               results++;
+       /* if the service isn't in the set of pending services
+               * it was removed and we need to ignore this check result. */
+       if (m_PendingServices.find(service.GetConfigObject()) == m_PendingServices.end())
+               return;
 
-               if (result.GetState() != StateOK)
-                       failed++;
+       CheckResult result = task->GetResult();
+       Logger::Write(LogDebug, "checker", "Got result for service '" + service.GetName() + "'");
 
-               /* update service state */
-               service.ApplyCheckResult(result);
+       long execution_time = result.GetExecutionEnd() - result.GetExecutionStart();
+       long latency = (result.GetScheduleEnd() - result.GetScheduleStart()) - execution_time;
 
-               /* figure out when the next check is for this service */
-               service.UpdateNextCheck();
+       /* update service state */
+       service.ApplyCheckResult(result);
 
-               /* remove the service from the list of pending services */
-               m_PendingServices.erase(service.GetConfigObject());
-               m_Services.push(service);
+       /* figure out when the next check is for this service */
+       service.UpdateNextCheck();
 
-               RequestMessage rm;
-               rm.SetMethod("checker::CheckResult");
+       /* remove the service from the list of pending services */
+       m_PendingServices.erase(service.GetConfigObject());
+       m_Services.push(service);
 
-               ServiceStatusMessage params;
-               params.SetService(service.GetName());
-               params.SetState(service.GetState());
-               params.SetStateType(service.GetStateType());
-               params.SetCurrentCheckAttempt(service.GetCurrentCheckAttempt());
-               params.SetNextCheck(service.GetNextCheck());
-               params.SetCheckResult(result);
+       RequestMessage rm;
+       rm.SetMethod("checker::CheckResult");
 
-               rm.SetParams(params);
+       ServiceStatusMessage params;
+       params.SetService(service.GetName());
+       params.SetState(service.GetState());
+       params.SetStateType(service.GetStateType());
+       params.SetCurrentCheckAttempt(service.GetCurrentCheckAttempt());
+       params.SetNextCheck(service.GetNextCheck());
+       params.SetCheckResult(result);
 
-               EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, rm);
-       }
+       rm.SetParams(params);
 
-       if (min_latency > 5) {
-               stringstream latwarn;
-               latwarn << "We can't keep up with the checks: minimum latency is " << min_latency << " seconds";
-               Logger::Write(LogWarning, "checker", latwarn.str());
-       }
+       EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, rm);
+}
 
-       {
-               stringstream msgbuf;
-               msgbuf << "ResultTimerHandler: " << results << " results (" << failed << " failed); latency: avg=" << avg_latency / (results ? results : 1) << ", min=" << min_latency << ", max: " << max_latency;
-               Logger::Write(LogInformation, "checker", msgbuf.str());
-       }
+void CheckerComponent::ResultTimerHandler(void)
+{
+       Logger::Write(LogDebug, "checker", "ResultTimerHandler entered.");
 
-       {
-               stringstream msgbuf;
-               msgbuf << "Pending services: " << m_PendingServices.size() << "; Idle services: " << m_Services.size();
-               Logger::Write(LogInformation, "checker", msgbuf.str());
-       }
+       stringstream msgbuf;
+       msgbuf << "Pending services: " << m_PendingServices.size() << "; Idle services: " << m_Services.size();
+       Logger::Write(LogInformation, "checker", msgbuf.str());
 }
 
 void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
index 8584b3a75e60383b0ac83ebcfe5d2400845cacc9..b3e6b7ff13093ae4fbd0028eb29e3d3a9d914971 100644 (file)
@@ -60,6 +60,8 @@ private:
        void CheckTimerHandler(void);
        void ResultTimerHandler(void);
 
+       void CheckCompletedHandler(const CheckTask::Ptr& task);
+
        void AdjustCheckTimer(void);
 
        void AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
index c82a790fa36f05d3588ac021ee92b9d18c3e0087..46316c2e119ad7f82174cde5997b005966aebe29 100644 (file)
@@ -1,14 +1,14 @@
 local object application "icinga" {
-       cert = "icinga-c1.pem",
+/*     cert = "icinga-c1.pem",
        ca = "ca.crt",
 
        node = "192.168.2.235",
-       service = 7777
+       service = 7777*/
 }
 
-local object component "discovery" {
+/*local object component "discovery" {
 
-}
+}*/
 
 local object component "checker" {
 
@@ -18,7 +18,7 @@ local object component "delegation" {
 
 }
 
-local object endpoint "icinga-c2" {
+/*local object endpoint "icinga-c2" {
        roles = { "all" }
 }
 
@@ -33,7 +33,7 @@ local object endpoint "icinga-c4" {
 local object role "all" {
        publications = { "*" },
        subscriptions = { "*" }
-}
+}*/
 
 object host "localhost" {