]> granicus.if.org Git - icinga2/commitdiff
cluster: Don't replay logs unless they're relevant.
authorGunnar Beutner <gunnar.beutner@netways.de>
Tue, 3 Sep 2013 08:30:28 +0000 (10:30 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Tue, 3 Sep 2013 08:30:28 +0000 (10:30 +0200)
components/cluster/clustercomponent.cpp
components/cluster/clustercomponent.h

index 7810c9a486bd63cd54dc040ca9e9454155e370b5..f4229af1fc41b8d970074cdd725af90a3acf515e 100644 (file)
@@ -40,7 +40,10 @@ void ClusterComponent::Start(void)
 {
        DynamicObject::Start();
 
-       OpenLogFile();
+       {
+               ObjectLock olock(this);
+               OpenLogFile();
+       }
 
        /* set up SSL context */
        shared_ptr<X509> cert = GetX509Certificate(GetCertificateFile());
@@ -82,6 +85,7 @@ void ClusterComponent::Start(void)
  */
 void ClusterComponent::Stop(void)
 {
+       ObjectLock olock(this);
        CloseLogFile();
 }
 
@@ -202,13 +206,15 @@ void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionar
 
        if (persistent) {
                Dictionary::Ptr pmessage = boost::make_shared<Dictionary>();
-               pmessage->Set("timestamp", Utility::GetTime());
+               double ts = Utility::GetTime();
+               pmessage->Set("timestamp", ts);
                pmessage->Set("message", message);
 
                ObjectLock olock(this);
                String json = Value(pmessage).Serialize();
                NetString::WriteStringToStream(m_LogFile, json);
                m_LogMessageCount++;
+               m_LogMessageTimestamp = ts;
 
                if (m_LogMessageCount > 250000) {
                        CloseLogFile();
@@ -237,9 +243,9 @@ String ClusterComponent::GetClusterDir(void) const
 
 void ClusterComponent::OpenLogFile(void)
 {
-       std::ostringstream msgbuf;
-       msgbuf << GetClusterDir() << static_cast<long>(Utility::GetTime());
-       String path = msgbuf.str();
+       ASSERT(OwnsLock());
+
+       String path = GetClusterDir() + "current";
 
        std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
 
@@ -250,12 +256,21 @@ void ClusterComponent::OpenLogFile(void)
 
        m_LogFile = boost::make_shared<StdioStream>(fp, true);
        m_LogMessageCount = 0;
+       m_LogMessageTimestamp = 0;
 }
 
 void ClusterComponent::CloseLogFile(void)
 {
+       ASSERT(OwnsLock());
+
        m_LogFile->Close();
        m_LogFile.reset();
+
+       if (m_LogMessageTimestamp != 0) {
+               String oldpath = GetClusterDir() + "current";
+               String newpath = GetClusterDir() + Convert::ToString(static_cast<int>(m_LogMessageTimestamp) + 1);
+               (void) rename(oldpath.CStr(), newpath.CStr());
+       }
 }
 
 void ClusterComponent::LogGlobHandler(std::vector<int>& files, const String& file)
@@ -282,6 +297,9 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
                msgbuf << GetClusterDir() << ts;
                String path = msgbuf.str();
 
+               if (ts < endpoint->GetLocalLogPosition())
+                       continue;
+
                Log(LogInformation, "cluster", "Replaying log: " + path);
 
                std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
index 391092239f115d9cc95af81fb06b74bc0d542e25..0529208930fcef15939c453be86294ed6e9d9a18 100644 (file)
@@ -89,6 +89,7 @@ private:
        void ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream);
 
        StdioStream::Ptr m_LogFile;
+       double m_LogMessageTimestamp;
        size_t m_LogMessageCount;
 
        void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority);