]> granicus.if.org Git - icinga2/commitdiff
Bugfixes for the delegation feature.
authorGunnar Beutner <gunnar@beutner.name>
Wed, 20 Jun 2012 22:10:10 +0000 (00:10 +0200)
committerGunnar Beutner <gunnar@beutner.name>
Wed, 20 Jun 2012 22:10:10 +0000 (00:10 +0200)
base/timer.cpp
components/delegation/delegationcomponent.cpp
components/delegation/delegationcomponent.h
icinga/endpointmanager.cpp
icinga/endpointmanager.h
icinga/nagioschecktask.cpp

index 0f003350eb661d2238dd2eb8725aa1f30240de15..d7646c59d8222e89f452f5c1822910b693769ff4 100644 (file)
@@ -107,10 +107,10 @@ void Timer::Call(void)
        time_t et;
        time(&et);
 
-       if (et - st > 5) {
+       if (et - st > 3) {
                stringstream msgbuf;
                msgbuf << "Timer call took " << et - st << " seconds.";
-               Application::Log(LogDebug, "base", msgbuf.str());
+               Application::Log(LogInformation, "base", msgbuf.str());
        }
 }
 
index 2bd2ecdec0f08da58dd4f117cc6190fb22c147a4..29881475780f81a4f0770a2a6eac1ad646aed7fe 100644 (file)
@@ -30,15 +30,16 @@ string DelegationComponent::GetName(void) const
 void DelegationComponent::Start(void)
 {
        m_AllServices = boost::make_shared<ConfigObject::Set>(ConfigObject::GetAllObjects(), ConfigObject::MakeTypePredicate("service"));
-       m_AllServices->OnObjectAdded.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2));
+/*     m_AllServices->OnObjectAdded.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2));
        m_AllServices->OnObjectCommitted.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2));
-       m_AllServices->OnObjectRemoved.connect(boost::bind(&DelegationComponent::RemovedServiceHandler, this, _2));
+       m_AllServices->OnObjectRemoved.connect(boost::bind(&DelegationComponent::RemovedServiceHandler, this, _2));*/
        m_AllServices->Start();
 
        m_DelegationTimer = boost::make_shared<Timer>();
        m_DelegationTimer->SetInterval(30);
        m_DelegationTimer->OnTimerExpired.connect(boost::bind(&DelegationComponent::DelegationTimerHandler, this));
        m_DelegationTimer->Start();
+       m_DelegationTimer->Reschedule(0);
 
        m_DelegationEndpoint = boost::make_shared<VirtualEndpoint>();
        m_DelegationEndpoint->RegisterPublication("checker::AssignService");
@@ -54,17 +55,7 @@ void DelegationComponent::Stop(void)
                mgr->UnregisterEndpoint(m_DelegationEndpoint);
 }
 
-void DelegationComponent::NewServiceHandler(const Service& object)
-{
-       AssignService(object);
-}
-
-void DelegationComponent::RemovedServiceHandler(const Service& object)
-{
-       RevokeService(object);
-}
-
-void DelegationComponent::AssignService(const Service& service)
+void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Service& service)
 {
        RequestMessage request;
        request.SetMethod("checker::AssignService");
@@ -75,7 +66,7 @@ void DelegationComponent::AssignService(const Service& service)
 
        Application::Log(LogDebug, "delegation", "Trying to delegate service '" + service.GetName() + "'");
 
-       GetEndpointManager()->SendAPIMessage(m_DelegationEndpoint, request,
+       GetEndpointManager()->SendAPIMessage(m_DelegationEndpoint, checker, request,
            boost::bind(&DelegationComponent::AssignServiceResponseHandler, this, service, _2, _5));
 }
 
@@ -83,13 +74,11 @@ void DelegationComponent::AssignServiceResponseHandler(Service& service, const E
 {
        if (timedOut) {
                Application::Log(LogDebug, "delegation", "Service delegation for service '" + service.GetName() + "' timed out.");
-       } else {
-               service.SetChecker(sender->GetIdentity());
-               Application::Log(LogDebug, "delegation", "Service delegation for service '" + service.GetName() + "' was successful.");
+               service.SetChecker("");
        }
 }
 
-void DelegationComponent::RevokeService(const Service& service)
+void DelegationComponent::ClearServices(const Endpoint::Ptr& checker)
 {
 
 }
@@ -103,8 +92,14 @@ vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service& s
        vector<Endpoint::Ptr> candidates;
 
        EndpointManager::Iterator it;
-       for (it = GetEndpointManager()->Begin(); it != GetEndpointManager()->End(); it++)
-               candidates.push_back(it->second);
+       for (it = GetEndpointManager()->Begin(); it != GetEndpointManager()->End(); it++) {
+               Endpoint::Ptr endpoint = it->second;
+
+               if (!endpoint->HasSubscription("checker::AssignService"))
+                       continue;
+
+               candidates.push_back(endpoint);
+       }
 
        return candidates;
 }
@@ -114,9 +109,8 @@ void DelegationComponent::DelegationTimerHandler(void)
        map<Endpoint::Ptr, int> histogram;
 
        EndpointManager::Iterator eit;
-       for (eit = GetEndpointManager()->Begin(); eit != GetEndpointManager()->End(); eit++) {
+       for (eit = GetEndpointManager()->Begin(); eit != GetEndpointManager()->End(); eit++)
                histogram[eit->second] = 0;
-       }
 
        /* nothing to do if we have no checkers */
        if (histogram.size() == 0)
@@ -144,11 +138,11 @@ void DelegationComponent::DelegationTimerHandler(void)
 
        std::random_shuffle(services.begin(), services.end());
 
-       long delegated = 0;
+       int delegated = 0;
 
        /* re-assign services */
        vector<Service>::iterator sit;
-       for (sit = services.begin(); sit != services.end(); it++) {
+       for (sit = services.begin(); sit != services.end(); sit++) {
                Service service = *sit;
 
                string checker = service.GetChecker();
@@ -158,14 +152,19 @@ void DelegationComponent::DelegationTimerHandler(void)
                        oldEndpoint = GetEndpointManager()->GetEndpointByIdentity(checker);
 
                vector<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
+               std::random_shuffle(candidates.begin(), candidates.end());
+
+               stringstream msgbuf;
+               msgbuf << "Service: " << service.GetName() << ", candidates: " << candidates.size();
+               Application::Log(LogDebug, "delegation", msgbuf.str());
 
-               long avg_services = 0;
+               int avg_services = 0;
                vector<Endpoint::Ptr>::iterator cit;
                for (cit = candidates.begin(); cit != candidates.end(); cit++)
                        avg_services += histogram[*cit];
 
                avg_services /= candidates.size();
-               long overflow_tolerance = candidates.size() * 2;
+               int overflow_tolerance = candidates.size() * 2;
 
                /* don't re-assign service if the checker is still valid
                 * and doesn't have too many services */
@@ -198,6 +197,23 @@ void DelegationComponent::DelegationTimerHandler(void)
                assert(!service.GetChecker().empty());
        }
 
+       if (delegated > 0) {
+               map<Endpoint::Ptr, int>::iterator hit;
+               for (hit = histogram.begin(); hit != histogram.end(); hit++) {
+                       ClearServices(hit->first);
+               }
+
+               for (sit = services.begin(); sit != services.end(); sit++) {
+                       string checker = sit->GetChecker();
+                       Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(checker);
+
+                       if (!endpoint)
+                               continue;
+
+                       AssignService(endpoint, *sit);
+               }
+       }
+
        stringstream msgbuf;
        msgbuf << "Re-delegated " << delegated << " services";
        Application::Log(LogInformation, "delegation", msgbuf.str());
index c4d85e6c5e208664d7aa134669a36988d6fb27b6..0ab714737824b08a11a9d4223a1d095b2add0134 100644 (file)
@@ -38,9 +38,6 @@ private:
        ConfigObject::Set::Ptr m_AllServices;
        Timer::Ptr m_DelegationTimer;
 
-       void NewServiceHandler(const Service& object);
-       void RemovedServiceHandler(const Service& object);
-
        void AssignServiceResponseHandler(Service& service, const Endpoint::Ptr& sender, bool timedOut);
        void RevokeServiceResponseHandler(Service& service, const Endpoint::Ptr& sender, bool timedOut);
 
@@ -48,8 +45,8 @@ private:
 
        vector<Endpoint::Ptr> GetCheckerCandidates(const Service& service) const;
 
-       void AssignService(const Service& service);
-       void RevokeService(const Service& service);
+       void AssignService(const Endpoint::Ptr& checker, const Service& service);
+       void ClearServices(const Endpoint::Ptr& checker);
 };
 
 }
index 43354ccfc0b83994f1b94f5dda16e18b9b6e73ed..03747e1bd1671fafde45f77962e7035612121c43 100644 (file)
@@ -292,7 +292,7 @@ Endpoint::Ptr EndpointManager::GetEndpointByIdentity(string identity) const
                return Endpoint::Ptr();
 }
 
-void EndpointManager::SendAPIMessage(Endpoint::Ptr sender,
+void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient,
     RequestMessage& message,
     function<void(const EndpointManager::Ptr&, const Endpoint::Ptr, const RequestMessage&, const ResponseMessage&, bool TimedOut)> callback, time_t timeout)
 {
@@ -312,7 +312,10 @@ void EndpointManager::SendAPIMessage(Endpoint::Ptr sender,
        m_Requests[id] = pr;
        RescheduleRequestTimer();
 
-       SendAnycastMessage(sender, message);
+       if (!recipient)
+               SendAnycastMessage(sender, message);
+       else
+               SendUnicastMessage(sender, recipient, message);
 }
 
 bool EndpointManager::RequestTimeoutLessComparer(const pair<string, PendingRequest>& a,
index 458e46858c872e3f6dfecdbadcf0cfed833edc61..61f84aa00d7214b11cccdf975d719fe9a6606dae 100644 (file)
@@ -56,7 +56,7 @@ public:
        void SendAnycastMessage(Endpoint::Ptr sender, const RequestMessage& message);
        void SendMulticastMessage(Endpoint::Ptr sender, const RequestMessage& message);
 
-       void SendAPIMessage(Endpoint::Ptr sender, RequestMessage& message,
+       void SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, RequestMessage& message,
            function<void(const EndpointManager::Ptr&, const Endpoint::Ptr, const RequestMessage&, const ResponseMessage&, bool TimedOut)> callback, time_t timeout = 10);
 
        void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message);
index 1bf3c44a5bae2a394d6ae6badca378cb5acbd988..aa0b5c6a802d4c7fb1f529a79a7cfa85b324fc5b 100644 (file)
@@ -57,8 +57,19 @@ void NagiosCheckTask::RunCheck(void)
 #ifdef _MSC_VER
        fp = _popen(m_Command.c_str(), "r");
 #else /* _MSC_VER */
+       bool use_libc_popen = false;
+
        popen_noshell_pass_to_pclose pclose_arg;
-       fp = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg);
+
+       if (!use_libc_popen) {
+               fp = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg);
+
+               if (fp == NULL) // TODO: add check for valgrind
+                       use_libc_popen = true;
+       }
+
+       if (use_libc_popen)
+               fp = popen(m_Command.c_str(), "r");
 #endif /* _MSC_VER */
 
        stringstream outputbuf;
@@ -80,7 +91,10 @@ void NagiosCheckTask::RunCheck(void)
 #ifdef _MSC_VER
        status = _pclose(fp);
 #else /* _MSC_VER */
-       status = pclose_noshell(&pclose_arg);
+       if (use_libc_popen)
+               status = pclose(fp);
+       else
+               status = pclose_noshell(&pclose_arg);
 #endif /* _MSC_VER */
 
 #ifndef _MSC_VER