using namespace icinga;
map<string, CheckTaskType> CheckTask::m_Types;
+vector<CheckTask::Ptr> CheckTask::m_FinishedTasks;
+mutex CheckTask::m_FinishedTasksMutex;
CheckTask::CheckTask(const Service& service)
: m_Service(service)
return m_Service;
}
-void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher, FinishedTasksGetter qtasksgetter)
+void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher)
{
CheckTaskType ctt;
ctt.Factory = factory;
ctt.QueueFlusher = qflusher;
- ctt.FinishedTasksGetter = qtasksgetter;
m_Types[type] = ctt;
}
vector<CheckTask::Ptr> CheckTask::GetFinishedTasks(void)
{
- vector<CheckTask::Ptr> tasks;
+ mutex::scoped_lock lock(m_FinishedTasksMutex);
- map<string, CheckTaskType>::iterator it;
- for (it = m_Types.begin(); it != m_Types.end(); it++)
- it->second.FinishedTasksGetter(tasks);
+ vector<CheckTask::Ptr> result = m_FinishedTasks;
+ m_FinishedTasks.clear();
- return tasks;
+ return result;
}
+void CheckTask::FinishTask(const CheckTask::Ptr& task)
+{
+ mutex::scoped_lock lock(m_FinishedTasksMutex);
+ m_FinishedTasks.push_back(task);
+}
typedef function<CheckTask::Ptr(const Service&)> Factory;
typedef function<void()> QueueFlusher;
- typedef function<void (vector<CheckTask::Ptr>& tasks)> FinishedTasksGetter;
Service GetService(void) const;
virtual void Enqueue(void) = 0;
virtual CheckResult GetResult(void) = 0;
- static void RegisterType(string type, Factory factory, QueueFlusher qflusher, FinishedTasksGetter qtasksgetter);
+ static void RegisterType(string type, Factory factory, QueueFlusher qflusher);
static CheckTask::Ptr CreateTask(const Service& service);
static void Enqueue(const CheckTask::Ptr& task);
static void FlushQueue(void);
+
+ static void FinishTask(const CheckTask::Ptr& task);
static vector<CheckTask::Ptr> GetFinishedTasks(void);
protected:
Service m_Service;
static map<string, CheckTaskType> m_Types;
+
+ static vector<CheckTask::Ptr> m_FinishedTasks;
+ static mutex m_FinishedTasksMutex;
};
struct CheckTaskType
{
CheckTask::Factory Factory;
CheckTask::QueueFlusher QueueFlusher;
- CheckTask::FinishedTasksGetter FinishedTasksGetter;
};
}
using namespace icinga;
-list<ThreadPoolTask::Ptr> NagiosCheckTask::m_QueuedTasks;
-
-boost::mutex NagiosCheckTask::m_FinishedTasksMutex;
-vector<CheckTask::Ptr> NagiosCheckTask::m_FinishedTasks;
+boost::mutex NagiosCheckTask::m_Mutex;
+deque<NagiosCheckTask::Ptr> NagiosCheckTask::m_Tasks;
+condition_variable NagiosCheckTask::m_TasksCV;
NagiosCheckTask::NagiosCheckTask(const Service& service)
- : CheckTask(service)
+ : CheckTask(service), m_FP(NULL)
{
string checkCommand = service.GetCheckCommand();
m_Command = MacroProcessor::ResolveMacros(checkCommand, service.GetMacros()); // + " 2>&1";
void NagiosCheckTask::Enqueue(void)
{
time(&m_Result.StartTime);
- m_QueuedTasks.push_back(static_pointer_cast<ThreadPoolTask>(static_cast<NagiosCheckTask::Ptr>(GetSelf())));
-// m_QueuedTasks.push_back(new ThreadPool:Task(bind(&NagiosCheckTask::Execute, static_cast<NagiosCheckTask::Ptr>(GetSelf()))));
-}
-void NagiosCheckTask::FlushQueue(void)
-{
- ThreadPool::GetDefaultPool()->EnqueueTasks(m_QueuedTasks);
- m_QueuedTasks.clear();
+ {
+ mutex::scoped_lock lock(m_Mutex);
+ m_Tasks.push_back(GetSelf());
+ }
}
-void NagiosCheckTask::GetFinishedTasks(vector<CheckTask::Ptr>& tasks)
+void NagiosCheckTask::FlushQueue(void)
{
- mutex::scoped_lock lock(m_FinishedTasksMutex);
- std::copy(m_FinishedTasks.begin(), m_FinishedTasks.end(), back_inserter(tasks));
- m_FinishedTasks.clear();
+ m_TasksCV.notify_all();
}
CheckResult NagiosCheckTask::GetResult(void)
return m_Result;
}
-void NagiosCheckTask::Execute(void)
+void NagiosCheckTask::CheckThreadProc(void)
{
- RunCheck();
+ mutex::scoped_lock lock(m_Mutex);
- {
- mutex::scoped_lock lock(m_FinishedTasksMutex);
- m_FinishedTasks.push_back(GetSelf());
+ map<int, NagiosCheckTask::Ptr> tasks;
+ const int maxTasks = 16;
+
+ for (;;) {
+ while (m_Tasks.empty() || tasks.size() >= maxTasks) {
+ lock.unlock();
+
+ map<int, NagiosCheckTask::Ptr>::iterator it, prev;
+
+#ifndef _MSC_VER
+ fd_set readfds;
+ int nfds = 0;
+
+ FD_ZERO(&readfds);
+
+ for (it = tasks.begin(); it != tasks.end(); it++) {
+ if (it->first > nfds)
+ nfds = it->first;
+
+ FD_SET(it->first, &readfds);
+ }
+
+ timeval tv;
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ select(nfds + 1, &readfds, NULL, NULL, &tv);
+#else /* _MSC_VER */
+ Sleep(1000);
+#endif /* _MSC_VER */
+
+ for (it = tasks.begin(); it != tasks.end(); ) {
+#ifndef _MSC_VER
+ if (!FD_ISSET(it->first, &readfds)) {
+ it++;
+ continue;
+ }
+#endif /* _MSC_VER */
+
+ if (!it->second->RunTask()) {
+ CheckTask::FinishTask(it->second);
+ it = tasks.erase(it);
+ } else {
+ it++;
+ }
+ }
+
+ lock.lock();
+ }
+
+ while (!m_Tasks.empty() && tasks.size() < maxTasks) {
+ NagiosCheckTask::Ptr task = m_Tasks.front();
+ m_Tasks.pop_front();
+ if (!task->InitTask()) {
+ CheckTask::FinishTask(task);
+ } else {
+ int fd = task->GetFD();
+ if (fd >= 0)
+ tasks[fd] = task;
+ }
+ }
}
}
-void NagiosCheckTask::RunCheck(void)
+bool NagiosCheckTask::InitTask(void)
{
- FILE *fp;
-
#ifdef _MSC_VER
- fp = _popen(m_Command.c_str(), "r");
+ m_FP = _popen(m_Command.c_str(), "r");
#else /* _MSC_VER */
bool use_libc_popen = false;
popen_noshell_pass_to_pclose pclose_arg;
if (!use_libc_popen) {
- fp = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg);
+ m_FP = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg);
- if (fp == NULL) // TODO: add check for valgrind
+ if (m_FP == NULL) // TODO: add check for valgrind
use_libc_popen = true;
}
if (use_libc_popen)
- fp = popen(m_Command.c_str(), "r");
+ m_FP = popen(m_Command.c_str(), "r");
#endif /* _MSC_VER */
- stringstream outputbuf;
+ return (m_FP != NULL);
+}
- while (!feof(fp)) {
- char buffer[512];
- size_t read = fread(buffer, 1, sizeof(buffer), fp);
+bool NagiosCheckTask::RunTask(void)
+{
+ char buffer[512];
+ size_t read = fread(buffer, 1, sizeof(buffer), m_FP);
- if (read == 0)
- break;
+ if (read > 0)
+ m_OutputStream.write(buffer, read);
- outputbuf.write(buffer, read);
- }
+ if (!feof(m_FP))
+ return true;
- m_Result.Output = outputbuf.str();
+ m_Result.Output = m_OutputStream.str();
boost::algorithm::trim(m_Result.Output);
int status, exitcode;
#ifdef _MSC_VER
- status = _pclose(fp);
+ status = _pclose(m_FP);
#else /* _MSC_VER */
if (use_libc_popen)
status = pclose(fp);
#endif /* _MSC_VER */
time(&m_Result.EndTime);
+
+ return false;
+}
+
+int NagiosCheckTask::GetFD(void) const
+{
+ return fileno(m_FP);
}
CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service)
{
return boost::make_shared<NagiosCheckTask>(service);
}
+
+void NagiosCheckTask::Register(void)
+{
+ CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue);
+
+ for (int i = 0; i < 1; i++) {
+ thread t(&NagiosCheckTask::CheckThreadProc);
+ t.detach();
+ }
+}
\ No newline at end of file