]> granicus.if.org Git - icinga2/commitdiff
cluster: Don't replay log messages twice.
authorGunnar Beutner <gunnar.beutner@netways.de>
Mon, 16 Sep 2013 07:49:28 +0000 (09:49 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Mon, 16 Sep 2013 07:49:28 +0000 (09:49 +0200)
components/cluster/clustercomponent.cpp

index 518aba893cc52749c4a6597887d07a36c1b04246..41150e4eb289acb2efa3e387ce658922e703817d 100644 (file)
@@ -325,6 +325,8 @@ void ClusterComponent::LogGlobHandler(std::vector<int>& files, const String& fil
 void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream)
 {
        int count = 0;
+       double peer_ts = endpoint->GetLocalLogPosition();
+       bool last_sync = false;
 
        ASSERT(!OwnsLock());
 
@@ -338,15 +340,17 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
                Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
                std::sort(files.begin(), files.end());
 
-               if (files.size() > 1) {
+               if (count == 0 || count > 50000) {
                        OpenLogFile();
                        olock.Unlock();
+               } else {
+                       last_sync = true;
                }
 
                BOOST_FOREACH(int ts, files) {
                        String path = GetClusterDir() + "log/" + Convert::ToString(ts);
 
-                       if (ts < endpoint->GetLocalLogPosition())
+                       if (ts < peer_ts)
                                continue;
 
                        Log(LogInformation, "cluster", "Replaying log: " + path);
@@ -367,7 +371,7 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
 
                                Dictionary::Ptr pmessage = Value::Deserialize(message);
 
-                               if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition())
+                               if (pmessage->Get("timestamp") < peer_ts)
                                        continue;
 
                                if (pmessage->Get("except") == endpoint->GetName())
@@ -375,6 +379,8 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
 
                                NetString::WriteStringToStream(stream, pmessage->Get("message"));
                                count++;
+
+                               peer_ts = pmessage->Get("timestamp");
                        }
 
                        lstream->Close();
@@ -382,7 +388,7 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
 
                Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
 
-               if (files.size() == 1) {
+               if (last_sync) {
                        ObjectLock olock2(endpoint);
 
                        endpoint->SetSyncing(false);