]> granicus.if.org Git - icinga2/commitdiff
Fixed subscription code.
authorGunnar Beutner <gunnar.beutner@netways.de>
Wed, 25 Apr 2012 18:35:37 +0000 (20:35 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Wed, 25 Apr 2012 18:35:37 +0000 (20:35 +0200)
components/configrpc/configrpccomponent.cpp
components/configrpc/configrpccomponent.h
icinga-app/icinga.conf
icinga/discoverycomponent.cpp
icinga/discoverycomponent.h
icinga/endpoint.cpp
icinga/endpoint.h
icinga/subscriptioncomponent.cpp

index b4a82f86cf46ec032d11a0c4c75599205d03419e..7cfdf069b72b34258c95b7121e72b5796aa70ba3 100644 (file)
@@ -28,6 +28,9 @@ void ConfigRpcComponent::Start(void)
                m_ConfigRpcEndpoint->RegisterMethodSource("config::PropertyChanged");
        }
 
+       m_ConfigRpcEndpoint->RegisterMethodHandler("message::Welcome",
+           bind_weak(&ConfigRpcComponent::WelcomeMessageHandler, shared_from_this()));
+
        m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectCreated",
            bind_weak(&ConfigRpcComponent::RemoteObjectUpdatedHandler, shared_from_this()));
        m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectRemoved",
@@ -36,9 +39,6 @@ void ConfigRpcComponent::Start(void)
            bind_weak(&ConfigRpcComponent::RemoteObjectUpdatedHandler, shared_from_this()));
 
        endpointManager->RegisterEndpoint(m_ConfigRpcEndpoint);
-
-       endpointManager->OnNewEndpoint += bind_weak(&ConfigRpcComponent::NewEndpointHandler, shared_from_this());
-       endpointManager->ForeachEndpoint(bind(&ConfigRpcComponent::NewEndpointHandler, this, _1));
 }
 
 void ConfigRpcComponent::Stop(void)
@@ -46,20 +46,12 @@ void ConfigRpcComponent::Stop(void)
        // TODO: implement
 }
 
-int ConfigRpcComponent::NewEndpointHandler(const NewEndpointEventArgs& ea)
-{
-       ea.Endpoint->OnSessionEstablished += bind_weak(&ConfigRpcComponent::SessionEstablishedHandler, shared_from_this());
-
-       return 0;
-}
-
-int ConfigRpcComponent::SessionEstablishedHandler(const EventArgs& ea)
+int ConfigRpcComponent::WelcomeMessageHandler(const NewRequestEventArgs& ea)
 {
        JsonRpcRequest request;
        request.SetMethod("config::FetchObjects");
 
-       Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(ea.Source);
-       GetEndpointManager()->SendUnicastRequest(m_ConfigRpcEndpoint, endpoint, request);
+       GetEndpointManager()->SendUnicastRequest(m_ConfigRpcEndpoint, ea.Sender, request);
 
        return 0;
 }
index 27ce4bb3bba63dd5bf3dd0b252f76e8688a22e45..823c61d584657b156f6a7024140a5fb5f42dfc3e 100644 (file)
@@ -9,8 +9,7 @@ class ConfigRpcComponent : public IcingaComponent
 private:
        VirtualEndpoint::Ptr m_ConfigRpcEndpoint;
 
-       int NewEndpointHandler(const NewEndpointEventArgs& ea);
-       int SessionEstablishedHandler(const EventArgs& ea);
+       int WelcomeMessageHandler(const NewRequestEventArgs& ea);
 
        int LocalObjectCreatedHandler(const EventArgs& ea);
        int LocalObjectRemovedHandler(const EventArgs& ea);
index 38d761f359fcdbb8b17fbc6e6061ffded69d83b2..c7ec54767ed366b9786b4a438687ffc35e4cd942 100644 (file)
@@ -4,7 +4,10 @@
                "demo": { "replicate": "0" }
        },
        "rpcconnection": {
-               "kekslistener": { "replicate": "0", "hostname": "10.0.10.14", "port": "7777" }
+               "kekslistener": { "replicate": "0", "hostname": "127.0.0.1", "port": "7777" }
+       },
+       "rpclistener": {
+               "kekslistener": { "replicate": "0", "port": "7777" }
        },
        "host": {
                "localhost": { "ipaddr": "127.0.0.1" }
index d2fd5bee73a8a84a788a35a7d469aa3967939075..2b09a07059ed099c526b226dc784a53aa35dc5a2 100644 (file)
@@ -10,6 +10,9 @@ string DiscoveryComponent::GetName(void) const
 void DiscoveryComponent::Start(void)
 {
        m_DiscoveryEndpoint = make_shared<VirtualEndpoint>();
+       m_DiscoveryEndpoint->RegisterMethodHandler("message::Welcome",
+               bind_weak(&DiscoveryComponent::GetPeersMessageHandler, shared_from_this()));
+
        m_DiscoveryEndpoint->RegisterMethodSource("discovery::PeerAvailable");
        m_DiscoveryEndpoint->RegisterMethodHandler("discovery::GetPeers",
                bind_weak(&DiscoveryComponent::GetPeersMessageHandler, shared_from_this()));
@@ -25,22 +28,11 @@ void DiscoveryComponent::Stop(void)
                mgr->UnregisterEndpoint(m_DiscoveryEndpoint);
 }
 
-int DiscoveryComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
-{
-       neea.Endpoint->OnSessionEstablished += bind_weak(&DiscoveryComponent::SessionEstablishedHandler, shared_from_this());
-
-       /* TODO: register handler for new sink/source */
-
-       return 0;
-}
-
-int DiscoveryComponent::SessionEstablishedHandler(const EventArgs& neea)
+int DiscoveryComponent::WelcomeMessageHandler(const NewRequestEventArgs& neea)
 {
        JsonRpcRequest request;
        request.SetMethod("discovery::GetPeers");
-
-       Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(neea.Source);
-       GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, endpoint, request);
+       GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, neea.Sender, request);
 
        /* TODO: send information about this client to all other clients */
 
index 3e8325e95e2e4a5fa4797d97c4cb17abfd7d072b..7d5e65008bc194320901d1b9036e10834ea8bf51 100644 (file)
@@ -11,8 +11,7 @@ private:
 
        IcingaApplication::Ptr GetIcingaApplication(void) const;
 
-       int NewEndpointHandler(const NewEndpointEventArgs& neea);
-       int SessionEstablishedHandler(const EventArgs& neea);
+       int WelcomeMessageHandler(const NewRequestEventArgs& neea);
        int GetPeersMessageHandler(const NewRequestEventArgs& nrea);
 
 public:
index 7d5e82f4ff86ad7db1c24e99bfa2ce46e3e6e4ce..98165032980a147b7d9d9e0950ca62990aeaa45b 100644 (file)
@@ -14,8 +14,6 @@ void Endpoint::SetIdentity(string identity)
        EventArgs ea;
        ea.Source = shared_from_this();
        OnIdentityChanged(ea);
-
-       OnSessionEstablished(ea);
 }
 
 bool Endpoint::HasIdentity(void) const
index 0f22867de12bfa187dbebf06c0a055e45fc89330..1980fdd48e89744339cba7b9e9113b2c9fd12fb2 100644 (file)
@@ -66,7 +66,6 @@ public:
        int CountMethodSources(void) const;
 
        Event<EventArgs> OnIdentityChanged;
-       Event<EventArgs> OnSessionEstablished;
 };
 
 }
index 190b45ea094f504f90fa45d2d0c509f22f1ab617..c4b03362e5c04c72aa8157d80564d32c0905d8e3 100644 (file)
@@ -12,6 +12,7 @@ void SubscriptionComponent::Start(void)
        m_SubscriptionEndpoint = make_shared<VirtualEndpoint>();
        m_SubscriptionEndpoint->RegisterMethodHandler("message::Subscribe", bind_weak(&SubscriptionComponent::SubscribeMessageHandler, shared_from_this()));
        m_SubscriptionEndpoint->RegisterMethodHandler("message::Provide", bind_weak(&SubscriptionComponent::ProvideMessageHandler, shared_from_this()));
+       m_SubscriptionEndpoint->RegisterMethodSource("message::Welcome");
        m_SubscriptionEndpoint->RegisterMethodSource("message::Subscribe");
        m_SubscriptionEndpoint->RegisterMethodSource("message::Provide");
 
@@ -38,7 +39,6 @@ int SubscriptionComponent::SyncSubscription(Endpoint::Ptr target, string type, c
        SubscriptionMessage subscriptionMessage;
        subscriptionMessage.SetMethod(nmea.Method);
        request.SetParams(subscriptionMessage);
-
        GetEndpointManager()->SendUnicastRequest(m_SubscriptionEndpoint, target, request);
 
        return 0;
@@ -68,11 +68,19 @@ int SubscriptionComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
        neea.Endpoint->AddAllowedMethodSinkPrefix("message::");
        neea.Endpoint->AddAllowedMethodSourcePrefix("message::");
 
+       /* we just assume the peer wants those messages */
+       neea.Endpoint->RegisterMethodSink("message::Welcome");
        neea.Endpoint->RegisterMethodSink("message::Subscribe");
        neea.Endpoint->RegisterMethodSink("message::Provide");
 
        GetEndpointManager()->ForeachEndpoint(bind(&SubscriptionComponent::SyncSubscriptions, this, neea.Endpoint, _1));
 
+       /* signal the peer that we're done syncing subscriptions and are now
+        * ready to accept messages. */
+       JsonRpcRequest request;
+       request.SetMethod("message::Welcome");
+       GetEndpointManager()->SendUnicastRequest(m_SubscriptionEndpoint, neea.Endpoint, request);
+
        return 0;
 }