From: Michael Friedrich Date: Wed, 25 Nov 2015 12:11:41 +0000 (+0100) Subject: Use a work queue for replaying the cluster log X-Git-Tag: v2.4.2~103 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=91e1e9d93efcd35df0c70342ec1aa30664e0e39d;p=icinga2 Use a work queue for replaying the cluster log refs #10713 --- diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 6853f0010..bc49a2f30 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -48,7 +48,7 @@ REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc); REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler); ApiListener::ApiListener(void) - : m_LogMessageCount(0) + : m_SyncQueue(0, 4), m_LogMessageCount(0) { } void ApiListener::OnConfigLoaded(void) @@ -373,26 +373,8 @@ void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const Stri if (endpoint) { endpoint->AddClient(aclient); - if (need_sync) { - { - ObjectLock olock(endpoint); - - endpoint->SetSyncing(true); - } - - Log(LogInformation, "ApiListener") - << "Sending updates for endpoint '" << endpoint->GetName() << "'."; - - /* sync zone file config */ - SendConfigUpdate(aclient); - /* sync runtime config */ - SendRuntimeConfigObjects(aclient); - - Log(LogInformation, "ApiListener") - << "Finished sending updates for endpoint '" << endpoint->GetName() << "'."; - - ReplayLog(aclient); - } + if (need_sync) + m_SyncQueue.Enqueue(boost::bind(&ApiListener::SyncClient, this, aclient, endpoint)); } else AddAnonymousClient(aclient); } else { @@ -404,6 +386,33 @@ void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const Stri } } +void ApiListener::SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint) +{ + try { + { + ObjectLock olock(endpoint); + + endpoint->SetSyncing(true); + } + + Log(LogInformation, "ApiListener") + << "Sending updates for endpoint '" << endpoint->GetName() << "'."; + + /* sync zone file config */ + SendConfigUpdate(aclient); + /* sync runtime config */ + SendRuntimeConfigObjects(aclient); + + Log(LogInformation, "ApiListener") + << "Finished sending updates for endpoint '" << endpoint->GetName() << "'."; + + ReplayLog(aclient); + } catch (const std::exception& ex) { + Log(LogCritical, "ApiListener") + << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex); + } +} + void ApiListener::ApiTimerHandler(void) { double now = Utility::GetTime(); diff --git a/lib/remote/apilistener.hpp b/lib/remote/apilistener.hpp index 78480b186..f5fa65b80 100644 --- a/lib/remote/apilistener.hpp +++ b/lib/remote/apilistener.hpp @@ -113,6 +113,7 @@ private: void ListenerThreadProc(const Socket::Ptr& server); WorkQueue m_RelayQueue; + WorkQueue m_SyncQueue; boost::mutex m_LogLock; Stream::Ptr m_LogFile; @@ -143,6 +144,8 @@ private: void DeleteConfigObject(const ConfigObject::Ptr& object, const MessageOrigin::Ptr& origin, const JsonRpcConnection::Ptr& client = JsonRpcConnection::Ptr()); void SendRuntimeConfigObjects(const JsonRpcConnection::Ptr& aclient); + + void SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint); }; } diff --git a/lib/remote/jsonrpcconnection-heartbeat.cpp b/lib/remote/jsonrpcconnection-heartbeat.cpp index 5e66f70ed..e09162c58 100644 --- a/lib/remote/jsonrpcconnection-heartbeat.cpp +++ b/lib/remote/jsonrpcconnection-heartbeat.cpp @@ -46,12 +46,6 @@ void JsonRpcConnection::HeartbeatTimerHandler(void) { BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType()) { BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients()) { - if (endpoint->GetSyncing()) { - Log(LogInformation, "JsonRpcConnection") - << "Not sending heartbeat for endpoint '" << endpoint->GetName() << "' because we're replaying the log for it."; - continue; - } - if (client->m_NextHeartbeat != 0 && client->m_NextHeartbeat < Utility::GetTime()) { Log(LogWarning, "JsonRpcConnection") << "Client for endpoint '" << endpoint->GetName() << "' has requested "