]> granicus.if.org Git - icinga2/commitdiff
cluster: Implement persistent messages.
authorGunnar Beutner <gunnar.beutner@netways.de>
Mon, 2 Sep 2013 13:12:20 +0000 (15:12 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Mon, 2 Sep 2013 13:12:20 +0000 (15:12 +0200)
Makefile.am
components/cluster/clustercomponent.cpp
components/cluster/clustercomponent.h
components/cluster/endpoint.cpp
components/cluster/endpoint.h

index 6439f595c8703ed23de0a75d5cadfa2a1b434f38..61a43e237e1261b65eb6ab9d5cf888d035c991a8 100644 (file)
@@ -29,6 +29,7 @@ install-data-local:
        $(MKDIR_P) $(DESTDIR)${localstatedir}/log/${PACKAGE}/compat/archives
        $(MKDIR_P) $(DESTDIR)${localstatedir}/cache/${PACKAGE}
        $(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/perfdata
+       $(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/cluster
        $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}
        $(MKDIR_P) $(DESTDIR)${localstatedir}/run/${PACKAGE}
 
index 2f595f9d247bebf6016fdfbbc55a4e8738e7f07e..bb0578fe9c5285024434bc787042a7302629b1bd 100644 (file)
@@ -190,6 +190,22 @@ void ClusterComponent::AddConnection(const String& node, const String& service)
        Utility::QueueAsyncCallback(boost::bind(&ClusterComponent::NewClientHandler, this, client, TlsRoleClient));
 }
 
+void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent)
+{
+       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+               if (!persistent && !endpoint->IsConnected())
+                       continue;
+
+               if (endpoint == except)
+                       continue;
+
+               if (endpoint->GetName() == GetIdentity())
+                       continue;
+
+               endpoint->SendMessage(message);
+       }
+}
+
 /**
  * Processes a new client connection.
  *
@@ -225,9 +241,7 @@ void ClusterComponent::ClusterTimerHandler(void)
        message->Set("jsonrpc", "2.0");
        message->Set("method", "cluster::HeartBeat");
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, false);
 
        Array::Ptr peers = GetPeers();
 
@@ -274,9 +288,7 @@ void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dic
        message->Set("method", "cluster::CheckResult");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority)
@@ -293,9 +305,7 @@ void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, doub
        message->Set("method", "cluster::SetNextCheck");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority)
@@ -312,9 +322,7 @@ void ClusterComponent::NextNotificationChangedHandler(const Notification::Ptr& n
        message->Set("method", "cluster::SetNextNotification");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
@@ -331,9 +339,7 @@ void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service,
        message->Set("method", "cluster::SetForceNextCheck");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::ForceNextNotificationChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
@@ -350,9 +356,7 @@ void ClusterComponent::ForceNextNotificationChangedHandler(const Service::Ptr& s
        message->Set("method", "cluster::SetForceNextNotification");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
@@ -369,9 +373,7 @@ void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& serv
        message->Set("method", "cluster::SetEnableActiveChecks");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
@@ -388,9 +390,7 @@ void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& ser
        message->Set("method", "cluster::SetEnablePassiveChecks");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::EnableNotificationsChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
@@ -407,9 +407,7 @@ void ClusterComponent::EnableNotificationsChangedHandler(const Service::Ptr& ser
        message->Set("method", "cluster::SetEnableNotifications");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::EnableFlappingChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
@@ -426,9 +424,7 @@ void ClusterComponent::EnableFlappingChangedHandler(const Service::Ptr& service,
        message->Set("method", "cluster::SetEnableFlapping");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::CommentAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
@@ -445,9 +441,7 @@ void ClusterComponent::CommentAddedHandler(const Service::Ptr& service, const Di
        message->Set("method", "cluster::AddComment");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::CommentRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
@@ -464,9 +458,7 @@ void ClusterComponent::CommentRemovedHandler(const Service::Ptr& service, const
        message->Set("method", "cluster::RemoveComment");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::DowntimeAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
@@ -483,9 +475,7 @@ void ClusterComponent::DowntimeAddedHandler(const Service::Ptr& service, const D
        message->Set("method", "cluster::AddDowntime");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::DowntimeRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
@@ -502,9 +492,7 @@ void ClusterComponent::DowntimeRemovedHandler(const Service::Ptr& service, const
        message->Set("method", "cluster::RemoveDowntime");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority)
@@ -524,9 +512,7 @@ void ClusterComponent::AcknowledgementSetHandler(const Service::Ptr& service, co
        message->Set("method", "cluster::SetAcknowledgement");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority)
@@ -542,17 +528,12 @@ void ClusterComponent::AcknowledgementClearedHandler(const Service::Ptr& service
        message->Set("method", "cluster::ClearAcknowledgement");
        message->Set("params", params);
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               endpoint->SendMessage(message);
-       }
+       RelayMessage(Endpoint::Ptr(), message, true);
 }
 
 void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
 {
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               if (sender != endpoint)
-                       endpoint->SendMessage(message);
-       }
+       RelayMessage(sender, message, true);
 
        if (message->Get("method") == "cluster::HeartBeat") {
                sender->SetSeen(Utility::GetTime());
index 7254feb0348e397b72495e68d738631d545f2711..e1242095b2a9fc28735c026cf0c9accfd40511d3 100644 (file)
@@ -79,6 +79,8 @@ private:
        void NewClientHandler(const Socket::Ptr& client, TlsRole role);
        void ListenerThreadProc(const Socket::Ptr& server);
 
+       void RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent);
+
        void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority);
        void NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority);
        void NextNotificationChangedHandler(const Notification::Ptr& notification, double nextCheck, const String& authority);
index 261c4b8af5fba43183759d21bceb4e89c4e5e463..01ba7b9d01213c7c457c5ce92761a4cb0acce27c 100644 (file)
@@ -26,6 +26,7 @@
 #include "base/logger_fwd.h"
 #include "config/configitembuilder.h"
 #include <boost/smart_ptr/make_shared.hpp>
+#include <fstream>
 
 using namespace icinga;
 
@@ -34,6 +35,41 @@ REGISTER_TYPE(Endpoint);
 boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
 boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> Endpoint::OnMessageReceived;
 
+void Endpoint::OnConfigLoaded(void)
+{
+       ObjectLock olock(this);
+
+       OpenSpoolFile();
+}
+
+void Endpoint::Stop(void)
+{
+       ObjectLock olock(this);
+
+       CloseSpoolFile();
+}
+
+void Endpoint::OpenSpoolFile(void)
+{
+       ASSERT(OwnsLock());
+
+       String path = GetSpoolPath();
+       std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
+
+       if (!fp->good())
+               Log(LogWarning, "cluster", "Could not open spool file: " + path);
+
+       m_SpoolFile = boost::make_shared<StdioStream>(fp, true);
+}
+
+void Endpoint::CloseSpoolFile(void)
+{
+       ASSERT(OwnsLock());
+
+       m_SpoolFile->Close();
+       m_SpoolFile.reset();
+}
+
 /**
  * Checks whether this endpoint is connected.
  *
@@ -56,6 +92,24 @@ void Endpoint::SetClient(const Stream::Ptr& client)
        {
                ObjectLock olock(this);
 
+               CloseSpoolFile();
+
+               String path = GetSpoolPath();
+               std::ifstream *fp = new std::ifstream(path.CStr(), std::ifstream::in);
+
+               while (fp && !fp->eof()) {
+                       char data[1024];
+                       fp->read(data, sizeof(data));
+                       client->Write(data, fp->gcount());
+               }
+
+               fp->close();
+               delete fp;
+
+               (void) unlink(path.CStr());
+
+               OpenSpoolFile();
+
                m_Client = client;
        }
 
@@ -67,13 +121,15 @@ void Endpoint::SetClient(const Stream::Ptr& client)
 
 void Endpoint::SendMessage(const Dictionary::Ptr& message)
 {
-       if (!IsConnected()) {
-               // TODO: persist the message
-               return;
-       }
+       Stream::Ptr destination;
+
+       if (!IsConnected())
+               destination = m_SpoolFile;
+       else
+               destination = GetClient();
 
        try {
-               JsonRpc::SendMessage(GetClient(), message);
+               JsonRpc::SendMessage(destination, message);
        } catch (const std::exception& ex) {
                std::ostringstream msgbuf;
                msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
@@ -136,6 +192,11 @@ void Endpoint::SetSeen(double ts)
        m_Seen = ts;
 }
 
+String Endpoint::GetSpoolPath(void) const
+{
+       return Application::GetLocalStateDir() + "/spool/icinga2/cluster/" + GetName() + ".ns";
+}
+
 void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
 {
        DynamicObject::InternalSerialize(bag, attributeTypes);
index b52dba08ce5b58e991ad2a904cfd5df76b33f308..3530d671b198ce993845e3cdc7180dfaea374af8 100644 (file)
@@ -22,6 +22,7 @@
 
 #include "base/dynamicobject.h"
 #include "base/stream.h"
+#include "base/stdiostream.h"
 #include <boost/signals2.hpp>
 
 namespace icinga
@@ -56,10 +57,15 @@ public:
        double GetSeen(void) const;
        void SetSeen(double ts);
 
+       String GetSpoolPath(void) const;
+
 protected:
        virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const;
        virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes);
 
+       virtual void OnConfigLoaded(void);
+       virtual void Stop(void);
+
 private:
        Dictionary::Ptr m_Subscriptions;
        String m_Host;
@@ -67,8 +73,12 @@ private:
 
        Stream::Ptr m_Client;
        double m_Seen;
+       StdioStream::Ptr m_SpoolFile;
 
        void MessageThreadProc(const Stream::Ptr& stream);
+
+       void OpenSpoolFile(void);
+       void CloseSpoolFile(void);
 };
 
 }