typedef shared_ptr<AsyncTask<T> > Ptr;
typedef weak_ptr<AsyncTask<T> > WeakPtr;
- AsyncTask(void)
- : m_Finished(false)
+ typedef function<void (const shared_ptr<T>&)> CompletionCallback;
+
+ AsyncTask(const CompletionCallback& completionCallback)
+ : m_Finished(false), m_CompletionCallback(completionCallback)
{ }
~AsyncTask(void)
Run();
}
- boost::signal<void (const shared_ptr<T>&)> OnTaskCompleted;
-
protected:
virtual void Run(void) = 0;
void Finish(void)
{
- Event::Post(boost::bind(boost::cref(OnTaskCompleted), static_cast<shared_ptr<T> >(GetSelf())));
+ Event::Post(boost::bind(&T::ForwardCallback, static_cast<shared_ptr<T> >(GetSelf())));
+ }
+
+private:
+ void ForwardCallback(void)
+ {
+ m_CompletionCallback(GetSelf());
+ m_CompletionCallback = CompletionCallback();
+ m_Finished = true;
}
bool m_Finished;
+ CompletionCallback m_CompletionCallback;
};
}
{
return GetObjectsByType()->GetRange(type);
}
+
+void ConfigObject::RemoveTag(const string& key)
+{
+ GetTags()->Remove(key);
+}
return GetTags()->Get(key, value);
}
+ void RemoveTag(const string& key);
+
string GetType(void) const;
string GetName(void) const;
using namespace icinga;
-bool Process::m_ThreadsCreated = false;
+bool Process::m_ThreadCreated = false;
boost::mutex Process::m_Mutex;
deque<Process::Ptr> Process::m_Tasks;
condition_variable Process::m_TasksCV;
-Process::Process(const string& command)
- : m_Command(command)
+Process::Process(const string& command, const CompletionCallback& completionCallback)
+ : AsyncTask<Process>(completionCallback), m_Command(command), m_UsePopen(false)
{
- if (!m_ThreadsCreated) {
- int numThreads = boost::thread::hardware_concurrency();
+ if (!m_ThreadCreated) {
+ thread t(&Process::WorkerThreadProc);
+ t.detach();
- if (numThreads < 4)
- numThreads = 4;
-
- for (int i = 0; i < numThreads; i++) {
- thread t(&Process::WorkerThreadProc);
- t.detach();
- }
-
- m_ThreadsCreated = true;
+ m_ThreadCreated = true;
}
}
{
mutex::scoped_lock lock(m_Mutex);
m_Tasks.push_back(GetSelf());
- m_TasksCV.notify_one();
+ m_TasksCV.notify_all();
}
void Process::WorkerThreadProc(void)
while (!m_Tasks.empty() && tasks.size() < MaxTasksPerThread) {
Process::Ptr task = m_Tasks.front();
m_Tasks.pop_front();
+
+ lock.unlock();
+
if (!task->InitTask()) {
task->Finish();
} else {
if (fd >= 0)
tasks[fd] = task;
}
+
+ lock.lock();
}
}
}
bool Process::InitTask(void)
{
+ time(&m_ExecutionStart);
+
#ifdef _MSC_VER
m_FP = _popen(m_Command.c_str(), "r");
#else /* _MSC_VER */
#endif /* _MSC_VER */
if (m_FP == NULL) {
+ m_ExitStatus = 128;
+ m_ExecutionEnd = m_ExecutionStart;
return false;
}
}
#endif /* _MSC_VER */
+ time(&m_ExecutionEnd);
+
#ifndef _MSC_VER
if (WIFEXITED(status)) {
exitcode = WEXITSTATUS(status);
{
return m_Output;
}
+
+time_t Process::GetExecutionStart(void) const
+{
+ return m_ExecutionStart;
+}
+
+time_t Process::GetExecutionEnd(void) const
+{
+ return m_ExecutionEnd;
+}
+
static const int MaxTasksPerThread = 128;
- Process(const string& command);
+ Process(const string& command, const CompletionCallback& completionCallback);
+
+ time_t GetExecutionStart(void) const;
+ time_t GetExecutionEnd(void) const;
long GetExitStatus(void) const;
string GetOutput(void) const;
private:
- static bool m_ThreadsCreated;
+ static bool m_ThreadCreated;
string m_Command;
void *m_PCloseArg;
#endif /* _MSC_VER */
+ time_t m_ExecutionStart;
+ time_t m_ExecutionEnd;
long m_ExitStatus;
string m_Output;
map<string, CheckTaskType> CheckTask::m_Types;
-CheckTask::CheckTask(const Service& service)
- : m_Service(service)
+CheckTask::CheckTask(const Service& service, const CompletionCallback& completionCallback)
+ : AsyncTask<CheckTask>(completionCallback), m_Service(service)
{ }
Service& CheckTask::GetService(void)
m_Types[type] = ctt;
}
-CheckTask::Ptr CheckTask::CreateTask(const Service& service)
+CheckTask::Ptr CheckTask::CreateTask(const Service& service, const CompletionCallback& completionCallback)
{
map<string, CheckTaskType>::iterator it;
if (it == m_Types.end())
throw runtime_error("Invalid check type specified for service '" + service.GetName() + "'");
- return it->second.Factory(service);
+ return it->second.Factory(service, completionCallback);
}
typedef shared_ptr<CheckTask> Ptr;
typedef weak_ptr<CheckTask> WeakPtr;
- typedef function<CheckTask::Ptr(const Service&)> Factory;
+ typedef function<CheckTask::Ptr(const Service&, const CompletionCallback&)> Factory;
Service& GetService(void);
CheckResult& GetResult(void);
static void RegisterType(string type, Factory factory);
- static CheckTask::Ptr CreateTask(const Service& service);
+ static CheckTask::Ptr CreateTask(const Service& service, const CompletionCallback& completionCallback);
static int GetTaskHistogramSlots(void);
protected:
- CheckTask(const Service& service);
+ CheckTask(const Service& service, const CompletionCallback& completionCallback);
virtual void Run(void) = 0;
{
return m_ConfigObject;
}
+
+void ConfigObjectAdapter::RemoveTag(const string& key)
+{
+ m_ConfigObject->RemoveTag(key);
+}
return GetConfigObject()->GetTag(key, value);
}
+ void RemoveTag(const string& key);
+
private:
ConfigObject::Ptr m_ConfigObject;
};
using namespace icinga;
-NagiosCheckTask::NagiosCheckTask(const Service& service)
- : CheckTask(service)
+NagiosCheckTask::NagiosCheckTask(const Service& service, const CompletionCallback& completionCallback)
+ : CheckTask(service, completionCallback)
{
string checkCommand = service.GetCheckCommand();
macroDicts.push_back(service.GetMacros());
macroDicts.push_back(service.GetHost().GetMacros());
macroDicts.push_back(IcingaApplication::GetInstance()->GetMacros());
- string command = MacroProcessor::ResolveMacros(checkCommand, macroDicts);
- m_Process = boost::make_shared<Process>(command);
+ m_Command = MacroProcessor::ResolveMacros(checkCommand, macroDicts);
}
void NagiosCheckTask::Run(void)
time(&now);
GetResult().SetScheduleStart(now);
- m_Process->OnTaskCompleted.connect(boost::bind(&NagiosCheckTask::ProcessFinishedHandler, static_cast<NagiosCheckTask::Ptr>(GetSelf())));
+ m_Process = boost::make_shared<Process>(m_Command, boost::bind(&NagiosCheckTask::ProcessFinishedHandler, static_cast<NagiosCheckTask::Ptr>(GetSelf())));
m_Process->Start();
}
void NagiosCheckTask::ProcessFinishedHandler(void)
{
- time_t now;
- time(&now);
- GetResult().SetExecutionEnd(now);
+ GetResult().SetExecutionStart(m_Process->GetExecutionStart());
+ GetResult().SetExecutionEnd(m_Process->GetExecutionEnd());
string output = m_Process->GetOutput();
+ long exitcode = m_Process->GetExitStatus();
+ m_Process.reset();
+
boost::algorithm::trim(output);
ProcessCheckOutput(output);
- long exitcode = m_Process->GetExitStatus();
-
ServiceState state;
switch (exitcode) {
GetResult().SetState(state);
+ time_t now;
time(&now);
GetResult().SetScheduleEnd(now);
GetResult().SetPerformanceDataRaw(perfdata);
}
-CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service)
+CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service, const CompletionCallback& completionCallback)
{
- return boost::make_shared<NagiosCheckTask>(service);
+ return boost::make_shared<NagiosCheckTask>(service, completionCallback);
}
void NagiosCheckTask::Register(void)
typedef shared_ptr<NagiosCheckTask> Ptr;
typedef weak_ptr<NagiosCheckTask> WeakPtr;
- NagiosCheckTask(const Service& service);
+ NagiosCheckTask(const Service& service, const CompletionCallback& completionCallback);
- static CheckTask::Ptr CreateTask(const Service& service);
+ static CheckTask::Ptr CreateTask(const Service& service, const CompletionCallback& completionCallback);
static void Register(void);
private:
+ string m_Command;
Process::Ptr m_Process;
virtual void Run(void);
EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
m_CheckTimer = boost::make_shared<Timer>();
- m_CheckTimer->SetInterval(5);
+ m_CheckTimer->SetInterval(1);
m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
m_CheckTimer->Start();
m_PendingServices.insert(service.GetConfigObject());
- CheckTask::Ptr task = CheckTask::CreateTask(service);
- task->OnTaskCompleted.connect(boost::bind(&CheckerComponent::CheckCompletedHandler, this, _1));
+ CheckTask::Ptr task = CheckTask::CreateTask(service, boost::bind(&CheckerComponent::CheckCompletedHandler, this, _1));
task->Start();
+ service.SetTag("current_task", task);
+
tasks++;
}
{
Service service = task->GetService();
+ service.RemoveTag("current_task");
+
/* if the service isn't in the set of pending services
- * it was removed and we need to ignore this check result. */
+ * it was removed and we need to ignore this check result. */
if (m_PendingServices.find(service.GetConfigObject()) == m_PendingServices.end())
return;