using boost::thread;
using boost::thread_group;
using boost::mutex;
-using boost::unique_lock;
using boost::condition_variable;
#if defined(__APPLE__) && defined(__MACH__)
ThreadPool::~ThreadPool(void)
{
{
- unique_lock<mutex> lock(m_Lock);
+ mutex::scoped_lock lock(m_Lock);
m_Tasks.clear();
m_Threads.join_all();
}
-void ThreadPool::EnqueueTasks(const vector<Task>& tasks)
+void ThreadPool::EnqueueTasks(list<ThreadPoolTask::Ptr>& tasks)
{
- unique_lock<mutex> lock(m_Lock);
+ {
+ mutex::scoped_lock lock(m_Lock);
+ m_Tasks.splice(m_Tasks.end(), tasks, tasks.begin(), tasks.end());
+ }
- std::copy(tasks.begin(), tasks.end(), std::back_inserter(m_Tasks));
m_CV.notify_all();
}
-void ThreadPool::EnqueueTask(Task task)
+void ThreadPool::EnqueueTask(const ThreadPoolTask::Ptr& task)
{
- unique_lock<mutex> lock(m_Lock);
- m_Tasks.push_back(task);
+ {
+ mutex::scoped_lock lock(m_Lock);
+ m_Tasks.push_back(task);
+ }
+
m_CV.notify_one();
}
+
+ThreadPoolTask::Ptr ThreadPool::DequeueTask(void)
+{
+ mutex::scoped_lock lock(m_Lock);
+
+ while (m_Tasks.empty()) {
+ if (!m_Alive)
+ return ThreadPoolTask::Ptr();
+
+ m_CV.wait(lock);
+ }
+
+ ThreadPoolTask::Ptr task = m_Tasks.front();
+ m_Tasks.pop_front();
+
+ return task;
+}
+
void ThreadPool::WaitForTasks(void)
{
- unique_lock<mutex> lock(m_Lock);
+ mutex::scoped_lock lock(m_Lock);
/* wait for all pending tasks */
while (!m_Tasks.empty())
void ThreadPool::WorkerThreadProc(void)
{
while (true) {
- Task task;
+ ThreadPoolTask::Ptr task = DequeueTask();
- {
- unique_lock<mutex> lock(m_Lock);
+ if (!task)
+ break;
- while (m_Tasks.empty()) {
- if (!m_Alive)
- return;
-
- m_CV.wait(lock);
- }
-
- task = m_Tasks.front();
- m_Tasks.pop_front();
- }
-
- task();
+ task->Execute();
}
}
return threadPool;
}
+
namespace icinga
{
+struct ThreadPoolTask
+{
+ typedef shared_ptr<ThreadPoolTask> Ptr;
+ typedef weak_ptr<ThreadPoolTask> WeakPtr;
+
+ virtual void Execute(void) = 0;
+};
+
class I2_BASE_API ThreadPool : public Object
{
public:
typedef shared_ptr<ThreadPool> Ptr;
typedef weak_ptr<ThreadPool> WeakPtr;
- typedef function<void()> Task;
-
- ThreadPool(long numThreads = 16);
+ ThreadPool(long numThreads = 128);
~ThreadPool(void);
static ThreadPool::Ptr GetDefaultPool(void);
- void EnqueueTasks(const vector<Task>& tasks);
- void EnqueueTask(Task task);
+ void EnqueueTasks(list<ThreadPoolTask::Ptr>& tasks);
+ void EnqueueTask(const ThreadPoolTask::Ptr& task);
void WaitForTasks(void);
private:
- mutex m_Lock;
+ mutable mutex m_Lock;
condition_variable m_CV;
- deque<Task> m_Tasks;
+ list<ThreadPoolTask::Ptr> m_Tasks;
thread_group m_Threads;
bool m_Alive;
+ ThreadPoolTask::Ptr DequeueTask(void);
void WorkerThreadProc(void);
};
if (timer->m_Next <= now) {
timer->Call();
- timer->Reschedule(now + timer->GetInterval());
+ timer->Reschedule(time(NULL) + timer->GetInterval());
}
}
}
GetEndpointManager()->RegisterEndpoint(m_CheckerEndpoint);
m_CheckTimer = boost::make_shared<Timer>();
- m_CheckTimer->SetInterval(10);
+ m_CheckTimer->SetInterval(5);
m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
m_CheckTimer->Start();
CheckTask::FlushQueue();
- AdjustCheckTimer();
-
stringstream msgbuf;
msgbuf << "CheckTimerHandler: created " << tasks << " tasks";
Application::Log(LogDebug, "checker", msgbuf.str());
time_t now;
time(&now);
- long results = 0;
+ long latency = 0, results = 0;
vector<CheckTask::Ptr> finishedTasks = CheckTask::GetFinishedTasks();
CheckResult result = task->GetResult();
// Application::Log(LogInformation, "checker", "Got result! Plugin output: " + result.Output);
+ latency += result.EndTime - result.StartTime;
results++;
service.SetNextCheck(now + service.GetCheckInterval());
}
stringstream msgbuf;
- msgbuf << "ResultTimerHandler: " << results << " results";
+ msgbuf << "ResultTimerHandler: " << results << " results; avg. latency: " << latency / (results ? results : 1);
Application::Log(LogDebug, "checker", msgbuf.str());
-
- AdjustCheckTimer();
-}
-
-void CheckerComponent::AdjustCheckTimer(void)
-{
- if (m_Services.empty())
- return;
-
- /* adjust next call time for the check timer */
- Service service = m_Services.top();
- m_CheckTimer->Reschedule(service.GetNextCheck());
}
void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
AssignService(service);
}
+
+ m_DelegationTimer->Stop();
}
EXPORT_COMPONENT(delegation, DelegationComponent);
using namespace icinga;
-vector<ThreadPool::Task> NagiosCheckTask::m_QueuedTasks;
+list<ThreadPoolTask::Ptr> NagiosCheckTask::m_QueuedTasks;
boost::mutex NagiosCheckTask::m_FinishedTasksMutex;
vector<CheckTask::Ptr> NagiosCheckTask::m_FinishedTasks;
void NagiosCheckTask::Enqueue(void)
{
- m_QueuedTasks.push_back(bind(&NagiosCheckTask::Execute, static_cast<NagiosCheckTask::Ptr>(GetSelf())));
+ time(&m_Result.StartTime);
+ m_QueuedTasks.push_back(static_pointer_cast<ThreadPoolTask>(static_cast<NagiosCheckTask::Ptr>(GetSelf())));
+// m_QueuedTasks.push_back(new ThreadPool:Task(bind(&NagiosCheckTask::Execute, static_cast<NagiosCheckTask::Ptr>(GetSelf()))));
}
void NagiosCheckTask::FlushQueue(void)
void NagiosCheckTask::GetFinishedTasks(vector<CheckTask::Ptr>& tasks)
{
- unique_lock<mutex> lock(m_FinishedTasksMutex);
+ mutex::scoped_lock lock(m_FinishedTasksMutex);
std::copy(m_FinishedTasks.begin(), m_FinishedTasks.end(), back_inserter(tasks));
m_FinishedTasks.clear();
}
void NagiosCheckTask::Execute(void)
{
- m_Result = RunCheck();
+ RunCheck();
{
- unique_lock<mutex> lock(m_FinishedTasksMutex);
+ mutex::scoped_lock lock(m_FinishedTasksMutex);
m_FinishedTasks.push_back(GetSelf());
}
}
-CheckResult NagiosCheckTask::RunCheck(void) const
+void NagiosCheckTask::RunCheck(void)
{
- CheckResult cr;
FILE *fp;
- time(&cr.StartTime);
-
#ifdef _MSC_VER
fp = _popen(m_Command.c_str(), "r");
#else /* _MSC_VER */
fp = popen(m_Command.c_str(), "r");
#endif /* _MSC_VER */
- stringstream outputbuf;
+// ostringstream outputbuf;
while (!feof(fp)) {
char buffer[128];
if (read == 0)
break;
- outputbuf << string(buffer, buffer + read);
+// outputbuf.write(buffer, read);
}
- cr.Output = outputbuf.str();
- boost::algorithm::trim(cr.Output);
+// m_Result.Output = outputbuf.str();
+// boost::algorithm::trim(m_Result.Output);
int status, exitcode;
#ifdef _MSC_VER
switch (exitcode) {
case 0:
- cr.State = StateOK;
+ m_Result.State = StateOK;
break;
case 1:
- cr.State = StateWarning;
+ m_Result.State = StateWarning;
break;
case 2:
- cr.State = StateCritical;
+ m_Result.State = StateCritical;
break;
default:
- cr.State = StateUnknown;
+ m_Result.State = StateUnknown;
break;
}
#ifndef _MSC_VER
} else if (WIFSIGNALED(status)) {
- cr.Output = "Process was terminated by signal " + WTERMSIG(status);
- cr.State = StateUnknown;
+ m_Result.Output = "Process was terminated by signal " + WTERMSIG(status);
+ m_Result.State = StateUnknown;
}
#endif /* _MSC_VER */
- time(&cr.EndTime);
-
- return cr;
+ time(&m_Result.EndTime);
}
CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service)
namespace icinga
{
-class I2_ICINGA_API NagiosCheckTask : public CheckTask
+class I2_ICINGA_API NagiosCheckTask : public CheckTask, public ThreadPoolTask
{
public:
typedef shared_ptr<NagiosCheckTask> Ptr;
string m_Command;
CheckResult m_Result;
- static vector<ThreadPool::Task> m_QueuedTasks;
+ static list<ThreadPoolTask::Ptr> m_QueuedTasks;
static boost::mutex m_FinishedTasksMutex;
static vector<CheckTask::Ptr> m_FinishedTasks;
- void Execute(void);
- CheckResult RunCheck(void) const;
+ virtual void Execute(void);
+ void RunCheck(void);
};
}
void Service::SetNextCheck(time_t nextCheck)
{
- GetConfigObject()->SetTag("next_check", static_cast<long>(nextCheck));
+ m_NextCheck = nextCheck;
}
time_t Service::GetNextCheck(void)
{
- long value = -1;
- GetConfigObject()->GetTag("next_check", &value);
+ if (m_NextCheck == -1)
+ m_NextCheck = time(NULL) + rand() % GetCheckInterval();
- if (value == -1) {
- value = time(NULL) + rand() % GetCheckInterval();
- SetNextCheck(value);
- }
-
- return value;
+ return m_NextCheck;
}
void Service::SetChecker(string checker)
{
- GetConfigObject()->SetTag("checker", checker);
+ GetConfigObject()->SetProperty("checker", checker);
}
string Service::GetChecker(void) const
{
string value;
- GetConfigObject()->GetTag("checker", &value);
+ GetConfigObject()->GetProperty("checker", &value);
return value;
}
-void Service::SetPendingCheck(bool pending)
-{
- GetConfigObject()->SetTag("pendingCheck", pending);
-}
-
-bool Service::HasPendingCheck(void) const
-{
- bool value = false;
- GetConfigObject()->GetTag("pendingCheck", &value);
- return value;
-}
\ No newline at end of file
{
public:
Service(const ConfigObject::Ptr& configObject)
- : ConfigObjectAdapter(configObject)
+ : ConfigObjectAdapter(configObject), m_NextCheck(-1)
{ }
string GetDisplayName(void) const;
time_t GetNextCheck(void);
void SetChecker(string checker);
string GetChecker(void) const;
- void SetPendingCheck(bool pending);
- bool HasPendingCheck(void) const;
+
+private:
+ time_t m_NextCheck;
};
}