From: Gunnar Beutner Date: Mon, 16 Sep 2013 07:30:31 +0000 (+0200) Subject: cluster: Make log replays non-blocking. X-Git-Tag: v0.0.3~509 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=e1b8e1180ccaa908acb6fe55494ad33aa173a0fc;p=icinga2 cluster: Make log replays non-blocking. --- diff --git a/components/cluster/clustercomponent.cpp b/components/cluster/clustercomponent.cpp index 7417ac6fb..518aba893 100644 --- a/components/cluster/clustercomponent.cpp +++ b/components/cluster/clustercomponent.cpp @@ -248,7 +248,12 @@ void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionar if (endpoint->GetName() == GetIdentity()) continue; - endpoint->SendMessage(message); + { + ObjectLock olock(endpoint); + + if (!endpoint->IsSyncing()) + endpoint->SendMessage(message); + } } } @@ -321,55 +326,69 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt { int count = 0; - ASSERT(OwnsLock()); + ASSERT(!OwnsLock()); - CloseLogFile(); - RotateLogFile(); + for (;;) { + ObjectLock olock(this); - std::vector files; - Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1)); - std::sort(files.begin(), files.end()); + CloseLogFile(); + RotateLogFile(); - BOOST_FOREACH(int ts, files) { - String path = GetClusterDir() + "log/" + Convert::ToString(ts); + std::vector files; + Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1)); + std::sort(files.begin(), files.end()); - if (ts < endpoint->GetLocalLogPosition()) - continue; + if (files.size() > 1) { + OpenLogFile(); + olock.Unlock(); + } + + BOOST_FOREACH(int ts, files) { + String path = GetClusterDir() + "log/" + Convert::ToString(ts); + + if (ts < endpoint->GetLocalLogPosition()) + continue; - Log(LogInformation, "cluster", "Replaying log: " + path); + Log(LogInformation, "cluster", "Replaying log: " + path); - std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in); - StdioStream::Ptr logStream = boost::make_shared(fp, true); - ZlibStream::Ptr lstream = boost::make_shared(logStream); + std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in); + StdioStream::Ptr logStream = boost::make_shared(fp, true); + ZlibStream::Ptr lstream = boost::make_shared(logStream); - String message; - while (true) { - try { - if (!NetString::ReadStringFromStream(lstream, &message)) + String message; + while (true) { + try { + if (!NetString::ReadStringFromStream(lstream, &message)) + break; + } catch (std::exception&) { + /* Log files may be incomplete or corrupted. This is perfectly OK. */ break; - } catch (std::exception&) { - /* Log files may be incomplete or corrupted. This is perfectly OK. */ - break; - } + } - Dictionary::Ptr pmessage = Value::Deserialize(message); + Dictionary::Ptr pmessage = Value::Deserialize(message); - if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition()) - continue; + if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition()) + continue; - if (pmessage->Get("except") == endpoint->GetName()) - continue; + if (pmessage->Get("except") == endpoint->GetName()) + continue; - NetString::WriteStringToStream(stream, pmessage->Get("message")); - count++; + NetString::WriteStringToStream(stream, pmessage->Get("message")); + count++; + } + + lstream->Close(); } - lstream->Close(); - } + Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages."); - Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages."); + if (files.size() == 1) { + ObjectLock olock2(endpoint); - OpenLogFile(); + endpoint->SetSyncing(false); + break; + } + } } void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file, bool basename) @@ -411,7 +430,17 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role) return; } - endpoint->SetSeen(Utility::GetTime()); + { + ObjectLock olock(endpoint); + + Stream::Ptr oldClient = endpoint->GetClient(); + if (oldClient) + oldClient->Close(); + + endpoint->SetSyncing(true); + endpoint->SetSeen(Utility::GetTime()); + endpoint->SetClient(tlsStream); + } Dictionary::Ptr config = boost::make_shared(); Array::Ptr configFiles = endpoint->GetConfigFiles(); @@ -435,17 +464,7 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role) String json = Value(message).Serialize(); NetString::WriteStringToStream(tlsStream, json); - { - ObjectLock olock(this); - - Stream::Ptr oldClient = endpoint->GetClient(); - if (oldClient) - oldClient->Close(); - - endpoint->SetClient(tlsStream); - - ReplayLog(endpoint, tlsStream); - } + ReplayLog(endpoint, tlsStream); } void ClusterComponent::ClusterTimerHandler(void) diff --git a/components/cluster/endpoint.cpp b/components/cluster/endpoint.cpp index a2785e99e..45609a389 100644 --- a/components/cluster/endpoint.cpp +++ b/components/cluster/endpoint.cpp @@ -34,6 +34,10 @@ REGISTER_TYPE(Endpoint); boost::signals2::signal Endpoint::OnConnected; boost::signals2::signal Endpoint::OnMessageReceived; +Endpoint::Endpoint(void) + : m_Syncing(false) +{ } + /** * Checks whether this endpoint is connected. * @@ -46,18 +50,12 @@ bool Endpoint::IsConnected(void) const Stream::Ptr Endpoint::GetClient(void) const { - ObjectLock olock(this); - return m_Client; } void Endpoint::SetClient(const Stream::Ptr& client) { - { - ObjectLock olock(this); - - m_Client = client; - } + m_Client = client; if (client) { boost::thread thread(boost::bind(&Endpoint::MessageThreadProc, this, client)); @@ -111,8 +109,6 @@ void Endpoint::MessageThreadProc(const Stream::Ptr& stream) */ String Endpoint::GetHost(void) const { - ObjectLock olock(this); - return m_Host; } @@ -123,8 +119,6 @@ String Endpoint::GetHost(void) const */ String Endpoint::GetPort(void) const { - ObjectLock olock(this); - return m_Port; } @@ -168,6 +162,16 @@ void Endpoint::SetRemoteLogPosition(double ts) m_RemoteLogPosition = ts; } +bool Endpoint::IsSyncing(void) const +{ + return m_Syncing; +} + +void Endpoint::SetSyncing(bool syncing) +{ + m_Syncing = syncing; +} + Dictionary::Ptr Endpoint::GetFeatures(void) const { return m_Features; diff --git a/components/cluster/endpoint.h b/components/cluster/endpoint.h index 38a0b99d3..aa64bfc4d 100644 --- a/components/cluster/endpoint.h +++ b/components/cluster/endpoint.h @@ -41,6 +41,8 @@ public: DECLARE_PTR_TYPEDEFS(Endpoint); DECLARE_TYPENAME(Endpoint); + Endpoint(void); + static boost::signals2::signal OnConnected; static boost::signals2::signal OnMessageReceived; @@ -70,6 +72,9 @@ public: bool HasFeature(const String& type) const; + bool IsSyncing(void) const; + void SetSyncing(bool syncing); + protected: virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const; virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes); @@ -86,6 +91,7 @@ private: double m_LocalLogPosition; double m_RemoteLogPosition; Dictionary::Ptr m_Features; + bool m_Syncing; void MessageThreadProc(const Stream::Ptr& stream); };