]> granicus.if.org Git - icinga2/commitdiff
cluster: Make log replays non-blocking.
authorGunnar Beutner <gunnar.beutner@netways.de>
Mon, 16 Sep 2013 07:30:31 +0000 (09:30 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Mon, 16 Sep 2013 07:30:31 +0000 (09:30 +0200)
components/cluster/clustercomponent.cpp
components/cluster/endpoint.cpp
components/cluster/endpoint.h

index 7417ac6fb933f64007e2958c87b2a7031e4ddc01..518aba893cc52749c4a6597887d07a36c1b04246 100644 (file)
@@ -248,7 +248,12 @@ void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionar
                if (endpoint->GetName() == GetIdentity())
                        continue;
 
-               endpoint->SendMessage(message);
+               {
+                       ObjectLock olock(endpoint);
+
+                       if (!endpoint->IsSyncing())
+                               endpoint->SendMessage(message);
+               }
        }
 }
 
@@ -321,55 +326,69 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
 {
        int count = 0;
 
-       ASSERT(OwnsLock());
+       ASSERT(!OwnsLock());
 
-       CloseLogFile();
-       RotateLogFile();
+       for (;;) {
+               ObjectLock olock(this);
 
-       std::vector<int> files;
-       Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
-       std::sort(files.begin(), files.end());
+               CloseLogFile();
+               RotateLogFile();
 
-       BOOST_FOREACH(int ts, files) {
-               String path = GetClusterDir() + "log/" + Convert::ToString(ts);
+               std::vector<int> files;
+               Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
+               std::sort(files.begin(), files.end());
 
-               if (ts < endpoint->GetLocalLogPosition())
-                       continue;
+               if (files.size() > 1) {
+                       OpenLogFile();
+                       olock.Unlock();
+               }
+
+               BOOST_FOREACH(int ts, files) {
+                       String path = GetClusterDir() + "log/" + Convert::ToString(ts);
+
+                       if (ts < endpoint->GetLocalLogPosition())
+                               continue;
 
-               Log(LogInformation, "cluster", "Replaying log: " + path);
+                       Log(LogInformation, "cluster", "Replaying log: " + path);
 
-               std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
-               StdioStream::Ptr logStream = boost::make_shared<StdioStream>(fp, true);
-               ZlibStream::Ptr lstream = boost::make_shared<ZlibStream>(logStream);
+                       std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
+                       StdioStream::Ptr logStream = boost::make_shared<StdioStream>(fp, true);
+                       ZlibStream::Ptr lstream = boost::make_shared<ZlibStream>(logStream);
 
-               String message;
-               while (true) {
-                       try {
-                               if (!NetString::ReadStringFromStream(lstream, &message))
+                       String message;
+                       while (true) {
+                               try {
+                                       if (!NetString::ReadStringFromStream(lstream, &message))
+                                               break;
+                               } catch (std::exception&) {
+                                       /* Log files may be incomplete or corrupted. This is perfectly OK. */
                                        break;
-                       } catch (std::exception&) {
-                               /* Log files may be incomplete or corrupted. This is perfectly OK. */
-                               break;
-                       }
+                               }
 
-                       Dictionary::Ptr pmessage = Value::Deserialize(message);
+                               Dictionary::Ptr pmessage = Value::Deserialize(message);
 
-                       if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition())
-                               continue;
+                               if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition())
+                                       continue;
 
-                       if (pmessage->Get("except") == endpoint->GetName())
-                               continue;
+                               if (pmessage->Get("except") == endpoint->GetName())
+                                       continue;
 
-                       NetString::WriteStringToStream(stream, pmessage->Get("message"));
-                       count++;
+                               NetString::WriteStringToStream(stream, pmessage->Get("message"));
+                               count++;
+                       }
+
+                       lstream->Close();
                }
 
-               lstream->Close();
-       }
+               Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
 
-       Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
+               if (files.size() == 1) {
+                       ObjectLock olock2(endpoint);
 
-       OpenLogFile();
+                       endpoint->SetSyncing(false);
+                       break;
+               }
+       }
 }
 
 void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file, bool basename)
@@ -411,7 +430,17 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
                return;
        }
 
-       endpoint->SetSeen(Utility::GetTime());
+       {
+               ObjectLock olock(endpoint);
+
+               Stream::Ptr oldClient = endpoint->GetClient();
+               if (oldClient)
+                       oldClient->Close();
+
+               endpoint->SetSyncing(true);
+               endpoint->SetSeen(Utility::GetTime());
+               endpoint->SetClient(tlsStream);
+       }
 
        Dictionary::Ptr config = boost::make_shared<Dictionary>();
        Array::Ptr configFiles = endpoint->GetConfigFiles();
@@ -435,17 +464,7 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
        String json = Value(message).Serialize();
        NetString::WriteStringToStream(tlsStream, json);
 
-       {
-               ObjectLock olock(this);
-
-               Stream::Ptr oldClient = endpoint->GetClient();
-               if (oldClient)
-                       oldClient->Close();
-
-               endpoint->SetClient(tlsStream);
-
-               ReplayLog(endpoint, tlsStream);
-       }
+       ReplayLog(endpoint, tlsStream);
 }
 
 void ClusterComponent::ClusterTimerHandler(void)
index a2785e99e1a494f43dd9eb04ad2529395fc10665..45609a3893c70d817eb10de172da18f767b3ece3 100644 (file)
@@ -34,6 +34,10 @@ REGISTER_TYPE(Endpoint);
 boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
 boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> Endpoint::OnMessageReceived;
 
+Endpoint::Endpoint(void)
+       : m_Syncing(false)
+{ }
+
 /**
  * Checks whether this endpoint is connected.
  *
@@ -46,18 +50,12 @@ bool Endpoint::IsConnected(void) const
 
 Stream::Ptr Endpoint::GetClient(void) const
 {
-       ObjectLock olock(this);
-
        return m_Client;
 }
 
 void Endpoint::SetClient(const Stream::Ptr& client)
 {
-       {
-               ObjectLock olock(this);
-
-               m_Client = client;
-       }
+       m_Client = client;
 
        if (client) {
                boost::thread thread(boost::bind(&Endpoint::MessageThreadProc, this, client));
@@ -111,8 +109,6 @@ void Endpoint::MessageThreadProc(const Stream::Ptr& stream)
  */
 String Endpoint::GetHost(void) const
 {
-       ObjectLock olock(this);
-
        return m_Host;
 }
 
@@ -123,8 +119,6 @@ String Endpoint::GetHost(void) const
  */
 String Endpoint::GetPort(void) const
 {
-       ObjectLock olock(this);
-
        return m_Port;
 }
 
@@ -168,6 +162,16 @@ void Endpoint::SetRemoteLogPosition(double ts)
        m_RemoteLogPosition = ts;
 }
 
+bool Endpoint::IsSyncing(void) const
+{
+       return m_Syncing;
+}
+
+void Endpoint::SetSyncing(bool syncing)
+{
+       m_Syncing = syncing;
+}
+
 Dictionary::Ptr Endpoint::GetFeatures(void) const
 {
        return m_Features;
index 38a0b99d3925c2da629333870c35d3c25bac6cbf..aa64bfc4dd40c93c34732e355409b3c8e499ed79 100644 (file)
@@ -41,6 +41,8 @@ public:
        DECLARE_PTR_TYPEDEFS(Endpoint);
        DECLARE_TYPENAME(Endpoint);
 
+       Endpoint(void);
+
        static boost::signals2::signal<void (const Endpoint::Ptr&)> OnConnected;
        static boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> OnMessageReceived;
 
@@ -70,6 +72,9 @@ public:
 
        bool HasFeature(const String& type) const;
 
+       bool IsSyncing(void) const;
+       void SetSyncing(bool syncing);
+
 protected:
        virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const;
        virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes);
@@ -86,6 +91,7 @@ private:
        double m_LocalLogPosition;
        double m_RemoteLogPosition;
        Dictionary::Ptr m_Features;
+       bool m_Syncing;
 
        void MessageThreadProc(const Stream::Ptr& stream);
 };