ThreadPool::ThreadPool(long numThreads)
: m_Alive(true)
{
- for (long i = 0; i < numThreads; i++)
- m_Threads.create_thread(boost::bind(&ThreadPool::WorkerThreadProc, this));
+ for (long i = 0; i < numThreads; i++) {
+ thread *thr = m_Threads.create_thread(boost::bind(&ThreadPool::WorkerThreadProc, this));
+#ifdef _WIN32
+ HANDLE handle = thr->native_handle();
+ SetPriorityClass(handle, BELOW_NORMAL_PRIORITY_CLASS);
+#else /* _WIN32 */
+ pthread_t handle = thr->native_handle();
+
+ int policy;
+ sched_param param;
+
+ if (pthread_getschedparam(handle, &policy, ¶m) < 0)
+ throw PosixException("pthread_getschedparam failed", errno);
+
+ param.sched_priority = 0;
+
+ if (pthread_setschedparam(handle, SCHED_IDLE, ¶m) < 0)
+ throw PosixException("pthread_setschedparam failed", errno);
+#endif /* _WIN32 */
+ }
}
ThreadPool::~ThreadPool(void)
m_Threads.join_all();
}
+void ThreadPool::EnqueueTasks(const vector<Task>& tasks)
+{
+ unique_lock<mutex> lock(m_Lock);
+
+ std::copy(tasks.begin(), tasks.end(), std::back_inserter(m_Tasks));
+ m_CV.notify_all();
+}
+
void ThreadPool::EnqueueTask(Task task)
{
unique_lock<mutex> lock(m_Lock);
static ThreadPool::Ptr GetDefaultPool(void);
+ void EnqueueTasks(const vector<Task>& tasks);
void EnqueueTask(Task task);
void WaitForTasks(void);
*/
void Timer::Call(void)
{
+ time_t st;
+ time(&st);
+
OnTimerExpired(GetSelf());
+
+ time_t et;
+ time(&et);
+
+ if (et - st > 5) {
+ stringstream msgbuf;
+ msgbuf << "Timer call took " << et - st << " seconds.";
+ Application::Log(LogDebug, "base", msgbuf.str());
+ }
}
/**
m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
m_CheckTimer->Start();
- CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask);
+ CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue);
m_ResultTimer = boost::make_shared<Timer>();
m_ResultTimer->SetInterval(5);
time_t now;
time(&now);
+ Application::Log(LogDebug, "checker", "CheckTimerHandler entered.");
+
+ long tasks = 0;
+
for (;;) {
if (m_Services.empty())
break;
m_Services.pop();
service.SetPendingCheck(true);
- Application::Log(LogInformation, "checker", "Executing service check for '" + service.GetName() + "'");
+// Application::Log(LogInformation, "checker", "Executing service check for '" + service.GetName() + "'");
CheckTask::Ptr task = CheckTask::CreateTask(service);
- task->Execute();
+ task->Enqueue();
m_PendingTasks.push_back(task);
service.SetNextCheck(now + service.GetCheckInterval());
+
+ tasks++;
}
+ Application::Log(LogDebug, "checker", "CheckTimerHandler: past loop.");
+
+ CheckTask::FlushQueue();
+
AdjustCheckTimer();
+
+ stringstream msgbuf;
+ msgbuf << "CheckTimerHandler: created " << tasks << " tasks";
+ Application::Log(LogDebug, "checker", msgbuf.str());
}
void CheckerComponent::ResultTimerHandler(void)
{
vector<CheckTask::Ptr> unfinishedTasks;
+ Application::Log(LogDebug, "checker", "ResultTimerHandler entered.");
+
+ long results = 0;
+
for (vector<CheckTask::Ptr>::iterator it = m_PendingTasks.begin(); it != m_PendingTasks.end(); it++) {
CheckTask::Ptr task = *it;
service.SetPendingCheck(false);
CheckResult result = task->GetResult();
- Application::Log(LogInformation, "checker", "Got result! Plugin output: " + result.Output);
+// Application::Log(LogInformation, "checker", "Got result! Plugin output: " + result.Output);
+
+ results++;
m_Services.push(service);
}
+ stringstream msgbuf;
+ msgbuf << "ResultTimerHandler: " << results << " results; " << unfinishedTasks.size() << " unfinished";
+ Application::Log(LogDebug, "checker", msgbuf.str());
+
m_PendingTasks = unfinishedTasks;
AdjustCheckTimer();
AC_PROG_YACC
AC_PROG_LIBTOOL
AX_CXX_GCC_ABI_DEMANGLE
+AX_PTHREAD
AX_BOOST_BASE
AX_BOOST_SIGNALS
AX_BOOST_THREAD
using namespace icinga;
-map<string, CheckTask::Factory> CheckTask::m_Types;
+map<string, CheckTaskType> CheckTask::m_Types;
CheckTask::CheckTask(const Service& service)
: m_Service(service)
return m_Service;
}
-void CheckTask::RegisterType(string type, Factory factory)
+void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher)
{
- m_Types[type] = factory;
+ CheckTaskType ctt;
+ ctt.Factory = factory;
+ ctt.QueueFlusher = qflusher;
+
+ m_Types[type] = ctt;
}
CheckTask::Ptr CheckTask::CreateTask(const Service& service)
{
- map<string, CheckTask::Factory>::iterator it;
+ map<string, CheckTaskType>::iterator it;
it = m_Types.find(service.GetCheckType());
if (it == m_Types.end())
throw runtime_error("Invalid check type specified for service '" + service.GetName() + "'");
- return it->second(service);
+ return it->second.Factory(service);
+}
+
+void CheckTask::Enqueue(const CheckTask::Ptr& task)
+{
+ task->Enqueue();
+}
+
+void CheckTask::FlushQueue(void)
+{
+ map<string, CheckTaskType>::iterator it;
+
+ for (it = m_Types.begin(); it != m_Types.end(); it++)
+ it->second.QueueFlusher();
}
Dictionary::Ptr PerformanceData;
};
+struct CheckTaskType;
+
class I2_ICINGA_API CheckTask : public Object
{
public:
typedef weak_ptr<CheckTask> WeakPtr;
typedef function<CheckTask::Ptr(const Service&)> Factory;
+ typedef function<void()> QueueFlusher;
Service GetService(void) const;
- virtual void Execute(void) = 0;
+ virtual void Enqueue(void) = 0;
virtual bool IsFinished(void) const = 0;
virtual CheckResult GetResult(void) = 0;
- static void RegisterType(string type, Factory factory);
+ static void RegisterType(string type, Factory factory, QueueFlusher qflusher);
static CheckTask::Ptr CreateTask(const Service& service);
+ static void Enqueue(const CheckTask::Ptr& task);
+ static void FlushQueue(void);
protected:
CheckTask(const Service& service);
private:
Service m_Service;
- static map<string, Factory> m_Types;
+ static map<string, CheckTaskType> m_Types;
+};
+
+struct CheckTaskType
+{
+ CheckTask::Factory Factory;
+ CheckTask::QueueFlusher QueueFlusher;
};
}
using namespace icinga;
+vector<ThreadPool::Task> NagiosCheckTask::m_QueuedTasks;
+
NagiosCheckTask::NagiosCheckTask(const Service& service)
: CheckTask(service)
{
m_Result = m_Task.get_future();
}
-void NagiosCheckTask::Execute(void)
+void NagiosCheckTask::Enqueue(void)
{
- Application::Log(LogDebug, "icinga", "Nagios check command: " + m_Command);
-
- ThreadPool::GetDefaultPool()->EnqueueTask(boost::bind(&NagiosCheckTask::InternalExecute, this));
+ m_QueuedTasks.push_back(bind(&NagiosCheckTask::Execute, this));
}
-void NagiosCheckTask::InternalExecute(void)
+void NagiosCheckTask::FlushQueue(void)
{
- m_Task();
+ ThreadPool::GetDefaultPool()->EnqueueTasks(m_QueuedTasks);
+ m_QueuedTasks.clear();
}
bool NagiosCheckTask::IsFinished(void) const
{
- return m_Result.is_ready();
+ return m_Result.has_value();
}
CheckResult NagiosCheckTask::GetResult(void)
return m_Result.get();
}
+void NagiosCheckTask::Execute(void)
+{
+ m_Task();
+}
+
CheckResult NagiosCheckTask::RunCheck(void) const
{
CheckResult cr;
public:
NagiosCheckTask(const Service& service);
- virtual void Execute(void);
+ virtual void Enqueue(void);
virtual bool IsFinished(void) const;
virtual CheckResult GetResult(void);
static CheckTask::Ptr CreateTask(const Service& service);
+ static void FlushQueue(void);
private:
string m_Command;
packaged_task<CheckResult> m_Task;
unique_future<CheckResult> m_Result;
- void InternalExecute(void);
+ static vector<ThreadPool::Task> m_QueuedTasks;
+
+ void Execute(void);
CheckResult RunCheck(void) const;
};