libbase_la_SOURCES = \
application.cpp \
application.h \
+ asynctask.h \
component.cpp \
component.h \
configobject.cpp \
objectset.h \
objectmap.cpp \
objectmap.h \
+ process.cpp \
+ process.h \
ringbuffer.cpp \
ringbuffer.h \
socket.cpp \
--- /dev/null
+#ifndef ASYNCTASK_H
+#define ASYNCTASK_H
+
+namespace icinga
+{
+
+template<typename T>
+class AsyncTask : public Object
+{
+public:
+ typedef shared_ptr<AsyncTask<T> > Ptr;
+ typedef weak_ptr<AsyncTask<T> > WeakPtr;
+
+ AsyncTask(void)
+ : m_Finished(false)
+ { }
+
+ ~AsyncTask(void)
+ {
+ assert(m_Finished);
+ }
+
+ void Start(void)
+ {
+ assert(Application::IsMainThread());
+
+ Run();
+ }
+
+ boost::signal<void (const shared_ptr<T>&)> OnTaskCompleted;
+
+protected:
+ virtual void Run(void) = 0;
+
+ void Finish(void)
+ {
+ Event::Ptr ev = boost::make_shared<Event>();
+ ev->OnEventDelivered.connect(boost::bind(&T::FinishForwarder, static_cast<shared_ptr<T> >(GetSelf())));
+ Event::Post(ev);
+ }
+
+ bool m_Finished;
+
+private:
+ static void FinishForwarder(typename const shared_ptr<T>& task)
+ {
+ task->OnTaskCompleted(task);
+ }
+};
+
+}
+
+#endif /* ASYNCTASK_H */
<ClCompile Include="object.cpp" />
<ClCompile Include="objectmap.cpp" />
<ClCompile Include="objectset.cpp" />
+ <ClCompile Include="process.cpp" />
<ClCompile Include="ringbuffer.cpp" />
<ClCompile Include="socket.cpp" />
<ClCompile Include="streamlogger.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="application.h" />
+ <ClInclude Include="asynctask.h" />
<ClInclude Include="component.h" />
<ClInclude Include="configobject.h" />
<ClInclude Include="dictionary.h" />
<ClInclude Include="fifo.h" />
<ClInclude Include="i2-base.h" />
<ClInclude Include="object.h" />
+ <ClInclude Include="process.h" />
<ClInclude Include="ringbuffer.h" />
<ClInclude Include="socket.h" />
<ClInclude Include="streamlogger.h" />
<ClCompile Include="sysloglogger.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
+ <ClCompile Include="process.cpp">
+ <Filter>Quelldateien</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="application.h">
<ClInclude Include="sysloglogger.h">
<Filter>Headerdateien</Filter>
</ClInclude>
+ <ClInclude Include="asynctask.h">
+ <Filter>Headerdateien</Filter>
+ </ClInclude>
+ <ClInclude Include="process.h">
+ <Filter>Headerdateien</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
- <Filter Include="Headerdateien">
- <UniqueIdentifier>{7bbee99c-5763-4063-836c-ddbcc8966ae3}</UniqueIdentifier>
- </Filter>
<Filter Include="Quelldateien">
<UniqueIdentifier>{229e6896-1a39-4b0e-b5e4-a1291b825200}</UniqueIdentifier>
</Filter>
+ <Filter Include="Headerdateien">
+ <UniqueIdentifier>{7bbee99c-5763-4063-836c-ddbcc8966ae3}</UniqueIdentifier>
+ </Filter>
</ItemGroup>
</Project>
\ No newline at end of file
namespace icinga
{
-class Event : public Object
+class I2_BASE_API Event : public Object
{
public:
typedef shared_ptr<Event> Ptr;
#include "logger.h"
#include "streamlogger.h"
#include "sysloglogger.h"
+#include "asynctask.h"
+#include "process.h"
#endif /* I2BASE_H */
--- /dev/null
+#include "i2-base.h"
+
+using namespace icinga;
+
+bool Process::m_ThreadsCreated = false;
+boost::mutex Process::m_Mutex;
+deque<Process::Ptr> Process::m_Tasks;
+condition_variable Process::m_TasksCV;
+
+Process::Process(const string& command)
+ : m_Command(command)
+{
+ if (!m_ThreadsCreated) {
+ int numThreads = boost::thread::hardware_concurrency();
+
+ if (numThreads < 4)
+ numThreads = 4;
+
+ for (int i = 0; i < numThreads; i++) {
+ thread t(&Process::WorkerThreadProc);
+ t.detach();
+ }
+
+ m_ThreadsCreated = true;
+ }
+}
+
+void Process::Run(void)
+{
+ mutex::scoped_lock lock(m_Mutex);
+ m_Tasks.push_back(GetSelf());
+ m_TasksCV.notify_one();
+}
+
+void Process::WorkerThreadProc(void)
+{
+ mutex::scoped_lock lock(m_Mutex);
+
+ map<int, Process::Ptr> tasks;
+
+ for (;;) {
+ while (m_Tasks.empty() || tasks.size() >= MaxTasksPerThread) {
+ lock.unlock();
+
+ map<int, Process::Ptr>::iterator it, prev;
+
+#ifndef _MSC_VER
+ fd_set readfds;
+ int nfds = 0;
+
+ FD_ZERO(&readfds);
+
+ for (it = tasks.begin(); it != tasks.end(); it++) {
+ if (it->first > nfds)
+ nfds = it->first;
+
+ FD_SET(it->first, &readfds);
+ }
+
+ timeval tv;
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ select(nfds + 1, &readfds, NULL, NULL, &tv);
+#else /* _MSC_VER */
+ Sleep(1000);
+#endif /* _MSC_VER */
+
+ for (it = tasks.begin(); it != tasks.end(); ) {
+ int fd = it->first;
+ Process::Ptr task = it->second;
+
+#ifndef _MSC_VER
+ if (!FD_ISSET(fd, &readfds)) {
+ it++;
+ continue;
+ }
+#endif /* _MSC_VER */
+
+ if (!task->RunTask()) {
+ prev = it;
+ it++;
+ tasks.erase(prev);
+
+ task->Finish();
+ } else {
+ it++;
+ }
+ }
+
+ lock.lock();
+ }
+
+ while (!m_Tasks.empty() && tasks.size() < MaxTasksPerThread) {
+ Process::Ptr task = m_Tasks.front();
+ m_Tasks.pop_front();
+ if (!task->InitTask()) {
+ task->Finish();
+ } else {
+ int fd = task->GetFD();
+ if (fd >= 0)
+ tasks[fd] = task;
+ }
+ }
+ }
+}
+
+bool Process::InitTask(void)
+{
+#ifdef _MSC_VER
+ m_FP = _popen(m_Command.c_str(), "r");
+#else /* _MSC_VER */
+ if (!m_UsePopen) {
+ m_PCloseArg = new popen_noshell_pass_to_pclose;
+
+ m_FP = popen_noshell_compat(m_Command.c_str(), "r",
+ (popen_noshell_pass_to_pclose *)m_PCloseArg);
+
+ if (m_FP == NULL) // TODO: add check for valgrind
+ m_UsePopen = true;
+ }
+
+ if (m_UsePopen)
+ m_FP = popen(m_Command.c_str(), "r");
+#endif /* _MSC_VER */
+
+ if (m_FP == NULL) {
+ return false;
+ }
+
+ return true;
+}
+
+bool Process::RunTask(void)
+{
+ char buffer[512];
+ size_t read = fread(buffer, 1, sizeof(buffer), m_FP);
+
+ if (read > 0)
+ m_OutputStream.write(buffer, read);
+
+ if (!feof(m_FP))
+ return true;
+
+ string output = m_OutputStream.str();
+
+ int status, exitcode;
+#ifdef _MSC_VER
+ status = _pclose(m_FP);
+#else /* _MSC_VER */
+ if (m_UsePopen) {
+ status = pclose(m_FP);
+ } else {
+ status = pclose_noshell((popen_noshell_pass_to_pclose *)m_PCloseArg);
+ delete (popen_noshell_pass_to_pclose *)m_PCloseArg;
+ }
+#endif /* _MSC_VER */
+
+#ifndef _MSC_VER
+ if (WIFEXITED(status)) {
+ exitcode = WEXITSTATUS(status);
+#else /* _MSC_VER */
+ exitcode = status;
+
+ /* cmd.exe returns error code 1 (warning) when the plugin
+ * could not be executed - change the exit status to "unknown"
+ * when we have no plugin output. */
+ if (output.empty())
+ exitcode = 128;
+#endif /* _MSC_VER */
+
+#ifndef _MSC_VER
+ } else if (WIFSIGNALED(status)) {
+ stringstream outputbuf;
+ outputbuf << "Process was terminated by signal " << WTERMSIG(status);
+ output = outputbuf.str();
+ exitcode = 128;
+ }
+#endif /* _MSC_VER */
+
+ m_ExitStatus = exitcode;
+ m_Output = output;
+
+ return false;
+}
+
+int Process::GetFD(void) const
+{
+ return fileno(m_FP);
+}
+
+long Process::GetExitStatus(void) const
+{
+ return m_ExitStatus;
+}
+
+string Process::GetOutput(void) const
+{
+ return m_Output;
+}
--- /dev/null
+#ifndef PROCESS_H
+#define PROCESS_H
+
+namespace icinga
+{
+
+class I2_BASE_API Process : public AsyncTask<Process>
+{
+public:
+ typedef shared_ptr<Process> Ptr;
+ typedef weak_ptr<Process> WeakPtr;
+
+ static const int MaxTasksPerThread = 128;
+
+ Process(const string& command);
+
+ long GetExitStatus(void) const;
+ string GetOutput(void) const;
+
+private:
+ static bool m_ThreadsCreated;
+
+ string m_Command;
+
+ FILE *m_FP;
+ stringstream m_OutputStream;
+ bool m_UsePopen;
+#ifndef _MSC_VER
+ void *m_PCloseArg;
+#endif /* _MSC_VER */
+
+ long m_ExitStatus;
+ string m_Output;
+
+ virtual void Run(void);
+
+ static boost::mutex m_Mutex;
+ static deque<Process::Ptr> m_Tasks;
+ static condition_variable m_TasksCV;
+
+ static void WorkerThreadProc(void);
+
+ bool InitTask(void);
+ bool RunTask(void);
+
+ int GetFD(void) const;
+};
+
+}
+
+#endif /* PROCESS_H */
using namespace icinga;
map<string, CheckTaskType> CheckTask::m_Types;
-vector<CheckTask::Ptr> CheckTask::m_FinishedTasks;
-mutex CheckTask::m_FinishedTasksMutex;
CheckTask::CheckTask(const Service& service)
: m_Service(service)
return m_Result;
}
-void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher)
+void CheckTask::RegisterType(string type, Factory factory)
{
CheckTaskType ctt;
ctt.Factory = factory;
- ctt.QueueFlusher = qflusher;
m_Types[type] = ctt;
}
return it->second.Factory(service);
}
-
-void CheckTask::Enqueue(const CheckTask::Ptr& task)
-{
- task->Enqueue();
-}
-
-void CheckTask::FlushQueue(void)
-{
- map<string, CheckTaskType>::iterator it;
- for (it = m_Types.begin(); it != m_Types.end(); it++)
- it->second.QueueFlusher();
-}
-
-vector<CheckTask::Ptr> CheckTask::GetFinishedTasks(void)
-{
- mutex::scoped_lock lock(m_FinishedTasksMutex);
-
- vector<CheckTask::Ptr> result = m_FinishedTasks;
- m_FinishedTasks.clear();
-
- return result;
-}
-
-void CheckTask::FinishTask(const CheckTask::Ptr& task)
-{
- mutex::scoped_lock lock(m_FinishedTasksMutex);
- m_FinishedTasks.push_back(task);
-}
-
struct CheckTaskType;
-class I2_CIB_API CheckTask : public Object
+class I2_CIB_API CheckTask : public AsyncTask<CheckTask>
{
public:
typedef shared_ptr<CheckTask> Ptr;
typedef weak_ptr<CheckTask> WeakPtr;
typedef function<CheckTask::Ptr(const Service&)> Factory;
- typedef function<void()> QueueFlusher;
Service& GetService(void);
CheckResult& GetResult(void);
- virtual void Enqueue(void) = 0;
-
- static void RegisterType(string type, Factory factory, QueueFlusher qflusher);
+ static void RegisterType(string type, Factory factory);
static CheckTask::Ptr CreateTask(const Service& service);
- static void Enqueue(const CheckTask::Ptr& task);
- static void FlushQueue(void);
static int GetTaskHistogramSlots(void);
- static void FinishTask(const CheckTask::Ptr& task);
- static vector<CheckTask::Ptr> GetFinishedTasks(void);
protected:
CheckTask(const Service& service);
+ virtual void Run(void) = 0;
+
private:
Service m_Service;
CheckResult m_Result;
static map<string, CheckTaskType> m_Types;
-
- static vector<CheckTask::Ptr> m_FinishedTasks;
- static mutex m_FinishedTasksMutex;
};
struct CheckTaskType
{
CheckTask::Factory Factory;
- CheckTask::QueueFlusher QueueFlusher;
};
}
using namespace icinga;
-boost::mutex NagiosCheckTask::m_Mutex;
-vector<NagiosCheckTask::Ptr> NagiosCheckTask::m_PendingTasks;
-deque<NagiosCheckTask::Ptr> NagiosCheckTask::m_Tasks;
-condition_variable NagiosCheckTask::m_TasksCV;
-
NagiosCheckTask::NagiosCheckTask(const Service& service)
- : CheckTask(service), m_FP(NULL), m_UsePopen(false)
+ : CheckTask(service)
{
string checkCommand = service.GetCheckCommand();
macroDicts.push_back(service.GetMacros());
macroDicts.push_back(service.GetHost().GetMacros());
macroDicts.push_back(IcingaApplication::GetInstance()->GetMacros());
- m_Command = MacroProcessor::ResolveMacros(checkCommand, macroDicts);
+ string command = MacroProcessor::ResolveMacros(checkCommand, macroDicts);
+ m_Process = boost::make_shared<Process>(command);
}
-void NagiosCheckTask::Enqueue(void)
+void NagiosCheckTask::Run(void)
{
time_t now;
time(&now);
GetResult().SetScheduleStart(now);
- m_PendingTasks.push_back(GetSelf());
-}
-
-void NagiosCheckTask::FlushQueue(void)
-{
- {
- mutex::scoped_lock lock(m_Mutex);
- std::copy(m_PendingTasks.begin(), m_PendingTasks.end(), back_inserter(m_Tasks));
- m_PendingTasks.clear();
- m_TasksCV.notify_all();
- }
+ m_Process->OnTaskCompleted.connect(boost::bind(&NagiosCheckTask::ProcessFinishedHandler, static_cast<NagiosCheckTask::Ptr>(GetSelf())));
+ m_Process->Start();
}
-void NagiosCheckTask::CheckThreadProc(void)
+void NagiosCheckTask::ProcessFinishedHandler(void)
{
- mutex::scoped_lock lock(m_Mutex);
-
- map<int, NagiosCheckTask::Ptr> tasks;
-
- for (;;) {
- while (m_Tasks.empty() || tasks.size() >= MaxChecksPerThread) {
- lock.unlock();
-
- map<int, NagiosCheckTask::Ptr>::iterator it, prev;
-
-#ifndef _MSC_VER
- fd_set readfds;
- int nfds = 0;
-
- FD_ZERO(&readfds);
-
- for (it = tasks.begin(); it != tasks.end(); it++) {
- if (it->first > nfds)
- nfds = it->first;
-
- FD_SET(it->first, &readfds);
- }
-
- timeval tv;
- tv.tv_sec = 1;
- tv.tv_usec = 0;
- select(nfds + 1, &readfds, NULL, NULL, &tv);
-#else /* _MSC_VER */
- Sleep(1000);
-#endif /* _MSC_VER */
-
- for (it = tasks.begin(); it != tasks.end(); ) {
- int fd = it->first;
- NagiosCheckTask::Ptr task = it->second;
-
-#ifndef _MSC_VER
- if (!FD_ISSET(fd, &readfds)) {
- it++;
- continue;
- }
-#endif /* _MSC_VER */
-
- if (!task->RunTask()) {
- time_t now;
- time(&now);
- task->GetResult().SetScheduleEnd(now);
-
- CheckTask::FinishTask(task);
- prev = it;
- it++;
- tasks.erase(prev);
- } else {
- it++;
- }
- }
-
- lock.lock();
- }
-
- while (!m_Tasks.empty() && tasks.size() < MaxChecksPerThread) {
- NagiosCheckTask::Ptr task = m_Tasks.front();
- m_Tasks.pop_front();
- if (!task->InitTask()) {
- time_t now;
- time(&now);
- task->GetResult().SetScheduleEnd(now);
+ string output = m_Process->GetOutput();
+ boost::algorithm::trim(output);
+ ProcessCheckOutput(output);
- CheckTask::FinishTask(task);
- } else {
- int fd = task->GetFD();
- if (fd >= 0)
- tasks[fd] = task;
- }
- }
+ long exitcode = m_Process->GetExitStatus();
+
+ ServiceState state;
+
+ switch (exitcode) {
+ case 0:
+ state = StateOK;
+ break;
+ case 1:
+ state = StateWarning;
+ break;
+ case 2:
+ state = StateCritical;
+ break;
+ default:
+ state = StateUnknown;
+ break;
}
-}
-bool NagiosCheckTask::InitTask(void)
-{
+ GetResult().SetState(state);
+
time_t now;
time(&now);
- GetResult().SetExecutionStart(now);
-
-#ifdef _MSC_VER
- m_FP = _popen(m_Command.c_str(), "r");
-#else /* _MSC_VER */
- if (!m_UsePopen) {
- m_PCloseArg = new popen_noshell_pass_to_pclose;
-
- m_FP = popen_noshell_compat(m_Command.c_str(), "r", (popen_noshell_pass_to_pclose *)m_PCloseArg);
-
- if (m_FP == NULL) // TODO: add check for valgrind
- m_UsePopen = true;
- }
-
- if (m_UsePopen)
- m_FP = popen(m_Command.c_str(), "r");
-#endif /* _MSC_VER */
-
- if (m_FP == NULL) {
- time_t now;
- time(&now);
- GetResult().SetExecutionEnd(now);
-
- return false;
- }
+ GetResult().SetExecutionEnd(now);
- return true;
+ Finish();
}
void NagiosCheckTask::ProcessCheckOutput(const string& output)
GetResult().SetPerformanceDataRaw(perfdata);
}
-bool NagiosCheckTask::RunTask(void)
-{
- char buffer[512];
- size_t read = fread(buffer, 1, sizeof(buffer), m_FP);
-
- if (read > 0)
- m_OutputStream.write(buffer, read);
-
- if (!feof(m_FP))
- return true;
-
- string output = m_OutputStream.str();
- boost::algorithm::trim(output);
- ProcessCheckOutput(output);
-
- int status, exitcode;
-#ifdef _MSC_VER
- status = _pclose(m_FP);
-#else /* _MSC_VER */
- if (m_UsePopen) {
- status = pclose(m_FP);
- } else {
- status = pclose_noshell((popen_noshell_pass_to_pclose *)m_PCloseArg);
- delete (popen_noshell_pass_to_pclose *)m_PCloseArg;
- }
-#endif /* _MSC_VER */
-
-#ifndef _MSC_VER
- if (WIFEXITED(status)) {
- exitcode = WEXITSTATUS(status);
-#else /* _MSC_VER */
- exitcode = status;
-
- /* cmd.exe returns error code 1 (warning) when the plugin
- * could not be executed - change the exit status to "unknown"
- * when we have no plugin output. */
- if (output.empty())
- exitcode = 128;
-#endif /* _MSC_VER */
-
- ServiceState state;
-
- switch (exitcode) {
- case 0:
- state = StateOK;
- break;
- case 1:
- state = StateWarning;
- break;
- case 2:
- state = StateCritical;
- break;
- default:
- state = StateUnknown;
- break;
- }
-
- GetResult().SetState(state);
-#ifndef _MSC_VER
- } else if (WIFSIGNALED(status)) {
- stringstream outputbuf;
- outputbuf << "Process was terminated by signal " << WTERMSIG(status);
- GetResult().SetOutput(outputbuf.str());
- GetResult().SetState(StateUnknown);
- }
-#endif /* _MSC_VER */
-
- time_t now;
- time(&now);
- GetResult().SetExecutionEnd(now);
-
- return false;
-}
-
-int NagiosCheckTask::GetFD(void) const
-{
- return fileno(m_FP);
-}
-
CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service)
{
return boost::make_shared<NagiosCheckTask>(service);
void NagiosCheckTask::Register(void)
{
- CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue);
-
- int numThreads = boost::thread::hardware_concurrency();
-
- if (numThreads < 4)
- numThreads = 4;
-
- for (int i = 0; i < numThreads; i++) {
- thread t(&NagiosCheckTask::CheckThreadProc);
- t.detach();
- }
+ CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask);
}
typedef shared_ptr<NagiosCheckTask> Ptr;
typedef weak_ptr<NagiosCheckTask> WeakPtr;
- static const int MaxChecksPerThread = 128;
-
NagiosCheckTask(const Service& service);
- virtual void Enqueue(void);
-
static CheckTask::Ptr CreateTask(const Service& service);
- static void FlushQueue(void);
static void Register(void);
private:
- string m_Command;
-
- FILE *m_FP;
- stringstream m_OutputStream;
- bool m_UsePopen;
-#ifndef _MSC_VER
- void *m_PCloseArg;
-#endif /* _MSC_VER */
-
- static boost::mutex m_Mutex;
- static deque<NagiosCheckTask::Ptr> m_Tasks;
- static vector<NagiosCheckTask::Ptr> m_PendingTasks;
- static condition_variable m_TasksCV;
-
- static void CheckThreadProc(void);
+ Process::Ptr m_Process;
- bool InitTask(void);
+ virtual void Run(void);
+ void ProcessFinishedHandler(void);
void ProcessCheckOutput(const string& output);
- bool RunTask(void);
- int GetFD(void) const;
};
}
m_PendingServices.insert(service.GetConfigObject());
CheckTask::Ptr task = CheckTask::CreateTask(service);
- task->Enqueue();
+ task->OnTaskCompleted.connect(boost::bind(&CheckerComponent::CheckCompletedHandler, this, _1));
+ task->Start();
tasks++;
}
Logger::Write(LogDebug, "checker", "CheckTimerHandler: past loop.");
- CheckTask::FlushQueue();
-
stringstream msgbuf;
msgbuf << "CheckTimerHandler: created " << tasks << " tasks";
Logger::Write(LogInformation, "checker", msgbuf.str());
}
-void CheckerComponent::ResultTimerHandler(void)
+void CheckerComponent::CheckCompletedHandler(const CheckTask::Ptr& task)
{
- Logger::Write(LogDebug, "checker", "ResultTimerHandler entered.");
-
- time_t now;
- time(&now);
-
- long min_latency = -1, max_latency = 0, avg_latency = 0, results = 0, failed = 0;
-
- vector<CheckTask::Ptr> finishedTasks = CheckTask::GetFinishedTasks();
-
- for (vector<CheckTask::Ptr>::iterator it = finishedTasks.begin(); it != finishedTasks.end(); it++) {
- CheckTask::Ptr task = *it;
-
- Service service = task->GetService();
-
- /* if the service isn't in the set of pending services
- * it was removed and we need to ignore this check result. */
- if (m_PendingServices.find(service.GetConfigObject()) == m_PendingServices.end())
- continue;
-
- CheckResult result = task->GetResult();
- Logger::Write(LogDebug, "checker", "Got result for service '" + service.GetName() + "'");
-
- long execution_time = result.GetExecutionEnd() - result.GetExecutionStart();
- long latency = (result.GetScheduleEnd() - result.GetScheduleStart()) - execution_time;
- avg_latency += latency;
-
- if (min_latency == -1 || latency < min_latency)
- min_latency = latency;
+ Service service = task->GetService();
- if (latency > max_latency)
- max_latency = latency;
-
- results++;
+ /* if the service isn't in the set of pending services
+ * it was removed and we need to ignore this check result. */
+ if (m_PendingServices.find(service.GetConfigObject()) == m_PendingServices.end())
+ return;
- if (result.GetState() != StateOK)
- failed++;
+ CheckResult result = task->GetResult();
+ Logger::Write(LogDebug, "checker", "Got result for service '" + service.GetName() + "'");
- /* update service state */
- service.ApplyCheckResult(result);
+ long execution_time = result.GetExecutionEnd() - result.GetExecutionStart();
+ long latency = (result.GetScheduleEnd() - result.GetScheduleStart()) - execution_time;
- /* figure out when the next check is for this service */
- service.UpdateNextCheck();
+ /* update service state */
+ service.ApplyCheckResult(result);
- /* remove the service from the list of pending services */
- m_PendingServices.erase(service.GetConfigObject());
- m_Services.push(service);
+ /* figure out when the next check is for this service */
+ service.UpdateNextCheck();
- RequestMessage rm;
- rm.SetMethod("checker::CheckResult");
+ /* remove the service from the list of pending services */
+ m_PendingServices.erase(service.GetConfigObject());
+ m_Services.push(service);
- ServiceStatusMessage params;
- params.SetService(service.GetName());
- params.SetState(service.GetState());
- params.SetStateType(service.GetStateType());
- params.SetCurrentCheckAttempt(service.GetCurrentCheckAttempt());
- params.SetNextCheck(service.GetNextCheck());
- params.SetCheckResult(result);
+ RequestMessage rm;
+ rm.SetMethod("checker::CheckResult");
- rm.SetParams(params);
+ ServiceStatusMessage params;
+ params.SetService(service.GetName());
+ params.SetState(service.GetState());
+ params.SetStateType(service.GetStateType());
+ params.SetCurrentCheckAttempt(service.GetCurrentCheckAttempt());
+ params.SetNextCheck(service.GetNextCheck());
+ params.SetCheckResult(result);
- EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, rm);
- }
+ rm.SetParams(params);
- if (min_latency > 5) {
- stringstream latwarn;
- latwarn << "We can't keep up with the checks: minimum latency is " << min_latency << " seconds";
- Logger::Write(LogWarning, "checker", latwarn.str());
- }
+ EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, rm);
+}
- {
- stringstream msgbuf;
- msgbuf << "ResultTimerHandler: " << results << " results (" << failed << " failed); latency: avg=" << avg_latency / (results ? results : 1) << ", min=" << min_latency << ", max: " << max_latency;
- Logger::Write(LogInformation, "checker", msgbuf.str());
- }
+void CheckerComponent::ResultTimerHandler(void)
+{
+ Logger::Write(LogDebug, "checker", "ResultTimerHandler entered.");
- {
- stringstream msgbuf;
- msgbuf << "Pending services: " << m_PendingServices.size() << "; Idle services: " << m_Services.size();
- Logger::Write(LogInformation, "checker", msgbuf.str());
- }
+ stringstream msgbuf;
+ msgbuf << "Pending services: " << m_PendingServices.size() << "; Idle services: " << m_Services.size();
+ Logger::Write(LogInformation, "checker", msgbuf.str());
}
void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
void CheckTimerHandler(void);
void ResultTimerHandler(void);
+ void CheckCompletedHandler(const CheckTask::Ptr& task);
+
void AdjustCheckTimer(void);
void AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
local object application "icinga" {
- cert = "icinga-c1.pem",
+/* cert = "icinga-c1.pem",
ca = "ca.crt",
node = "192.168.2.235",
- service = 7777
+ service = 7777*/
}
-local object component "discovery" {
+/*local object component "discovery" {
-}
+}*/
local object component "checker" {
}
-local object endpoint "icinga-c2" {
+/*local object endpoint "icinga-c2" {
roles = { "all" }
}
local object role "all" {
publications = { "*" },
subscriptions = { "*" }
-}
+}*/
object host "localhost" {