]> granicus.if.org Git - icinga2/commitdiff
Implemented discovery::Welcome message type.
authorGunnar Beutner <gunnar.beutner@netways.de>
Mon, 7 May 2012 12:52:49 +0000 (14:52 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Mon, 7 May 2012 12:52:49 +0000 (14:52 +0200)
components/discovery/discoverycomponent.cpp
components/discovery/discoverycomponent.h
icinga/endpoint.cpp
icinga/endpoint.h
icinga/endpointmanager.cpp
icinga/endpointmanager.h

index ce88f2028615990f6c0b1832d8537e1c5af8d529..7b65e60c1496dbacf8e47159b36b5cd3a0ce9bcc 100644 (file)
@@ -24,6 +24,8 @@ void DiscoveryComponent::Start(void)
        m_DiscoveryEndpoint->RegisterMethodSource("discovery::RegisterComponent");
        m_DiscoveryEndpoint->RegisterMethodHandler("discovery::NewComponent",
                bind_weak(&DiscoveryComponent::NewComponentMessageHandler, shared_from_this()));
+       m_DiscoveryEndpoint->RegisterMethodHandler("discovery::Welcome",
+               bind_weak(&DiscoveryComponent::WelcomeMessageHandler, shared_from_this()));
 
        GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1));
        GetEndpointManager()->OnNewEndpoint += bind_weak(&DiscoveryComponent::NewEndpointHandler, shared_from_this());
@@ -71,6 +73,8 @@ int DiscoveryComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
                neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent");
        }
 
+       neea.Endpoint->RegisterMethodSource("discovery::Welcome");
+
        /* TODO: implement message broker authorisation */
        neea.Endpoint->RegisterMethodSource("discovery::NewComponent");
 
@@ -181,11 +185,61 @@ int DiscoveryComponent::NewIdentityHandler(const EventArgs& ea)
                return 0;
        }
 
-       // TODO: send discovery::Welcome message
-       // TODO: add subscriptions/provides to this endpoint
+       FinishDiscoverySetup(endpoint);
+
+       return 0;
+}
+
+int DiscoveryComponent::WelcomeMessageHandler(const NewRequestEventArgs& nrea)
+{
+       Endpoint::Ptr endpoint = nrea.Sender;
+
+       if (endpoint->GetHandshakeCounter() >= 2)
+               return 0;
+
+       endpoint->IncrementHandshakeCounter();
+
+       if (endpoint->GetHandshakeCounter() >= 2) {
+               EventArgs ea;
+               ea.Source = shared_from_this();
+               endpoint->OnSessionEstablished(ea);
+       }
+
        return 0;
 }
 
+void DiscoveryComponent::FinishDiscoverySetup(Endpoint::Ptr endpoint)
+{
+       if (endpoint->GetHandshakeCounter() >= 2)
+               return;
+
+       // we assume the other component _always_ wants
+       // discovery::Welcome messages from us
+       endpoint->RegisterMethodSink("discovery::Welcome");
+       JsonRpcRequest request;
+       request.SetMethod("discovery::Welcome");
+       GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, endpoint, request);
+
+       ComponentDiscoveryInfo::Ptr info;
+
+       if (GetComponentDiscoveryInfo(endpoint->GetIdentity(), &info)) {
+               set<string>::iterator i;
+               for (i = info->PublishedMethods.begin(); i != info->PublishedMethods.end(); i++)
+                       endpoint->RegisterMethodSource(*i);
+
+               for (i = info->SubscribedMethods.begin(); i != info->SubscribedMethods.end(); i++)
+                       endpoint->RegisterMethodSink(*i);
+       }
+
+       endpoint->IncrementHandshakeCounter();
+
+       if (endpoint->GetHandshakeCounter() >= 2) {
+               EventArgs ea;
+               ea.Source = shared_from_this();
+               endpoint->OnSessionEstablished(ea);
+       }
+}
+
 void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, Endpoint::Ptr recipient)
 {
        JsonRpcRequest request;
@@ -257,7 +311,12 @@ void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessa
 
        m_Components[identity] = info;
 
-       SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
+       if (IsBroker())
+               SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
+
+       Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(identity);
+       if (endpoint)
+               FinishDiscoverySetup(endpoint);
 }
 
 int DiscoveryComponent::NewComponentMessageHandler(const NewRequestEventArgs& nrea)
@@ -287,7 +346,8 @@ int DiscoveryComponent::ReconnectTimerHandler(const TimerEventArgs& tea)
 
        map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
        for (i = m_Components.begin(); i != m_Components.end(); i++) {
-               if (endpointManager->HasConnectedEndpoint(i->first))
+               Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(i->first);
+               if (endpoint)
                        continue;
 
                ComponentDiscoveryInfo::Ptr info = i->second;
index 2a20c863be83b0ab0e29e10a5bd7c692b6388198..269acf5cb2473eee0c04f475c77f3bc828ad9464 100644 (file)
@@ -31,6 +31,8 @@ private:
        int NewComponentMessageHandler(const NewRequestEventArgs& nrea);
        int RegisterComponentMessageHandler(const NewRequestEventArgs& nrea);
 
+       int WelcomeMessageHandler(const NewRequestEventArgs& nrea);
+
        void SendDiscoveryMessage(string method, string identity, Endpoint::Ptr recipient);
        void ProcessDiscoveryMessage(string identity, DiscoveryMessage message);
 
@@ -45,6 +47,8 @@ private:
 
        bool IsBroker(void) const;
 
+       void FinishDiscoverySetup(Endpoint::Ptr endpoint);
+
 public:
        virtual string GetName(void) const;
        virtual void Start(void);
index 3fc126e3d986820ea3266ca3e93629ae88a79e9a..8847a76c508f821483f5e784d1c510c0d012d06e 100644 (file)
@@ -2,6 +2,11 @@
 
 using namespace icinga;
 
+Endpoint::Endpoint(void)
+{
+       m_HandshakeCounter = false;
+}
+
 string Endpoint::GetIdentity(void) const
 {
        return m_Identity;
@@ -120,3 +125,13 @@ set<string>::const_iterator Endpoint::EndSources(void) const
 {
        return m_MethodSources.end();
 }
+
+void Endpoint::IncrementHandshakeCounter(void)
+{
+       m_HandshakeCounter++;
+}
+
+unsigned short Endpoint::GetHandshakeCounter(void) const
+{
+       return m_HandshakeCounter;
+}
index 3e14051cf53d02c5626bb22b1592073589287fee..92235a97226be84ef3c6193fc480eedf88e10b3e 100644 (file)
@@ -17,6 +17,7 @@ private:
        string m_Identity;
        set<string> m_MethodSinks;
        set<string> m_MethodSources;
+       unsigned short m_HandshakeCounter;
 
        weak_ptr<EndpointManager> m_EndpointManager;
 
@@ -24,12 +25,17 @@ public:
        typedef shared_ptr<Endpoint> Ptr;
        typedef weak_ptr<Endpoint> WeakPtr;
 
+       Endpoint(void);
+
        virtual string GetAddress(void) const = 0;
 
        string GetIdentity(void) const;
        void SetIdentity(string identity);
        bool HasIdentity(void) const;
 
+       void IncrementHandshakeCounter();
+       unsigned short GetHandshakeCounter(void) const;
+
        shared_ptr<EndpointManager> GetEndpointManager(void) const;
        void SetEndpointManager(weak_ptr<EndpointManager> manager);
 
index 19e8d9dc10771c83a3925ca4790daf5efd0deccd..254176b08703c66298135635580166b75030d36b 100644 (file)
@@ -147,13 +147,13 @@ void EndpointManager::ForeachEndpoint(function<int (const NewEndpointEventArgs&)
        }
 }
 
-bool EndpointManager::HasConnectedEndpoint(string identity) const
+Endpoint::Ptr EndpointManager::GetEndpointByIdentity(string identity) const
 {
        list<Endpoint::Ptr>::const_iterator i;
        for (i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) {
                if ((*i)->GetIdentity() == identity)
-                       return true;
+                       return *i;
        }
 
-       return false;
+       return Endpoint::Ptr();
 }
index 2061e36c2bb845cf7988464ec47adb415e27cbbe..b543826a9fde9030666c804992bdd7bd967429ad 100644 (file)
@@ -44,7 +44,7 @@ public:
 
        void ForeachEndpoint(function<int (const NewEndpointEventArgs&)> callback);
 
-       bool HasConnectedEndpoint(string identity) const;
+       Endpoint::Ptr GetEndpointByIdentity(string identity) const;
 
        Event<NewEndpointEventArgs> OnNewEndpoint;
 };