]> granicus.if.org Git - icinga2/commitdiff
Bugfixes for the Process/AsyncTask classes.
authorGunnar Beutner <gunnar@beutner.name>
Sat, 14 Jul 2012 10:44:37 +0000 (12:44 +0200)
committerGunnar Beutner <gunnar@beutner.name>
Sat, 14 Jul 2012 10:44:37 +0000 (12:44 +0200)
12 files changed:
base/asynctask.h
base/configobject.cpp
base/configobject.h
base/process.cpp
base/process.h
cib/checktask.cpp
cib/checktask.h
cib/configobjectadapter.cpp
cib/configobjectadapter.h
cib/nagioschecktask.cpp
cib/nagioschecktask.h
components/checker/checkercomponent.cpp

index 75fe0283b0fd8ce287c0aa8f99c8a954b91fcfb7..9e25e7f327b5918298077984ff7ed4f0f84acb51 100644 (file)
@@ -30,8 +30,10 @@ public:
        typedef shared_ptr<AsyncTask<T> > Ptr;
        typedef weak_ptr<AsyncTask<T> > WeakPtr;
 
-       AsyncTask(void)
-               : m_Finished(false)
+       typedef function<void (const shared_ptr<T>&)> CompletionCallback;
+
+       AsyncTask(const CompletionCallback& completionCallback)
+               : m_Finished(false), m_CompletionCallback(completionCallback)
        { }
 
        ~AsyncTask(void)
@@ -46,17 +48,24 @@ public:
                Run();
        }
 
-       boost::signal<void (const shared_ptr<T>&)> OnTaskCompleted;
-
 protected:
        virtual void Run(void) = 0;
 
        void Finish(void)
        {
-               Event::Post(boost::bind(boost::cref(OnTaskCompleted), static_cast<shared_ptr<T> >(GetSelf())));
+               Event::Post(boost::bind(&T::ForwardCallback, static_cast<shared_ptr<T> >(GetSelf())));
+       }
+
+private:
+       void ForwardCallback(void)
+       {
+               m_CompletionCallback(GetSelf());
+               m_CompletionCallback = CompletionCallback();
+               m_Finished = true;
        }
 
        bool m_Finished;
+       CompletionCallback m_CompletionCallback;
 };
 
 }
index 13b6f7d5c7b68dd614979b587f2455a0d7d6d3f8..c80c09c08d523ab5caddfe1ce3f1330f51c123de 100644 (file)
@@ -191,3 +191,8 @@ ConfigObject::TMap::Range ConfigObject::GetObjects(string type)
 {
        return GetObjectsByType()->GetRange(type);
 }
+
+void ConfigObject::RemoveTag(const string& key)
+{
+       GetTags()->Remove(key);
+}
index 2a5bd31b06c96d465e09672a4fa60402161dbc44..6fbb1427cc4ab80c953d731fc51a281889b6d63a 100644 (file)
@@ -63,6 +63,8 @@ public:
                return GetTags()->Get(key, value);
        }
 
+       void RemoveTag(const string& key);
+
        string GetType(void) const;
        string GetName(void) const;
 
index 2d1d8292bb06c6e8ae88d8768d83c93b396b6249..50c4feabea14d790fdc8e697faee8bfa500eaa38 100644 (file)
 
 using namespace icinga;
 
-bool Process::m_ThreadsCreated = false;
+bool Process::m_ThreadCreated = false;
 boost::mutex Process::m_Mutex;
 deque<Process::Ptr> Process::m_Tasks;
 condition_variable Process::m_TasksCV;
 
-Process::Process(const string& command)
-       : m_Command(command)
+Process::Process(const string& command, const CompletionCallback& completionCallback)
+       : AsyncTask<Process>(completionCallback), m_Command(command), m_UsePopen(false)
 {
-       if (!m_ThreadsCreated) {
-               int numThreads = boost::thread::hardware_concurrency();
+       if (!m_ThreadCreated) {
+               thread t(&Process::WorkerThreadProc);
+               t.detach();
 
-               if (numThreads < 4)
-                       numThreads = 4;
-
-               for (int i = 0; i < numThreads; i++) {
-                       thread t(&Process::WorkerThreadProc);
-                       t.detach();
-               }
-
-               m_ThreadsCreated = true;
+               m_ThreadCreated = true;
        }
 }
 
@@ -52,7 +45,7 @@ void Process::Run(void)
 {
        mutex::scoped_lock lock(m_Mutex);
        m_Tasks.push_back(GetSelf());
-       m_TasksCV.notify_one();
+       m_TasksCV.notify_all();
 }
 
 void Process::WorkerThreadProc(void)
@@ -116,6 +109,9 @@ void Process::WorkerThreadProc(void)
                while (!m_Tasks.empty() && tasks.size() < MaxTasksPerThread) {
                        Process::Ptr task = m_Tasks.front();
                        m_Tasks.pop_front();
+
+                       lock.unlock();
+
                        if (!task->InitTask()) {
                                task->Finish();
                        } else {
@@ -123,12 +119,16 @@ void Process::WorkerThreadProc(void)
                                if (fd >= 0)
                                        tasks[fd] = task;
                        }
+
+                       lock.lock();
                }
        }
 }
 
 bool Process::InitTask(void)
 {
+       time(&m_ExecutionStart);
+
 #ifdef _MSC_VER
        m_FP = _popen(m_Command.c_str(), "r");
 #else /* _MSC_VER */
@@ -147,6 +147,8 @@ bool Process::InitTask(void)
 #endif /* _MSC_VER */
 
        if (m_FP == NULL) {
+               m_ExitStatus = 128;
+               m_ExecutionEnd = m_ExecutionStart;
                return false;
        }
 
@@ -178,6 +180,8 @@ bool Process::RunTask(void)
        }
 #endif /* _MSC_VER */
 
+       time(&m_ExecutionEnd);
+
 #ifndef _MSC_VER
        if (WIFEXITED(status)) {
                exitcode = WEXITSTATUS(status);
@@ -220,3 +224,14 @@ string Process::GetOutput(void) const
 {
        return m_Output;
 }
+
+time_t Process::GetExecutionStart(void) const
+{
+       return m_ExecutionStart;
+}
+
+time_t Process::GetExecutionEnd(void) const
+{
+       return m_ExecutionEnd;
+}
+
index 4aa0e7f77b1d40b099ef08fe4892c8e2f862a983..a5d11b85ab63c42f8458b9440bb2b71d2bbf1af9 100644 (file)
@@ -31,13 +31,16 @@ public:
 
        static const int MaxTasksPerThread = 128;
 
-       Process(const string& command);
+       Process(const string& command, const CompletionCallback& completionCallback);
+
+       time_t GetExecutionStart(void) const;
+       time_t GetExecutionEnd(void) const;
 
        long GetExitStatus(void) const;
        string GetOutput(void) const;
 
 private:
-       static bool m_ThreadsCreated;
+       static bool m_ThreadCreated;
 
        string m_Command;
 
@@ -48,6 +51,8 @@ private:
        void *m_PCloseArg;
 #endif /* _MSC_VER */
 
+       time_t m_ExecutionStart;
+       time_t m_ExecutionEnd;
        long m_ExitStatus;
        string m_Output;
 
index 91afd23be69861efe6b7522a8f7b86749bc36634..e75f3799dc453b19b7692968eac10baecada7c8b 100644 (file)
@@ -23,8 +23,8 @@ using namespace icinga;
 
 map<string, CheckTaskType> CheckTask::m_Types;
 
-CheckTask::CheckTask(const Service& service)
-       : m_Service(service)
+CheckTask::CheckTask(const Service& service, const CompletionCallback& completionCallback)
+       : AsyncTask<CheckTask>(completionCallback), m_Service(service)
 { }
 
 Service& CheckTask::GetService(void)
@@ -45,7 +45,7 @@ void CheckTask::RegisterType(string type, Factory factory)
        m_Types[type] = ctt;
 }
 
-CheckTask::Ptr CheckTask::CreateTask(const Service& service)
+CheckTask::Ptr CheckTask::CreateTask(const Service& service, const CompletionCallback& completionCallback)
 {
        map<string, CheckTaskType>::iterator it;
 
@@ -54,5 +54,5 @@ CheckTask::Ptr CheckTask::CreateTask(const Service& service)
        if (it == m_Types.end())
                throw runtime_error("Invalid check type specified for service '" + service.GetName() + "'");
 
-       return it->second.Factory(service);
+       return it->second.Factory(service, completionCallback);
 }
index 321300158ff74ce427d8f7cfb50d3909667dc0ef..db20668c70c1445479068a8569f98ad695360fbc 100644 (file)
@@ -31,18 +31,18 @@ public:
        typedef shared_ptr<CheckTask> Ptr;
        typedef weak_ptr<CheckTask> WeakPtr;
 
-       typedef function<CheckTask::Ptr(const Service&)> Factory;
+       typedef function<CheckTask::Ptr(const Service&, const CompletionCallback&)> Factory;
 
        Service& GetService(void);
        CheckResult& GetResult(void);
 
        static void RegisterType(string type, Factory factory);
-       static CheckTask::Ptr CreateTask(const Service& service);
+       static CheckTask::Ptr CreateTask(const Service& service, const CompletionCallback& completionCallback);
 
        static int GetTaskHistogramSlots(void);
 
 protected:
-       CheckTask(const Service& service);
+       CheckTask(const Service& service, const CompletionCallback& completionCallback);
 
        virtual void Run(void) = 0;
 
index fc7279c65f633aa8750dca5b880bc5d21c0d812a..4185552b8d20088f56e97d819b77fc08d9c6cb1e 100644 (file)
@@ -40,3 +40,8 @@ ConfigObject::Ptr ConfigObjectAdapter::GetConfigObject() const
 {
        return m_ConfigObject;
 }
+
+void ConfigObjectAdapter::RemoveTag(const string& key)
+{
+       m_ConfigObject->RemoveTag(key);
+}
index 4677b3f85884fcad500962ffb29813c0bc6b24b1..6036a4464ac9b39817474860a62042188f30ea43 100644 (file)
@@ -55,6 +55,8 @@ public:
                return GetConfigObject()->GetTag(key, value);
        }
 
+       void RemoveTag(const string& key);
+
 private:
        ConfigObject::Ptr m_ConfigObject;
 };
index 5b6dc23fca87007f336f471aa63e247e1fa8a9e4..f2077cacffd397a912d23b0035d49c8dde2b5240 100644 (file)
@@ -21,8 +21,8 @@
 
 using namespace icinga;
 
-NagiosCheckTask::NagiosCheckTask(const Service& service)
-       : CheckTask(service)
+NagiosCheckTask::NagiosCheckTask(const Service& service, const CompletionCallback& completionCallback)
+       : CheckTask(service, completionCallback)
 {
        string checkCommand = service.GetCheckCommand();
 
@@ -30,8 +30,7 @@ NagiosCheckTask::NagiosCheckTask(const Service& service)
        macroDicts.push_back(service.GetMacros());
        macroDicts.push_back(service.GetHost().GetMacros());
        macroDicts.push_back(IcingaApplication::GetInstance()->GetMacros());
-       string command = MacroProcessor::ResolveMacros(checkCommand, macroDicts);
-       m_Process = boost::make_shared<Process>(command);
+       m_Command = MacroProcessor::ResolveMacros(checkCommand, macroDicts);
 }
 
 void NagiosCheckTask::Run(void)
@@ -40,22 +39,22 @@ void NagiosCheckTask::Run(void)
        time(&now);
        GetResult().SetScheduleStart(now);
 
-       m_Process->OnTaskCompleted.connect(boost::bind(&NagiosCheckTask::ProcessFinishedHandler, static_cast<NagiosCheckTask::Ptr>(GetSelf())));
+       m_Process = boost::make_shared<Process>(m_Command, boost::bind(&NagiosCheckTask::ProcessFinishedHandler, static_cast<NagiosCheckTask::Ptr>(GetSelf())));
        m_Process->Start();
 }
 
 void NagiosCheckTask::ProcessFinishedHandler(void)
 {
-       time_t now;
-       time(&now);
-       GetResult().SetExecutionEnd(now);
+       GetResult().SetExecutionStart(m_Process->GetExecutionStart());
+       GetResult().SetExecutionEnd(m_Process->GetExecutionEnd());
 
        string output = m_Process->GetOutput();
+       long exitcode = m_Process->GetExitStatus();
+       m_Process.reset();
+
        boost::algorithm::trim(output);
        ProcessCheckOutput(output);
 
-       long exitcode = m_Process->GetExitStatus();
-
        ServiceState state;
 
        switch (exitcode) {
@@ -75,6 +74,7 @@ void NagiosCheckTask::ProcessFinishedHandler(void)
 
        GetResult().SetState(state);
 
+       time_t now;
        time(&now);
        GetResult().SetScheduleEnd(now);
 
@@ -114,9 +114,9 @@ void NagiosCheckTask::ProcessCheckOutput(const string& output)
        GetResult().SetPerformanceDataRaw(perfdata);
 }
 
-CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service)
+CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service, const CompletionCallback& completionCallback)
 {
-       return boost::make_shared<NagiosCheckTask>(service);
+       return boost::make_shared<NagiosCheckTask>(service, completionCallback);
 }
 
 void NagiosCheckTask::Register(void)
index 0375324d031690f8fea4d74293ce452de1e5ca6e..1a3e4a581516f2f2d54258a1b56200f6e3f8644d 100644 (file)
@@ -29,13 +29,14 @@ public:
        typedef shared_ptr<NagiosCheckTask> Ptr;
        typedef weak_ptr<NagiosCheckTask> WeakPtr;
 
-       NagiosCheckTask(const Service& service);
+       NagiosCheckTask(const Service& service, const CompletionCallback& completionCallback);
 
-       static CheckTask::Ptr CreateTask(const Service& service);
+       static CheckTask::Ptr CreateTask(const Service& service, const CompletionCallback& completionCallback);
 
        static void Register(void);
 
 private:
+       string m_Command;
        Process::Ptr m_Process;
 
        virtual void Run(void);
index 89ae89d350927426ce5b5cb1062676d1e6d0ebee..6c7d5416ed664768bf2bb678b759de568ca20162 100644 (file)
@@ -37,7 +37,7 @@ void CheckerComponent::Start(void)
        EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
 
        m_CheckTimer = boost::make_shared<Timer>();
-       m_CheckTimer->SetInterval(5);
+       m_CheckTimer->SetInterval(1);
        m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
        m_CheckTimer->Start();
 
@@ -78,10 +78,11 @@ void CheckerComponent::CheckTimerHandler(void)
 
                m_PendingServices.insert(service.GetConfigObject());
 
-               CheckTask::Ptr task = CheckTask::CreateTask(service);
-               task->OnTaskCompleted.connect(boost::bind(&CheckerComponent::CheckCompletedHandler, this, _1));
+               CheckTask::Ptr task = CheckTask::CreateTask(service, boost::bind(&CheckerComponent::CheckCompletedHandler, this, _1));
                task->Start();
 
+               service.SetTag("current_task", task);
+
                tasks++;
        }
 
@@ -96,8 +97,10 @@ void CheckerComponent::CheckCompletedHandler(const CheckTask::Ptr& task)
 {
        Service service = task->GetService();
 
+       service.RemoveTag("current_task");
+
        /* if the service isn't in the set of pending services
-               * it was removed and we need to ignore this check result. */
+        * it was removed and we need to ignore this check result. */
        if (m_PendingServices.find(service.GetConfigObject()) == m_PendingServices.end())
                return;