]> granicus.if.org Git - icinga2/commitdiff
Implemented broker authorisation.
authorGunnar Beutner <gunnar.beutner@netways.de>
Tue, 8 May 2012 07:20:42 +0000 (09:20 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Tue, 8 May 2012 07:20:42 +0000 (09:20 +0200)
components/discovery/discoverycomponent.cpp
components/discovery/discoverycomponent.h
icinga-app/icinga2.conf
icinga/endpoint.cpp
icinga/endpoint.h
icinga/endpointmanager.cpp
icinga/endpointmanager.h

index 7b65e60c1496dbacf8e47159b36b5cd3a0ce9bcc..cc128c2509b69195c0c46ecbc10faf23383d6657 100644 (file)
@@ -27,15 +27,15 @@ void DiscoveryComponent::Start(void)
        m_DiscoveryEndpoint->RegisterMethodHandler("discovery::Welcome",
                bind_weak(&DiscoveryComponent::WelcomeMessageHandler, shared_from_this()));
 
-       GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1));
+       GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1));
        GetEndpointManager()->OnNewEndpoint += bind_weak(&DiscoveryComponent::NewEndpointHandler, shared_from_this());
 
        GetEndpointManager()->RegisterEndpoint(m_DiscoveryEndpoint);
 
-       m_DiscoveryConnectTimer = make_shared<Timer>();
-       m_DiscoveryConnectTimer->SetInterval(30);
-       m_DiscoveryConnectTimer->OnTimerExpired += bind_weak(&DiscoveryComponent::ReconnectTimerHandler, shared_from_this());
-       m_DiscoveryConnectTimer->Start();
+       m_DiscoveryTimer = make_shared<Timer>();
+       m_DiscoveryTimer->SetInterval(30);
+       m_DiscoveryTimer->OnTimerExpired += bind_weak(&DiscoveryComponent::DiscoveryTimerHandler, shared_from_this());
+       m_DiscoveryTimer->Start();
 }
 
 void DiscoveryComponent::Stop(void)
@@ -73,13 +73,9 @@ int DiscoveryComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
                neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent");
        }
 
+       /* accept discovery::Welcome messages from any endpoint */
        neea.Endpoint->RegisterMethodSource("discovery::Welcome");
 
-       /* TODO: implement message broker authorisation */
-       neea.Endpoint->RegisterMethodSource("discovery::NewComponent");
-
-       /* TODO: register handler to unregister this endpoint when it's closed */
-
        return 0;
 }
 
@@ -97,8 +93,8 @@ int DiscoveryComponent::DiscoverySourceHandler(const NewMethodEventArgs& nmea, C
 
 int DiscoveryComponent::DiscoveryEndpointHandler(const NewEndpointEventArgs& neea, ComponentDiscoveryInfo::Ptr info) const
 {
-       neea.Endpoint->ForeachMethodSink(bind(&DiscoveryComponent::DiscoverySinkHandler, this, _1, info));
-       neea.Endpoint->ForeachMethodSource(bind(&DiscoveryComponent::DiscoverySourceHandler, this, _1, info));
+       neea.Endpoint->ForEachMethodSink(bind(&DiscoveryComponent::DiscoverySinkHandler, this, _1, info));
+       neea.Endpoint->ForEachMethodSource(bind(&DiscoveryComponent::DiscoverySourceHandler, this, _1, info));
        return 0;
 }
 
@@ -107,8 +103,9 @@ bool DiscoveryComponent::GetComponentDiscoveryInfo(string component, ComponentDi
        if (component == GetEndpointManager()->GetIdentity()) {
                /* Build fake discovery info for ourselves */
                *info = make_shared<ComponentDiscoveryInfo>();
-               GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _1, *info));
+               GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _1, *info));
                
+               (*info)->LastSeen = 0;
                (*info)->Node = GetIcingaApplication()->GetNode();
                (*info)->Service = GetIcingaApplication()->GetService();
 
@@ -146,9 +143,16 @@ int DiscoveryComponent::NewIdentityHandler(const EventArgs& ea)
                        return 0;
                }
 
-               GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1));
+               GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1));
        }
 
+       ConfigCollection::Ptr brokerCollection = GetApplication()->GetConfigHive()->GetCollection("broker");
+       if (brokerCollection->GetObject(identity)) {
+               /* accept discovery::NewComponent messages from brokers */
+               endpoint->RegisterMethodSource("discovery::NewComponent");
+       }
+
+
        // we assume the other component _always_ wants
        // discovery::RegisterComponent messages from us
        endpoint->RegisterMethodSink("discovery::RegisterComponent");
@@ -281,8 +285,14 @@ void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, En
 
 void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessage message)
 {
+       /* ignore discovery messages that are about ourselves */
+       if (identity == GetEndpointManager()->GetIdentity())
+               return;
+
        ComponentDiscoveryInfo::Ptr info = make_shared<ComponentDiscoveryInfo>();
 
+       time(&(info->LastSeen));
+
        message.GetNode(&info->Node);
        message.GetService(&info->Service);
 
@@ -290,6 +300,9 @@ void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessa
        if (message.GetProvides(&provides)) {
                DictionaryIterator i;
                for (i = provides.GetDictionary()->Begin(); i != provides.GetDictionary()->End(); i++) {
+                       if (IsBroker()) {
+                               /* TODO: Add authorisation checks here */
+                       }
                        info->PublishedMethods.insert(i->second);
                }
        }
@@ -298,6 +311,9 @@ void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessa
        if (message.GetSubscribes(&subscribes)) {
                DictionaryIterator i;
                for (i = subscribes.GetDictionary()->Begin(); i != subscribes.GetDictionary()->End(); i++) {
+                       if (IsBroker()) {
+                               /* TODO: Add authorisation checks here */
+                       }
                        info->SubscribedMethods.insert(i->second);
                }
        }
@@ -334,24 +350,78 @@ int DiscoveryComponent::NewComponentMessageHandler(const NewRequestEventArgs& nr
 
 int DiscoveryComponent::RegisterComponentMessageHandler(const NewRequestEventArgs& nrea)
 {
+       /* ignore discovery::RegisterComponent messages when we're not a broker */
+       if (!IsBroker())
+               return 0;
+
        DiscoveryMessage message;
        nrea.Request.GetParams(&message);
        ProcessDiscoveryMessage(nrea.Sender->GetIdentity(), message);
+
        return 0;
 }
 
-int DiscoveryComponent::ReconnectTimerHandler(const TimerEventArgs& tea)
+int DiscoveryComponent::BrokerConfigHandler(const EventArgs& ea)
 {
+       ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea.Source);
+
        EndpointManager::Ptr endpointManager = GetEndpointManager();
 
+       /* Check if we're already connected to this broker. */
+       if (endpointManager->GetEndpointByIdentity(object->GetName()))
+               return 0;
+
+       string node;
+       if (!object->GetPropertyString("node", &node))
+               throw InvalidArgumentException("'node' property required for 'broker' config object.");
+
+       string service;
+       if (!object->GetPropertyString("service", &service))
+               throw InvalidArgumentException("'service' property required for 'broker' config object.");
+
+       /* reconnect to this broker */
+       endpointManager->AddConnection(node, service);
+
+       return 0;
+}
+
+int DiscoveryComponent::DiscoveryTimerHandler(const TimerEventArgs& tea)
+{
+       EndpointManager::Ptr endpointManager = GetEndpointManager();
+       
+       time_t now;
+       time(&now);
+
+       ConfigCollection::Ptr brokerCollection = GetApplication()->GetConfigHive()->GetCollection("broker");
+       brokerCollection->ForEachObject(bind(&DiscoveryComponent::BrokerConfigHandler, this, _1));
+
        map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
-       for (i = m_Components.begin(); i != m_Components.end(); i++) {
-               Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(i->first);
-               if (endpoint)
+       for (i = m_Components.begin(); i != m_Components.end(); ) {
+               string identity = i->first;
+               ComponentDiscoveryInfo::Ptr info = i->second;
+
+               if (info->LastSeen < now - DiscoveryComponent::RegistrationTTL) {
+                       /* unregister this component if its registration has expired */
+                       i = m_Components.erase(i);
                        continue;
+               }
 
-               ComponentDiscoveryInfo::Ptr info = i->second;
-               endpointManager->AddConnection(info->Node, info->Service);
+               if (IsBroker()) {
+                       /* send discovery message to all connected components to
+                          refresh their TTL for this component */
+                       SendDiscoveryMessage("discovery::NewComponent", i->first, Endpoint::Ptr());
+               }
+
+               Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(identity);
+               if (endpoint) {
+                       /* update LastSeen if we're still connected to this endpoint */
+                       info->LastSeen = now;
+               } else {
+                       /* try and reconnect to this component */
+                       endpointManager->AddConnection(info->Node, info->Service);
+               }
+
+               i++;
        }
 
        return 0;
index 269acf5cb2473eee0c04f475c77f3bc828ad9464..a00b90bff75fc33658f9b81d3f054f920987c827 100644 (file)
@@ -15,6 +15,8 @@ public:
 
        set<string> SubscribedMethods;
        set<string> PublishedMethods;
+
+       time_t LastSeen;
 };
 
 class DiscoveryComponent : public IcingaComponent
@@ -23,7 +25,7 @@ private:
        VirtualEndpoint::Ptr m_DiscoveryEndpoint;
        map<string, ComponentDiscoveryInfo::Ptr> m_Components;
        bool m_Broker;
-       Timer::Ptr m_DiscoveryConnectTimer;
+       Timer::Ptr m_DiscoveryTimer;
 
        int NewEndpointHandler(const NewEndpointEventArgs& neea);
        int NewIdentityHandler(const EventArgs& ea);
@@ -43,12 +45,16 @@ private:
        int DiscoverySinkHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
        int DiscoverySourceHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
 
-       int ReconnectTimerHandler(const TimerEventArgs& tea);
+       int DiscoveryTimerHandler(const TimerEventArgs& tea);
 
        bool IsBroker(void) const;
 
        void FinishDiscoverySetup(Endpoint::Ptr endpoint);
 
+       int BrokerConfigHandler(const EventArgs& ea);
+
+       static const int RegistrationTTL = 300;
+
 public:
        virtual string GetName(void) const;
        virtual void Start(void);
index 031dbbd8141108091e4c00ab4d0940dc1e3784ac..54953b7a9a5d208325ab48bcd1b16e401db86e4c 100644 (file)
@@ -16,7 +16,7 @@
        "rpclistener": {
                "kekslistener": { "replicate": "0", "service": "8888" }
        },
-       "rpcconnection": {
-               "keksclient": { "replicate": "0", "node": "127.0.0.1", "service": "7777" }
+       "broker": {
+               "icinga-c1": { "replicate": "0", "node": "127.0.0.1", "service": "7777" }
        }
 }
\ No newline at end of file
index 8847a76c508f821483f5e784d1c510c0d012d06e..9e419279f3e9ccf64d9490d8659302665918222a 100644 (file)
@@ -51,7 +51,7 @@ bool Endpoint::IsMethodSink(string method) const
        return (m_MethodSinks.find(method) != m_MethodSinks.end());
 }
 
-void Endpoint::ForeachMethodSink(function<int (const NewMethodEventArgs&)> callback)
+void Endpoint::ForEachMethodSink(function<int (const NewMethodEventArgs&)> callback)
 {
        for (set<string>::iterator i = m_MethodSinks.begin(); i != m_MethodSinks.end(); i++) {
                NewMethodEventArgs nmea;
@@ -76,7 +76,7 @@ bool Endpoint::IsMethodSource(string method) const
        return (m_MethodSources.find(method) != m_MethodSources.end());
 }
 
-void Endpoint::ForeachMethodSource(function<int (const NewMethodEventArgs&)> callback)
+void Endpoint::ForEachMethodSource(function<int (const NewMethodEventArgs&)> callback)
 {
        for (set<string>::iterator i = m_MethodSources.begin(); i != m_MethodSources.end(); i++) {
                NewMethodEventArgs nmea;
index 92235a97226be84ef3c6193fc480eedf88e10b3e..94d661b2384b0edd26126306e1a502a06106ea45 100644 (file)
@@ -58,8 +58,8 @@ public:
        Event<NewMethodEventArgs> OnNewMethodSink;
        Event<NewMethodEventArgs> OnNewMethodSource;
 
-       void ForeachMethodSink(function<int (const NewMethodEventArgs&)> callback);
-       void ForeachMethodSource(function<int (const NewMethodEventArgs&)> callback);
+       void ForEachMethodSink(function<int (const NewMethodEventArgs&)> callback);
+       void ForEachMethodSource(function<int (const NewMethodEventArgs&)> callback);
 
        void ClearMethodSinks(void);
        void ClearMethodSources(void);
index 254176b08703c66298135635580166b75030d36b..23463e3d93a425228deb33e4fae23d26bfa0c862 100644 (file)
@@ -132,7 +132,7 @@ void EndpointManager::SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRe
        }
 }
 
-void EndpointManager::ForeachEndpoint(function<int (const NewEndpointEventArgs&)> callback)
+void EndpointManager::ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback)
 {
        NewEndpointEventArgs neea;
        neea.Source = shared_from_this();
index b543826a9fde9030666c804992bdd7bd967429ad..0729bd53b5e63306a4e58ca73b2d72da64331d1b 100644 (file)
@@ -42,7 +42,7 @@ public:
        void SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
        void SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
 
-       void ForeachEndpoint(function<int (const NewEndpointEventArgs&)> callback);
+       void ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback);
 
        Endpoint::Ptr GetEndpointByIdentity(string identity) const;