]> granicus.if.org Git - icinga2/commitdiff
Use a dynamic thread pool for API connections 6633/head
authorMichael Friedrich <michael.friedrich@icinga.com>
Mon, 24 Sep 2018 14:38:48 +0000 (16:38 +0200)
committerMichael Friedrich <michael.friedrich@icinga.com>
Tue, 25 Sep 2018 10:43:10 +0000 (12:43 +0200)
The full analysis is located in #6517.

fixes #6517

lib/remote/apilistener.cpp
lib/remote/apilistener.hpp

index 4605df364098976fcdb558c76ed4de4412f26235..ed5c47a951460caa2327ee7239a21fecdce7d844 100644 (file)
@@ -112,6 +112,22 @@ void ApiListener::CopyCertificateFile(const String& oldCertPath, const String& n
        }
 }
 
+/**
+ * Returns the API thread pool.
+ *
+ * @returns The API thread pool.
+ */
+ThreadPool& ApiListener::GetTP()
+{
+       static ThreadPool tp;
+       return tp;
+}
+
+void ApiListener::EnqueueAsyncCallback(const std::function<void ()>& callback, SchedulerPolicy policy)
+{
+       GetTP().Post(callback, policy);
+}
+
 void ApiListener::OnConfigLoaded()
 {
        if (m_Instance)
@@ -364,8 +380,9 @@ void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
        for (;;) {
                try {
                        Socket::Ptr client = server->Accept();
-                       std::thread thread(std::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer));
-                       thread.detach();
+
+                       /* Use dynamic thread pool with additional on demand resources with fast throughput. */
+                       EnqueueAsyncCallback(std::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer), LowLatencyScheduler);
                } catch (const std::exception&) {
                        Log(LogCritical, "ApiListener", "Cannot accept new connection.");
                }
@@ -399,9 +416,10 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
        TcpSocket::Ptr client = new TcpSocket();
 
        try {
-               endpoint->SetConnecting(true);
                client->Connect(host, port);
+
                NewClientHandler(client, endpoint->GetName(), RoleClient);
+
                endpoint->SetConnecting(false);
        } catch (const std::exception& ex) {
                endpoint->SetConnecting(false);
@@ -784,8 +802,11 @@ void ApiListener::ApiReconnectTimerHandler()
                                continue;
                        }
 
-                       std::thread thread(std::bind(&ApiListener::AddConnection, this, endpoint));
-                       thread.detach();
+                       /* Set connecting state to prevent duplicated queue inserts later. */
+                       endpoint->SetConnecting(true);
+
+                       /* Use dynamic thread pool with additional on demand resources with fast throughput. */
+                       EnqueueAsyncCallback(std::bind(&ApiListener::AddConnection, this, endpoint), LowLatencyScheduler);
                }
        }
 
index 7868352d46cbd97d1fcee6abc83d177dfc6f78e1..3a24133e7f8c822d3c6dde5ab593d93150c93800 100644 (file)
@@ -30,6 +30,7 @@
 #include "base/workqueue.hpp"
 #include "base/tcpsocket.hpp"
 #include "base/tlsstream.hpp"
+#include "base/threadpool.hpp"
 #include <set>
 
 namespace icinga
@@ -148,6 +149,9 @@ private:
        void NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role);
        void ListenerThreadProc(const Socket::Ptr& server);
 
+       static ThreadPool& GetTP();
+       static void EnqueueAsyncCallback(const std::function<void ()>& callback, SchedulerPolicy policy = DefaultScheduler);
+
        WorkQueue m_RelayQueue;
        WorkQueue m_SyncQueue{0, 4};