]> granicus.if.org Git - icinga2/commitdiff
cluster: Fix how replaying old messages works.
authorGunnar Beutner <gunnar.beutner@netways.de>
Tue, 3 Sep 2013 08:08:02 +0000 (10:08 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Tue, 3 Sep 2013 08:08:02 +0000 (10:08 +0200)
Makefile.am
components/cluster/clustercomponent.cpp
components/cluster/clustercomponent.h
components/cluster/endpoint.cpp
components/cluster/endpoint.h

index 61a43e237e1261b65eb6ab9d5cf888d035c991a8..183a70d6e507aa6118fdc876897e39524851c323 100644 (file)
@@ -29,7 +29,7 @@ install-data-local:
        $(MKDIR_P) $(DESTDIR)${localstatedir}/log/${PACKAGE}/compat/archives
        $(MKDIR_P) $(DESTDIR)${localstatedir}/cache/${PACKAGE}
        $(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/perfdata
-       $(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/cluster
+       $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}/cluster
        $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}
        $(MKDIR_P) $(DESTDIR)${localstatedir}/run/${PACKAGE}
 
index bb0578fe9c5285024434bc787042a7302629b1bd..54a56f22d6a5c93b2af36391ceb858d7421f5108 100644 (file)
 
 #include "cluster/clustercomponent.h"
 #include "cluster/endpoint.h"
+#include "base/netstring.h"
 #include "base/dynamictype.h"
 #include "base/logger_fwd.h"
 #include "base/objectlock.h"
 #include "base/networkstream.h"
+#include "base/application.h"
+#include "base/convert.h"
 #include <boost/smart_ptr/make_shared.hpp>
+#include <fstream>
 
 using namespace icinga;
 
@@ -36,6 +40,8 @@ void ClusterComponent::Start(void)
 {
        DynamicObject::Start();
 
+       OpenLogFile();
+
        /* set up SSL context */
        shared_ptr<X509> cert = GetX509Certificate(GetCertificateFile());
        m_Identity = GetCertificateCN(cert);
@@ -76,7 +82,7 @@ void ClusterComponent::Start(void)
  */
 void ClusterComponent::Stop(void)
 {
-       /* Nothing to do here. */
+       CloseLogFile();
 }
 
 String ClusterComponent::GetCertificateFile(void) const
@@ -192,6 +198,24 @@ void ClusterComponent::AddConnection(const String& node, const String& service)
 
 void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent)
 {
+       message->Set("ts", Utility::GetTime());
+
+       if (persistent) {
+               Dictionary::Ptr pmessage = boost::make_shared<Dictionary>();
+               pmessage->Set("timestamp", Utility::GetTime());
+               pmessage->Set("message", message);
+
+               ObjectLock olock(this);
+               String json = Value(pmessage).Serialize();
+               NetString::WriteStringToStream(m_LogFile, json);
+               m_LogMessageCount++;
+
+               if (m_LogMessageCount > 250000) {
+                       CloseLogFile();
+                       OpenLogFile();
+               }
+       }
+
        BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
                if (!persistent && !endpoint->IsConnected())
                        continue;
@@ -206,6 +230,83 @@ void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionar
        }
 }
 
+String ClusterComponent::GetClusterDir(void) const
+{
+       return Application::GetLocalStateDir() + "/lib/icinga2/cluster/";
+}
+
+void ClusterComponent::OpenLogFile(void)
+{
+       std::ostringstream msgbuf;
+       msgbuf << GetClusterDir() << static_cast<long>(Utility::GetTime());
+       String path = msgbuf.str();
+
+       std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
+
+       if (!fp->good()) {
+               Log(LogWarning, "cluster", "Could not open spool file: " + path);
+               return;
+       }
+
+       m_LogFile = boost::make_shared<StdioStream>(fp, true);
+       m_LogMessageCount = 0;
+}
+
+void ClusterComponent::CloseLogFile(void)
+{
+       m_LogFile->Close();
+       m_LogFile.reset();
+}
+
+void ClusterComponent::LogGlobHandler(std::vector<int>& files, const String& file)
+{
+       String name = Utility::BaseName(file);
+       int ts = Convert::ToLong(name);
+       files.push_back(ts);
+}
+
+void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream)
+{
+       int count = 0;
+
+       ASSERT(OwnsLock());
+
+       CloseLogFile();
+
+       std::vector<int> files;
+       Utility::Glob(GetClusterDir() + "*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
+       std::sort(files.begin(), files.end());
+
+       BOOST_FOREACH(int ts, files) {
+               std::ostringstream msgbuf;
+               msgbuf << GetClusterDir() << ts;
+               String path = msgbuf.str();
+
+               Log(LogInformation, "cluster", "Replaying log: " + path);
+
+               std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
+               StdioStream::Ptr lstream = boost::make_shared<StdioStream>(fp, true);
+
+               String message;
+               while (NetString::ReadStringFromStream(lstream, &message)) {
+                       Dictionary::Ptr pmessage = Value::Deserialize(message);
+
+                       if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition())
+                               continue;
+
+                       String json = Value(pmessage->Get("message")).Serialize();
+                       NetString::WriteStringToStream(stream, json);
+                       count++;
+               }
+
+               lstream->Close();
+       }
+
+       Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
+
+       OpenLogFile();
+}
+
 /**
  * Processes a new client connection.
  *
@@ -231,6 +332,11 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
                return;
        }
 
+       {
+               ObjectLock olock(this);
+               ReplayLog(endpoint, tlsStream);
+       }
+
        endpoint->SetClient(tlsStream);
 }
 
@@ -540,6 +646,18 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
                return;
        }
 
+       if (sender->GetRemoteLogPosition() + 10 < message->Get("ts")) {
+               Dictionary::Ptr lparams = boost::make_shared<Dictionary>();
+               lparams->Set("log_position", message->Get("ts"));
+
+               Dictionary::Ptr lmessage = boost::make_shared<Dictionary>();
+               lmessage->Set("jsonrpc", "2.0");
+               lmessage->Set("method", "cluster::SetLogPosition");
+               lmessage->Set("params", lparams);
+
+               sender->SendMessage(lmessage);
+       }
+
        Dictionary::Ptr params = message->Get("params");
 
        if (!params)
@@ -720,6 +838,8 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
 
                ObjectLock olock(service);
                service->ClearAcknowledgement(sender->GetName());
+       } else if (message->Get("method") == "cluster::SetLogPosition") {
+               sender->SetRemoteLogPosition(params->Get("log_position"));
        }
 }
 
index e1242095b2a9fc28735c026cf0c9accfd40511d3..391092239f115d9cc95af81fb06b74bc0d542e25 100644 (file)
@@ -27,6 +27,7 @@
 #include "base/tlsstream.h"
 #include "base/utility.h"
 #include "base/tlsutility.h"
+#include "base/stdiostream.h"
 #include "icinga/service.h"
 #include "cluster/endpoint.h"
 
@@ -50,6 +51,7 @@ public:
        String GetBindHost(void) const;
        String GetBindPort(void) const;
        Array::Ptr GetPeers(void) const;
+       String GetClusterDir(void) const;
 
        shared_ptr<SSL_CTX> GetSSLContext(void) const;
        String GetIdentity(void) const;
@@ -81,6 +83,14 @@ private:
 
        void RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent);
 
+       void OpenLogFile(void);
+       void CloseLogFile(void);
+       static void LogGlobHandler(std::vector<int>& files, const String& file);
+       void ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream);
+
+       StdioStream::Ptr m_LogFile;
+       size_t m_LogMessageCount;
+
        void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority);
        void NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority);
        void NextNotificationChangedHandler(const Notification::Ptr& notification, double nextCheck, const String& authority);
@@ -97,7 +107,6 @@ private:
        void AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority);
        void AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority);
        void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message);
-
 };
 
 }
index 01ba7b9d01213c7c457c5ce92761a4cb0acce27c..ad74889a36a62839be45c30e342613da0b1eef9e 100644 (file)
@@ -26,7 +26,6 @@
 #include "base/logger_fwd.h"
 #include "config/configitembuilder.h"
 #include <boost/smart_ptr/make_shared.hpp>
-#include <fstream>
 
 using namespace icinga;
 
@@ -35,41 +34,6 @@ REGISTER_TYPE(Endpoint);
 boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
 boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> Endpoint::OnMessageReceived;
 
-void Endpoint::OnConfigLoaded(void)
-{
-       ObjectLock olock(this);
-
-       OpenSpoolFile();
-}
-
-void Endpoint::Stop(void)
-{
-       ObjectLock olock(this);
-
-       CloseSpoolFile();
-}
-
-void Endpoint::OpenSpoolFile(void)
-{
-       ASSERT(OwnsLock());
-
-       String path = GetSpoolPath();
-       std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
-
-       if (!fp->good())
-               Log(LogWarning, "cluster", "Could not open spool file: " + path);
-
-       m_SpoolFile = boost::make_shared<StdioStream>(fp, true);
-}
-
-void Endpoint::CloseSpoolFile(void)
-{
-       ASSERT(OwnsLock());
-
-       m_SpoolFile->Close();
-       m_SpoolFile.reset();
-}
-
 /**
  * Checks whether this endpoint is connected.
  *
@@ -92,24 +56,6 @@ void Endpoint::SetClient(const Stream::Ptr& client)
        {
                ObjectLock olock(this);
 
-               CloseSpoolFile();
-
-               String path = GetSpoolPath();
-               std::ifstream *fp = new std::ifstream(path.CStr(), std::ifstream::in);
-
-               while (fp && !fp->eof()) {
-                       char data[1024];
-                       fp->read(data, sizeof(data));
-                       client->Write(data, fp->gcount());
-               }
-
-               fp->close();
-               delete fp;
-
-               (void) unlink(path.CStr());
-
-               OpenSpoolFile();
-
                m_Client = client;
        }
 
@@ -121,15 +67,13 @@ void Endpoint::SetClient(const Stream::Ptr& client)
 
 void Endpoint::SendMessage(const Dictionary::Ptr& message)
 {
-       Stream::Ptr destination;
+       Stream::Ptr client = GetClient();
 
-       if (!IsConnected())
-               destination = m_SpoolFile;
-       else
-               destination = GetClient();
+       if (!client)
+               return;
 
        try {
-               JsonRpc::SendMessage(destination, message);
+               JsonRpc::SendMessage(client, message);
        } catch (const std::exception& ex) {
                std::ostringstream msgbuf;
                msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
@@ -192,9 +136,24 @@ void Endpoint::SetSeen(double ts)
        m_Seen = ts;
 }
 
-String Endpoint::GetSpoolPath(void) const
+double Endpoint::GetLocalLogPosition(void) const
 {
-       return Application::GetLocalStateDir() + "/spool/icinga2/cluster/" + GetName() + ".ns";
+       return m_LocalLogPosition;
+}
+
+void Endpoint::SetLocalLogPosition(double ts)
+{
+       m_LocalLogPosition = ts;
+}
+
+double Endpoint::GetRemoteLogPosition(void) const
+{
+       return m_RemoteLogPosition;
+}
+
+void Endpoint::SetRemoteLogPosition(double ts)
+{
+       m_RemoteLogPosition = ts;
 }
 
 void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
@@ -206,8 +165,11 @@ void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes)
                bag->Set("port", m_Port);
        }
 
-       if (attributeTypes & Attribute_State)
+       if (attributeTypes & Attribute_State) {
                bag->Set("seen", m_Seen);
+               bag->Set("local_log_position", m_LocalLogPosition);
+               bag->Set("remote_log_position", m_RemoteLogPosition);
+       }
 }
 
 void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes)
@@ -219,6 +181,9 @@ void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeType
                m_Port = bag->Get("port");
        }
 
-       if (attributeTypes & Attribute_State)
+       if (attributeTypes & Attribute_State) {
                m_Seen = bag->Get("seen");
+               m_LocalLogPosition = bag->Get("local_log_position");
+               m_RemoteLogPosition = bag->Get("remote_log_position");
+       }
 }
index 3530d671b198ce993845e3cdc7180dfaea374af8..52e28a49322479bb399450640ecd0c8cee96540e 100644 (file)
@@ -22,7 +22,6 @@
 
 #include "base/dynamicobject.h"
 #include "base/stream.h"
-#include "base/stdiostream.h"
 #include <boost/signals2.hpp>
 
 namespace icinga
@@ -57,15 +56,16 @@ public:
        double GetSeen(void) const;
        void SetSeen(double ts);
 
-       String GetSpoolPath(void) const;
+       double GetLocalLogPosition(void) const;
+       void SetLocalLogPosition(double ts);
+
+       double GetRemoteLogPosition(void) const;
+       void SetRemoteLogPosition(double ts);
 
 protected:
        virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const;
        virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes);
 
-       virtual void OnConfigLoaded(void);
-       virtual void Stop(void);
-
 private:
        Dictionary::Ptr m_Subscriptions;
        String m_Host;
@@ -73,12 +73,10 @@ private:
 
        Stream::Ptr m_Client;
        double m_Seen;
-       StdioStream::Ptr m_SpoolFile;
+       double m_LocalLogPosition;
+       double m_RemoteLogPosition;
 
        void MessageThreadProc(const Stream::Ptr& stream);
-
-       void OpenSpoolFile(void);
-       void CloseSpoolFile(void);
 };
 
 }