From: Gunnar Beutner Date: Tue, 3 Sep 2013 08:08:02 +0000 (+0200) Subject: cluster: Fix how replaying old messages works. X-Git-Tag: v0.0.3~618 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=b844ea1bcba01a0c88c23bb67fbcbc397a4fe0ea;p=icinga2 cluster: Fix how replaying old messages works. --- diff --git a/Makefile.am b/Makefile.am index 61a43e237..183a70d6e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -29,7 +29,7 @@ install-data-local: $(MKDIR_P) $(DESTDIR)${localstatedir}/log/${PACKAGE}/compat/archives $(MKDIR_P) $(DESTDIR)${localstatedir}/cache/${PACKAGE} $(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/perfdata - $(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/cluster + $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}/cluster $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE} $(MKDIR_P) $(DESTDIR)${localstatedir}/run/${PACKAGE} diff --git a/components/cluster/clustercomponent.cpp b/components/cluster/clustercomponent.cpp index bb0578fe9..54a56f22d 100644 --- a/components/cluster/clustercomponent.cpp +++ b/components/cluster/clustercomponent.cpp @@ -19,11 +19,15 @@ #include "cluster/clustercomponent.h" #include "cluster/endpoint.h" +#include "base/netstring.h" #include "base/dynamictype.h" #include "base/logger_fwd.h" #include "base/objectlock.h" #include "base/networkstream.h" +#include "base/application.h" +#include "base/convert.h" #include +#include using namespace icinga; @@ -36,6 +40,8 @@ void ClusterComponent::Start(void) { DynamicObject::Start(); + OpenLogFile(); + /* set up SSL context */ shared_ptr cert = GetX509Certificate(GetCertificateFile()); m_Identity = GetCertificateCN(cert); @@ -76,7 +82,7 @@ void ClusterComponent::Start(void) */ void ClusterComponent::Stop(void) { - /* Nothing to do here. */ + CloseLogFile(); } String ClusterComponent::GetCertificateFile(void) const @@ -192,6 +198,24 @@ void ClusterComponent::AddConnection(const String& node, const String& service) void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent) { + message->Set("ts", Utility::GetTime()); + + if (persistent) { + Dictionary::Ptr pmessage = boost::make_shared(); + pmessage->Set("timestamp", Utility::GetTime()); + pmessage->Set("message", message); + + ObjectLock olock(this); + String json = Value(pmessage).Serialize(); + NetString::WriteStringToStream(m_LogFile, json); + m_LogMessageCount++; + + if (m_LogMessageCount > 250000) { + CloseLogFile(); + OpenLogFile(); + } + } + BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { if (!persistent && !endpoint->IsConnected()) continue; @@ -206,6 +230,83 @@ void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionar } } +String ClusterComponent::GetClusterDir(void) const +{ + return Application::GetLocalStateDir() + "/lib/icinga2/cluster/"; +} + +void ClusterComponent::OpenLogFile(void) +{ + std::ostringstream msgbuf; + msgbuf << GetClusterDir() << static_cast(Utility::GetTime()); + String path = msgbuf.str(); + + std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app); + + if (!fp->good()) { + Log(LogWarning, "cluster", "Could not open spool file: " + path); + return; + } + + m_LogFile = boost::make_shared(fp, true); + m_LogMessageCount = 0; +} + +void ClusterComponent::CloseLogFile(void) +{ + m_LogFile->Close(); + m_LogFile.reset(); +} + +void ClusterComponent::LogGlobHandler(std::vector& files, const String& file) +{ + String name = Utility::BaseName(file); + int ts = Convert::ToLong(name); + files.push_back(ts); +} + +void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream) +{ + int count = 0; + + ASSERT(OwnsLock()); + + CloseLogFile(); + + std::vector files; + Utility::Glob(GetClusterDir() + "*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1)); + std::sort(files.begin(), files.end()); + + BOOST_FOREACH(int ts, files) { + std::ostringstream msgbuf; + msgbuf << GetClusterDir() << ts; + String path = msgbuf.str(); + + Log(LogInformation, "cluster", "Replaying log: " + path); + + std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in); + StdioStream::Ptr lstream = boost::make_shared(fp, true); + + String message; + while (NetString::ReadStringFromStream(lstream, &message)) { + Dictionary::Ptr pmessage = Value::Deserialize(message); + + if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition()) + continue; + + String json = Value(pmessage->Get("message")).Serialize(); + NetString::WriteStringToStream(stream, json); + count++; + } + + lstream->Close(); + } + + Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages."); + + OpenLogFile(); +} + /** * Processes a new client connection. * @@ -231,6 +332,11 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role) return; } + { + ObjectLock olock(this); + ReplayLog(endpoint, tlsStream); + } + endpoint->SetClient(tlsStream); } @@ -540,6 +646,18 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction return; } + if (sender->GetRemoteLogPosition() + 10 < message->Get("ts")) { + Dictionary::Ptr lparams = boost::make_shared(); + lparams->Set("log_position", message->Get("ts")); + + Dictionary::Ptr lmessage = boost::make_shared(); + lmessage->Set("jsonrpc", "2.0"); + lmessage->Set("method", "cluster::SetLogPosition"); + lmessage->Set("params", lparams); + + sender->SendMessage(lmessage); + } + Dictionary::Ptr params = message->Get("params"); if (!params) @@ -720,6 +838,8 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction ObjectLock olock(service); service->ClearAcknowledgement(sender->GetName()); + } else if (message->Get("method") == "cluster::SetLogPosition") { + sender->SetRemoteLogPosition(params->Get("log_position")); } } diff --git a/components/cluster/clustercomponent.h b/components/cluster/clustercomponent.h index e1242095b..391092239 100644 --- a/components/cluster/clustercomponent.h +++ b/components/cluster/clustercomponent.h @@ -27,6 +27,7 @@ #include "base/tlsstream.h" #include "base/utility.h" #include "base/tlsutility.h" +#include "base/stdiostream.h" #include "icinga/service.h" #include "cluster/endpoint.h" @@ -50,6 +51,7 @@ public: String GetBindHost(void) const; String GetBindPort(void) const; Array::Ptr GetPeers(void) const; + String GetClusterDir(void) const; shared_ptr GetSSLContext(void) const; String GetIdentity(void) const; @@ -81,6 +83,14 @@ private: void RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent); + void OpenLogFile(void); + void CloseLogFile(void); + static void LogGlobHandler(std::vector& files, const String& file); + void ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream); + + StdioStream::Ptr m_LogFile; + size_t m_LogMessageCount; + void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority); void NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority); void NextNotificationChangedHandler(const Notification::Ptr& notification, double nextCheck, const String& authority); @@ -97,7 +107,6 @@ private: void AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority); void AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority); void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message); - }; } diff --git a/components/cluster/endpoint.cpp b/components/cluster/endpoint.cpp index 01ba7b9d0..ad74889a3 100644 --- a/components/cluster/endpoint.cpp +++ b/components/cluster/endpoint.cpp @@ -26,7 +26,6 @@ #include "base/logger_fwd.h" #include "config/configitembuilder.h" #include -#include using namespace icinga; @@ -35,41 +34,6 @@ REGISTER_TYPE(Endpoint); boost::signals2::signal Endpoint::OnConnected; boost::signals2::signal Endpoint::OnMessageReceived; -void Endpoint::OnConfigLoaded(void) -{ - ObjectLock olock(this); - - OpenSpoolFile(); -} - -void Endpoint::Stop(void) -{ - ObjectLock olock(this); - - CloseSpoolFile(); -} - -void Endpoint::OpenSpoolFile(void) -{ - ASSERT(OwnsLock()); - - String path = GetSpoolPath(); - std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app); - - if (!fp->good()) - Log(LogWarning, "cluster", "Could not open spool file: " + path); - - m_SpoolFile = boost::make_shared(fp, true); -} - -void Endpoint::CloseSpoolFile(void) -{ - ASSERT(OwnsLock()); - - m_SpoolFile->Close(); - m_SpoolFile.reset(); -} - /** * Checks whether this endpoint is connected. * @@ -92,24 +56,6 @@ void Endpoint::SetClient(const Stream::Ptr& client) { ObjectLock olock(this); - CloseSpoolFile(); - - String path = GetSpoolPath(); - std::ifstream *fp = new std::ifstream(path.CStr(), std::ifstream::in); - - while (fp && !fp->eof()) { - char data[1024]; - fp->read(data, sizeof(data)); - client->Write(data, fp->gcount()); - } - - fp->close(); - delete fp; - - (void) unlink(path.CStr()); - - OpenSpoolFile(); - m_Client = client; } @@ -121,15 +67,13 @@ void Endpoint::SetClient(const Stream::Ptr& client) void Endpoint::SendMessage(const Dictionary::Ptr& message) { - Stream::Ptr destination; + Stream::Ptr client = GetClient(); - if (!IsConnected()) - destination = m_SpoolFile; - else - destination = GetClient(); + if (!client) + return; try { - JsonRpc::SendMessage(destination, message); + JsonRpc::SendMessage(client, message); } catch (const std::exception& ex) { std::ostringstream msgbuf; msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex); @@ -192,9 +136,24 @@ void Endpoint::SetSeen(double ts) m_Seen = ts; } -String Endpoint::GetSpoolPath(void) const +double Endpoint::GetLocalLogPosition(void) const { - return Application::GetLocalStateDir() + "/spool/icinga2/cluster/" + GetName() + ".ns"; + return m_LocalLogPosition; +} + +void Endpoint::SetLocalLogPosition(double ts) +{ + m_LocalLogPosition = ts; +} + +double Endpoint::GetRemoteLogPosition(void) const +{ + return m_RemoteLogPosition; +} + +void Endpoint::SetRemoteLogPosition(double ts) +{ + m_RemoteLogPosition = ts; } void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const @@ -206,8 +165,11 @@ void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) bag->Set("port", m_Port); } - if (attributeTypes & Attribute_State) + if (attributeTypes & Attribute_State) { bag->Set("seen", m_Seen); + bag->Set("local_log_position", m_LocalLogPosition); + bag->Set("remote_log_position", m_RemoteLogPosition); + } } void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes) @@ -219,6 +181,9 @@ void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeType m_Port = bag->Get("port"); } - if (attributeTypes & Attribute_State) + if (attributeTypes & Attribute_State) { m_Seen = bag->Get("seen"); + m_LocalLogPosition = bag->Get("local_log_position"); + m_RemoteLogPosition = bag->Get("remote_log_position"); + } } diff --git a/components/cluster/endpoint.h b/components/cluster/endpoint.h index 3530d671b..52e28a493 100644 --- a/components/cluster/endpoint.h +++ b/components/cluster/endpoint.h @@ -22,7 +22,6 @@ #include "base/dynamicobject.h" #include "base/stream.h" -#include "base/stdiostream.h" #include namespace icinga @@ -57,15 +56,16 @@ public: double GetSeen(void) const; void SetSeen(double ts); - String GetSpoolPath(void) const; + double GetLocalLogPosition(void) const; + void SetLocalLogPosition(double ts); + + double GetRemoteLogPosition(void) const; + void SetRemoteLogPosition(double ts); protected: virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const; virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes); - virtual void OnConfigLoaded(void); - virtual void Stop(void); - private: Dictionary::Ptr m_Subscriptions; String m_Host; @@ -73,12 +73,10 @@ private: Stream::Ptr m_Client; double m_Seen; - StdioStream::Ptr m_SpoolFile; + double m_LocalLogPosition; + double m_RemoteLogPosition; void MessageThreadProc(const Stream::Ptr& stream); - - void OpenSpoolFile(void); - void CloseSpoolFile(void); }; }