From: Gunnar Beutner Date: Tue, 3 Sep 2013 08:30:28 +0000 (+0200) Subject: cluster: Don't replay logs unless they're relevant. X-Git-Tag: v0.0.3~616 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=8098329a33acf12527339537d12f3282cf344826;p=icinga2 cluster: Don't replay logs unless they're relevant. --- diff --git a/components/cluster/clustercomponent.cpp b/components/cluster/clustercomponent.cpp index 7810c9a48..f4229af1f 100644 --- a/components/cluster/clustercomponent.cpp +++ b/components/cluster/clustercomponent.cpp @@ -40,7 +40,10 @@ void ClusterComponent::Start(void) { DynamicObject::Start(); - OpenLogFile(); + { + ObjectLock olock(this); + OpenLogFile(); + } /* set up SSL context */ shared_ptr cert = GetX509Certificate(GetCertificateFile()); @@ -82,6 +85,7 @@ void ClusterComponent::Start(void) */ void ClusterComponent::Stop(void) { + ObjectLock olock(this); CloseLogFile(); } @@ -202,13 +206,15 @@ void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionar if (persistent) { Dictionary::Ptr pmessage = boost::make_shared(); - pmessage->Set("timestamp", Utility::GetTime()); + double ts = Utility::GetTime(); + pmessage->Set("timestamp", ts); pmessage->Set("message", message); ObjectLock olock(this); String json = Value(pmessage).Serialize(); NetString::WriteStringToStream(m_LogFile, json); m_LogMessageCount++; + m_LogMessageTimestamp = ts; if (m_LogMessageCount > 250000) { CloseLogFile(); @@ -237,9 +243,9 @@ String ClusterComponent::GetClusterDir(void) const void ClusterComponent::OpenLogFile(void) { - std::ostringstream msgbuf; - msgbuf << GetClusterDir() << static_cast(Utility::GetTime()); - String path = msgbuf.str(); + ASSERT(OwnsLock()); + + String path = GetClusterDir() + "current"; std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app); @@ -250,12 +256,21 @@ void ClusterComponent::OpenLogFile(void) m_LogFile = boost::make_shared(fp, true); m_LogMessageCount = 0; + m_LogMessageTimestamp = 0; } void ClusterComponent::CloseLogFile(void) { + ASSERT(OwnsLock()); + m_LogFile->Close(); m_LogFile.reset(); + + if (m_LogMessageTimestamp != 0) { + String oldpath = GetClusterDir() + "current"; + String newpath = GetClusterDir() + Convert::ToString(static_cast(m_LogMessageTimestamp) + 1); + (void) rename(oldpath.CStr(), newpath.CStr()); + } } void ClusterComponent::LogGlobHandler(std::vector& files, const String& file) @@ -282,6 +297,9 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt msgbuf << GetClusterDir() << ts; String path = msgbuf.str(); + if (ts < endpoint->GetLocalLogPosition()) + continue; + Log(LogInformation, "cluster", "Replaying log: " + path); std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in); diff --git a/components/cluster/clustercomponent.h b/components/cluster/clustercomponent.h index 391092239..052920893 100644 --- a/components/cluster/clustercomponent.h +++ b/components/cluster/clustercomponent.h @@ -89,6 +89,7 @@ private: void ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream); StdioStream::Ptr m_LogFile; + double m_LogMessageTimestamp; size_t m_LogMessageCount; void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority);