]> granicus.if.org Git - icinga2/commitdiff
Delegation bugfixes.
authorGunnar Beutner <gunnar.beutner@netways.de>
Thu, 21 Jun 2012 10:51:50 +0000 (12:51 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Thu, 21 Jun 2012 10:52:13 +0000 (12:52 +0200)
base/utility.cpp
components/checker/checkercomponent.cpp
components/checker/checkercomponent.h
components/delegation/delegationcomponent.cpp
components/discovery/discoverycomponent.cpp
icinga/endpointmanager.cpp
icinga/endpointmanager.h
icinga/jsonrpcendpoint.cpp
icinga/jsonrpcendpoint.h

index d280edd2925c9411c021f827185d8fb3fb20537a..e4ce045c1ebec7ec0fd6c8283c76586d8af21422 100644 (file)
@@ -100,6 +100,14 @@ shared_ptr<SSL_CTX> Utility::MakeSSLContext(string pubkey, string privkey, strin
        if (!SSL_CTX_load_verify_locations(sslContext.get(), cakey.c_str(), NULL))
                throw OpenSSLException("Could not load public CA key file", ERR_get_error());
 
+       STACK_OF(X509_NAME) *cert_names;
+
+       cert_names = SSL_load_client_CA_file(cakey.c_str());
+       if (cert_names == NULL)
+               throw OpenSSLException("SSL_load_client_CA_file() failed", ERR_get_error());
+
+       SSL_CTX_set_client_CA_list(sslContext.get(), cert_names);
+
        return sslContext;
 }
 
index 69bed96401e3c509fbed9a30c4de845e4437a880..14364cba557cafed06a3811f0e255f06748aa3d9 100644 (file)
@@ -31,8 +31,6 @@ void CheckerComponent::Start(void)
        m_CheckerEndpoint = boost::make_shared<VirtualEndpoint>();
        m_CheckerEndpoint->RegisterTopicHandler("checker::AssignService",
                boost::bind(&CheckerComponent::AssignServiceRequestHandler, this, _2, _3));
-       m_CheckerEndpoint->RegisterTopicHandler("checker::RevokeService",
-               boost::bind(&CheckerComponent::RevokeServiceRequestHandler, this, _2, _3));
        m_CheckerEndpoint->RegisterTopicHandler("checker::ClearServices",
                boost::bind(&CheckerComponent::ClearServicesRequestHandler, this, _2, _3));
        m_CheckerEndpoint->RegisterPublication("checker::CheckResult");
@@ -78,6 +76,8 @@ void CheckerComponent::CheckTimerHandler(void)
 
                Application::Log(LogDebug, "checker", "Executing service check for '" + service.GetName() + "'");
 
+               m_PendingServices.insert(service.GetName());
+
                CheckTask::Ptr task = CheckTask::CreateTask(service);
                task->Enqueue();
 
@@ -109,6 +109,11 @@ void CheckerComponent::ResultTimerHandler(void)
 
                Service service = task->GetService();
 
+               /* if the service isn't in the set of pending services
+                * it was removed and we need to ignore this check result. */
+               if (m_PendingServices.find(service.GetName()) == m_PendingServices.end())
+                       continue;
+
                CheckResult result = task->GetResult();
                Application::Log(LogDebug, "checker", "Got result for service '" + service.GetName() + "'");
 
@@ -127,6 +132,7 @@ void CheckerComponent::ResultTimerHandler(void)
                        failed++;
 
                service.SetNextCheck(now + service.GetCheckInterval());
+               m_PendingServices.erase(service.GetName());
                m_Services.push(service);
        }
 
@@ -157,49 +163,6 @@ void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender,
 
        Application::Log(LogDebug, "checker", "Accepted delegation for service '" + service.GetName() + "'");
 
-       /* force a service check */
-       m_CheckTimer->Reschedule(0);
-
-       string id;
-       if (request.GetID(&id)) {
-               ResponseMessage rm;
-               rm.SetID(id);
-
-               MessagePart result;
-               rm.SetResult(result);
-               GetEndpointManager()->SendUnicastMessage(m_CheckerEndpoint, sender, rm);
-       }
-}
-
-void CheckerComponent::RevokeServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
-{
-       MessagePart params;
-       if (!request.GetParams(&params))
-               return;
-
-       string name;
-       if (!params.GetProperty("service", &name))
-               return;
-
-       vector<Service> services;
-
-       while (!m_Services.empty()) {
-               Service service = m_Services.top();
-
-               if (service.GetName() == name)
-                       continue;
-
-               // TODO: take care of services that are currently being checked
-
-               services.push_back(service);
-       }
-
-       vector<Service>::const_iterator it;
-       for (it = services.begin(); it != services.end(); it++)
-               m_Services.push(*it);
-
-       Application::Log(LogDebug, "checker", "Revoked delegation for service '" + name + "'");
-
        string id;
        if (request.GetID(&id)) {
                ResponseMessage rm;
@@ -214,7 +177,12 @@ void CheckerComponent::RevokeServiceRequestHandler(const Endpoint::Ptr& sender,
 void CheckerComponent::ClearServicesRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
 {
        Application::Log(LogDebug, "checker", "Clearing service delegations.");
+
+       /* clear the services lists */
        m_Services = ServiceQueue();
+       m_PendingServices.clear();
+
+       /* TODO: clear checks we've already sent to the thread pool */
 
        string id;
        if (request.GetID(&id)) {
index 46ad620b767736afdc95933b81ac9c444cf22baa..630f23fb530d76d0da6c5ff28a498e66297861be 100644 (file)
@@ -51,7 +51,7 @@ private:
        VirtualEndpoint::Ptr m_CheckerEndpoint;
 
        ServiceQueue m_Services;
-       set<Service> m_PendingServices;
+       set<string> m_PendingServices;
 
        Timer::Ptr m_CheckTimer;
 
index 29881475780f81a4f0770a2a6eac1ad646aed7fe..9fa36001862a14e070c8829c098044fcb2875416 100644 (file)
@@ -43,7 +43,7 @@ void DelegationComponent::Start(void)
 
        m_DelegationEndpoint = boost::make_shared<VirtualEndpoint>();
        m_DelegationEndpoint->RegisterPublication("checker::AssignService");
-       m_DelegationEndpoint->RegisterPublication("checker::RevokeService");
+       m_DelegationEndpoint->RegisterPublication("checker::ClearServices");
        GetEndpointManager()->RegisterEndpoint(m_DelegationEndpoint);
 }
 
@@ -112,10 +112,6 @@ void DelegationComponent::DelegationTimerHandler(void)
        for (eit = GetEndpointManager()->Begin(); eit != GetEndpointManager()->End(); eit++)
                histogram[eit->second] = 0;
 
-       /* nothing to do if we have no checkers */
-       if (histogram.size() == 0)
-               return;
-
        vector<Service> services;
 
        /* build "checker -> service count" histogram */
@@ -152,19 +148,23 @@ void DelegationComponent::DelegationTimerHandler(void)
                        oldEndpoint = GetEndpointManager()->GetEndpointByIdentity(checker);
 
                vector<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
-               std::random_shuffle(candidates.begin(), candidates.end());
-
-               stringstream msgbuf;
-               msgbuf << "Service: " << service.GetName() << ", candidates: " << candidates.size();
-               Application::Log(LogDebug, "delegation", msgbuf.str());
 
-               int avg_services = 0;
+               int avg_services = 0, overflow_tolerance = 0;
                vector<Endpoint::Ptr>::iterator cit;
-               for (cit = candidates.begin(); cit != candidates.end(); cit++)
-                       avg_services += histogram[*cit];
 
-               avg_services /= candidates.size();
-               int overflow_tolerance = candidates.size() * 2;
+               if (candidates.size() > 0) {
+                       std::random_shuffle(candidates.begin(), candidates.end());
+
+                       stringstream msgbuf;
+                       msgbuf << "Service: " << service.GetName() << ", candidates: " << candidates.size();
+                       Application::Log(LogDebug, "delegation", msgbuf.str());
+
+                       for (cit = candidates.begin(); cit != candidates.end(); cit++)
+                               avg_services += histogram[*cit];
+
+                       avg_services /= candidates.size();
+                       overflow_tolerance = candidates.size() * 2;
+               }
 
                /* don't re-assign service if the checker is still valid
                 * and doesn't have too many services */
@@ -194,7 +194,7 @@ void DelegationComponent::DelegationTimerHandler(void)
                        delegated++;
                }
 
-               assert(!service.GetChecker().empty());
+               assert(candidates.size() == 0 || !service.GetChecker().empty());
        }
 
        if (delegated > 0) {
index 328268e3d02ab17a39b4734eb4d8914ca7bf3177..daf433b2fba1cd3aa74ae93a9ae008e03c23c4b0 100644 (file)
@@ -105,6 +105,10 @@ void DiscoveryComponent::CheckExistingEndpoint(const Endpoint::Ptr& self, const
  */
 void DiscoveryComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
 {
+       /* ignore local endpoints */
+       if (endpoint->IsLocal())
+               return;
+
        /* accept discovery::RegisterComponent messages from any endpoint */
        endpoint->RegisterPublication("discovery::RegisterComponent");
 
@@ -354,8 +358,13 @@ void DiscoveryComponent::ProcessDiscoveryMessage(const string& identity, const D
 
        time(&(info->LastSeen));
 
-       message.GetNode(&info->Node);
-       message.GetService(&info->Service);
+       string node;
+       if (message.GetNode(&node) && !node.empty())
+               info->Node = node;
+
+       string service;
+       if (message.GetService(&service) && !service.empty())
+               info->Service = service;
 
        ConfigObject::Ptr endpointConfig = ConfigObject::GetObject("endpoint", identity);
        Dictionary::Ptr roles;
@@ -472,6 +481,10 @@ void DiscoveryComponent::DiscoveryTimerHandler(void)
                string identity = i->first;
                ComponentDiscoveryInfo::Ptr info = i->second;
 
+               /* there's no need to reconnect to ourself */
+               if (identity == GetEndpointManager()->GetIdentity())
+                       continue;
+
                curr = i;
                i++;
 
@@ -492,7 +505,14 @@ void DiscoveryComponent::DiscoveryTimerHandler(void)
                } else {
                        /* TODO: figure out whether we actually want to connect to this component */
                        /* try and reconnect to this component */
-                       endpointManager->AddConnection(info->Node, info->Service);
+                       try {
+                               if (!info->Node.empty() && !info->Service.empty())
+                                       endpointManager->AddConnection(info->Node, info->Service);
+                       } catch (const std::exception& ex) {
+                               stringstream msgbuf;
+                               msgbuf << "Exception while trying to reconnect to endpoint '" << endpoint->GetIdentity() << "': " << ex.what();;
+                               Application::Log(LogInformation, "discovery", msgbuf.str());
+                       }
                }
        }
 }
index 03747e1bd1671fafde45f77962e7035612121c43..2f03990a4e9a0bc06c15967969eff8b44e4121f6 100644 (file)
 
 using namespace icinga;
 
+/**
+ * Constructor for the EndpointManager class.
+ */
+EndpointManager::EndpointManager(void)
+       : m_NextMessageID(0)
+{
+       m_RequestTimer = boost::make_shared<Timer>();
+       m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this));
+       m_RequestTimer->SetInterval(5);
+       m_RequestTimer->Start();
+}
+
 /**
  * Sets the identity of the endpoint manager. This identity is used when
  * connecting to remote peers.
@@ -147,9 +159,6 @@ void EndpointManager::UnregisterServer(JsonRpcServer::Ptr server)
  */
 void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint)
 {
-       if (!endpoint->IsLocal() && endpoint->GetIdentity() != "")
-               throw invalid_argument("Identity must be empty.");
-
        endpoint->SetEndpointManager(GetSelf());
 
        UnregisterEndpoint(endpoint);
@@ -310,7 +319,6 @@ void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint
        pr.Timeout = time(NULL) + timeout;
 
        m_Requests[id] = pr;
-       RescheduleRequestTimer();
 
        if (!recipient)
                SendAnycastMessage(sender, message);
@@ -324,29 +332,6 @@ bool EndpointManager::RequestTimeoutLessComparer(const pair<string, PendingReque
        return a.second.Timeout < b.second.Timeout;
 }
 
-void EndpointManager::RescheduleRequestTimer(void)
-{
-       map<string, PendingRequest>::iterator it;
-       it = min_element(m_Requests.begin(), m_Requests.end(),
-           &EndpointManager::RequestTimeoutLessComparer);
-
-       if (!m_RequestTimer) {
-               m_RequestTimer = boost::make_shared<Timer>();
-               m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this));
-       }
-
-       if (it != m_Requests.end()) {
-               time_t now;
-               time(&now);
-
-               time_t next_timeout = (it->second.Timeout < now) ? now : it->second.Timeout;
-               m_RequestTimer->SetInterval(next_timeout - now);
-               m_RequestTimer->Start();
-       } else {
-               m_RequestTimer->Stop();
-       }
-}
-
 void EndpointManager::RequestTimerHandler(void)
 {
        map<string, PendingRequest>::iterator it;
@@ -359,8 +344,6 @@ void EndpointManager::RequestTimerHandler(void)
                        break;
                }
        }
-
-       RescheduleRequestTimer();
 }
 
 void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message)
@@ -378,7 +361,6 @@ void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const
        it->second.Callback(GetSelf(), sender, it->second.Request, message, false);
 
        m_Requests.erase(it);
-       RescheduleRequestTimer();
 }
 
 EndpointManager::Iterator EndpointManager::Begin(void)
index 61f84aa00d7214b11cccdf975d719fe9a6606dae..663e48af6d3b2f25a2ecb6b9ff76a36ff5de34f3 100644 (file)
@@ -36,9 +36,7 @@ public:
 
        typedef map<string, Endpoint::Ptr>::iterator Iterator;
 
-       EndpointManager(void)
-               : m_NextMessageID(0)
-       { }
+       EndpointManager(void);
 
        void SetIdentity(string identity);
        string GetIdentity(void) const;
@@ -102,7 +100,6 @@ private:
        void UnregisterServer(JsonRpcServer::Ptr server);
 
        static bool RequestTimeoutLessComparer(const pair<string, PendingRequest>& a, const pair<string, PendingRequest>& b);
-       void RescheduleRequestTimer(void);
        void RequestTimerHandler(void);
 
        void NewClientHandler(const TcpClient::Ptr& client);
index 97fc57a5ae3896be8a03f17169b5e4f588ce0734..466ce3d8af1ba55526081daf910d8392f4a8bb7c 100644 (file)
@@ -142,9 +142,9 @@ void JsonRpcEndpoint::ClientErrorHandler(const std::exception& ex)
        Application::Log(LogWarning, "jsonrpc", message.str());
 }
 
-void JsonRpcEndpoint::VerifyCertificateHandler(boolvalid, const shared_ptr<X509>& certificate)
+void JsonRpcEndpoint::VerifyCertificateHandler(bool *valid, const shared_ptr<X509>& certificate)
 {
-       if (certificate && valid) {
+       if (certificate && *valid) {
                string identity = Utility::GetCertificateCN(certificate);
 
                if (GetIdentity().empty() && !identity.empty()) {
index b7e8802657eb616ac464297441b7789b3006b791..cc9f050c7301d4e64489773eb001da2f1f6225e2 100644 (file)
@@ -65,7 +65,7 @@ private:
        void NewMessageHandler(const MessagePart& message);
        void ClientClosedHandler(void);
        void ClientErrorHandler(const std::exception& ex);
-       void VerifyCertificateHandler(boolvalid, const shared_ptr<X509>& certificate);
+       void VerifyCertificateHandler(bool *valid, const shared_ptr<X509>& certificate);
 };
 
 }