]> granicus.if.org Git - icinga2/commitdiff
Use a work queue for replaying the cluster log
authorMichael Friedrich <michael.friedrich@netways.de>
Wed, 25 Nov 2015 12:11:41 +0000 (13:11 +0100)
committerGunnar Beutner <gunnar@beutner.name>
Tue, 23 Feb 2016 08:25:48 +0000 (09:25 +0100)
refs #10713

lib/remote/apilistener.cpp
lib/remote/apilistener.hpp
lib/remote/jsonrpcconnection-heartbeat.cpp

index 6853f0010ef58cf2b5af0c37db4c77c9056272e0..bc49a2f3049bc570e44ff3f956e1e9456771ffc3 100644 (file)
@@ -48,7 +48,7 @@ REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc);
 REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
 
 ApiListener::ApiListener(void)
-       : m_LogMessageCount(0)
+       : m_SyncQueue(0, 4), m_LogMessageCount(0)
 { }
 
 void ApiListener::OnConfigLoaded(void)
@@ -373,26 +373,8 @@ void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const Stri
                if (endpoint) {
                        endpoint->AddClient(aclient);
 
-                       if (need_sync) {
-                               {
-                                       ObjectLock olock(endpoint);
-
-                                       endpoint->SetSyncing(true);
-                               }
-
-                               Log(LogInformation, "ApiListener")
-                                   << "Sending updates for endpoint '" << endpoint->GetName() << "'.";
-
-                               /* sync zone file config */
-                               SendConfigUpdate(aclient);
-                               /* sync runtime config */
-                               SendRuntimeConfigObjects(aclient);
-
-                               Log(LogInformation, "ApiListener")
-                                   << "Finished sending updates for endpoint '" << endpoint->GetName() << "'.";
-
-                               ReplayLog(aclient);
-                       }
+                       if (need_sync)
+                               m_SyncQueue.Enqueue(boost::bind(&ApiListener::SyncClient, this, aclient, endpoint));
                } else
                        AddAnonymousClient(aclient);
        } else {
@@ -404,6 +386,33 @@ void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const Stri
        }
 }
 
+void ApiListener::SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint)
+{
+       try {
+               {
+                       ObjectLock olock(endpoint);
+
+                       endpoint->SetSyncing(true);
+               }
+
+               Log(LogInformation, "ApiListener")
+                   << "Sending updates for endpoint '" << endpoint->GetName() << "'.";
+
+               /* sync zone file config */
+               SendConfigUpdate(aclient);
+               /* sync runtime config */
+               SendRuntimeConfigObjects(aclient);
+
+               Log(LogInformation, "ApiListener")
+                   << "Finished sending updates for endpoint '" << endpoint->GetName() << "'.";
+
+               ReplayLog(aclient);
+       } catch (const std::exception& ex) {
+               Log(LogCritical, "ApiListener")
+                   << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
+       }
+}
+
 void ApiListener::ApiTimerHandler(void)
 {
        double now = Utility::GetTime();
index 78480b18629de2e1a0a169d5c1cf223af8cc3c7e..f5fa65b80681e271af3471c71bfc8d75fb503e2c 100644 (file)
@@ -113,6 +113,7 @@ private:
        void ListenerThreadProc(const Socket::Ptr& server);
 
        WorkQueue m_RelayQueue;
+       WorkQueue m_SyncQueue;
 
        boost::mutex m_LogLock;
        Stream::Ptr m_LogFile;
@@ -143,6 +144,8 @@ private:
        void DeleteConfigObject(const ConfigObject::Ptr& object, const MessageOrigin::Ptr& origin,
            const JsonRpcConnection::Ptr& client = JsonRpcConnection::Ptr());
        void SendRuntimeConfigObjects(const JsonRpcConnection::Ptr& aclient);
+
+       void SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint);
 };
 
 }
index 5e66f70ed2b0f56c1320d9c4c9566cdfa2bc3c6e..e09162c586c0530adeeb34e3dee2ee4af2c940e5 100644 (file)
@@ -46,12 +46,6 @@ void JsonRpcConnection::HeartbeatTimerHandler(void)
 {
        BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
                BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients()) {
-                       if (endpoint->GetSyncing()) {
-                               Log(LogInformation, "JsonRpcConnection")
-                                   << "Not sending heartbeat for endpoint '" << endpoint->GetName() << "' because we're replaying the log for it.";
-                               continue;
-                       }
-
                        if (client->m_NextHeartbeat != 0 && client->m_NextHeartbeat < Utility::GetTime()) {
                                Log(LogWarning, "JsonRpcConnection")
                                    << "Client for endpoint '" << endpoint->GetName() << "' has requested "