]> granicus.if.org Git - icinga2/commitdiff
Implement support for anonymous multicast messages.
authorGunnar Beutner <gunnar.beutner@netways.de>
Tue, 22 Jan 2013 09:14:52 +0000 (10:14 +0100)
committerGunnar Beutner <gunnar.beutner@netways.de>
Tue, 22 Jan 2013 09:14:52 +0000 (10:14 +0100)
Fixes #3545

components/demo/democomponent.cpp
components/replication/replicationcomponent.cpp
components/replication/replicationcomponent.h
lib/remoting/endpointmanager.cpp
lib/remoting/endpointmanager.h

index b72b02f1ecb48316b0b2ae1350c9cfdece3d856c..19145330cf0771800dbb080bb280a1e54c5fac62 100644 (file)
@@ -69,7 +69,7 @@ void DemoComponent::HelloWorldRequestHandler(const Endpoint::Ptr& sender,
     const RequestMessage& request)
 {
        Logger::Write(LogInformation, "demo", "Got 'hello world' from identity=" +
-           sender->GetName());
+           (sender ? sender->GetName() : "(anonymous)"));
 }
 
 EXPORT_COMPONENT(demo, DemoComponent);
index ec717566a1e05a2a3e5e0f6abee2f102e2e875eb..ef2c71767ca665a8f3710c79715fedaf55bedbe3 100644 (file)
@@ -35,13 +35,13 @@ void ReplicationComponent::Start(void)
        Endpoint::OnConnected.connect(boost::bind(&ReplicationComponent::EndpointConnectedHandler, this, _1));
        
        m_Endpoint->RegisterTopicHandler("config::ObjectUpdate",
-           boost::bind(&ReplicationComponent::RemoteObjectUpdateHandler, this, _2, _3));
+           boost::bind(&ReplicationComponent::RemoteObjectUpdateHandler, this, _3));
        m_Endpoint->RegisterTopicHandler("config::ObjectRemoved",
            boost::bind(&ReplicationComponent::RemoteObjectRemovedHandler, this, _3));
 
        /* service status */
        m_Endpoint->RegisterTopicHandler("checker::ServiceStateChange",
-           boost::bind(&ReplicationComponent::ServiceStateChangeRequestHandler, _2, _3));
+           boost::bind(&ReplicationComponent::ServiceStateChangeRequestHandler, _3));
 }
 
 /**
@@ -52,7 +52,7 @@ void ReplicationComponent::Stop(void)
        m_Endpoint->Unregister();
 }
 
-void ReplicationComponent::ServiceStateChangeRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
+void ReplicationComponent::ServiceStateChangeRequestHandler(const RequestMessage& request)
 {
        ServiceStateChangeMessage params;
        if (!request.GetParams(&params))
@@ -163,7 +163,7 @@ void ReplicationComponent::TransactionClosingHandler(const set<DynamicObject::Pt
        }
 }
 
-void ReplicationComponent::RemoteObjectUpdateHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
+void ReplicationComponent::RemoteObjectUpdateHandler(const RequestMessage& request)
 {
        MessagePart params;
        if (!request.GetParams(&params))
index 8b8dfc30dec60486dc5b3f12f361008468efbe47..136770d5d0078b66137476e19bd839a9a230d481 100644 (file)
@@ -35,7 +35,7 @@ public:
 private:
        Endpoint::Ptr m_Endpoint;
 
-       static void ServiceStateChangeRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
+       static void ServiceStateChangeRequestHandler(const RequestMessage& request);
 
        void EndpointConnectedHandler(const Endpoint::Ptr& endpoint);
 
@@ -43,7 +43,7 @@ private:
        void LocalObjectUnregisteredHandler(const DynamicObject::Ptr& object);
        void TransactionClosingHandler(const set<DynamicObject::Ptr>& modifiedObjects);
 
-       void RemoteObjectUpdateHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
+       void RemoteObjectUpdateHandler(const RequestMessage& request);
        void RemoteObjectRemovedHandler(const RequestMessage& request);
 
        static RequestMessage MakeObjectMessage(const DynamicObject::Ptr& object,
index 799f3c2f8327240f22a7b21d10292201891caf51..077b10643f45faee6781d34259bd9f7ac09e3261 100644 (file)
@@ -184,18 +184,31 @@ void EndpointManager::ClientClosedHandler(const Stream::Ptr& client)
        m_PendingClients.erase(tlsStream);
 }
 
+/**
+ * Sends an anonymous unicast message to the specified recipient.
+ *
+ * @param recipient The recipient of the message.
+ * @param message The message.
+ */
+void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& recipient,
+    const MessagePart& message)
+{
+       SendUnicastMessage(Endpoint::Ptr(), recipient, message);
+}
+
 /**
  * Sends a unicast message to the specified recipient.
  *
  * @param sender The sender of the message.
  * @param recipient The recipient of the message.
- * @param message The request.
+ * @param message The message.
  */
 void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& sender,
     const Endpoint::Ptr& recipient, const MessagePart& message)
 {
-       /* don't forward messages between non-local endpoints */
-       if (!sender->IsLocal() && !recipient->IsLocal())
+       /* don't forward messages between non-local endpoints, assume that
+        * anonymous senders (sender == null) are local */
+       if ((sender && !sender->IsLocal()) && !recipient->IsLocal())
                return;
 
        if (ResponseMessage::IsResponseMessage(message))
@@ -237,6 +250,17 @@ void EndpointManager::SendAnycastMessage(const Endpoint::Ptr& sender,
        SendUnicastMessage(sender, recipient, message);
 }
 
+/**
+ * Sends an anonymous message to all recipients who have a subscription for the
+ * message#s topic.
+ *
+ * @param message The message.
+ */
+void EndpointManager::SendMulticastMessage(const RequestMessage& message)
+{
+       SendMulticastMessage(Endpoint::Ptr(), message);
+}
+
 /**
  * Sends a message to all recipients who have a subscription for the
  * message's topic.
index b491846a10ef5b8d73728dcb2c82c34cb2430bbe..120f9f57b5913c468fa7be0d07a0df9310d6fe4e 100644 (file)
@@ -47,8 +47,10 @@ public:
        void AddListener(const String& service);
        void AddConnection(const String& node, const String& service);
 
+       void SendUnicastMessage(const Endpoint::Ptr& recipient, const MessagePart& message);
        void SendUnicastMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, const MessagePart& message);
        void SendAnycastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
+       void SendMulticastMessage(const RequestMessage& message);
        void SendMulticastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
 
        typedef function<void(const EndpointManager::Ptr&, const Endpoint::Ptr, const RequestMessage&, const ResponseMessage&, bool TimedOut)> APICallback;