]> granicus.if.org Git - icinga2/commitdiff
Use multiple WorkQueues to process cluster messages
authorGunnar Beutner <gunnar@beutner.name>
Wed, 27 Jan 2016 14:45:58 +0000 (15:45 +0100)
committerGunnar Beutner <gunnar@beutner.name>
Wed, 27 Jan 2016 14:45:58 +0000 (15:45 +0100)
refs #11014

lib/remote/jsonrpcconnection.cpp
lib/remote/jsonrpcconnection.hpp

index 49191f93850d5310cf11ad9c265a4d8e362ac2db..8d2a6103dd4f006668635d9947af8a61b7d2dd21 100644 (file)
@@ -37,10 +37,13 @@ REGISTER_APIFUNCTION(RequestCertificate, pki, &RequestCertificateHandler);
 
 static boost::once_flag l_JsonRpcConnectionOnceFlag = BOOST_ONCE_INIT;
 static Timer::Ptr l_JsonRpcConnectionTimeoutTimer;
+static WorkQueue *l_JsonRpcConnectionWorkQueues;
+static size_t l_JsonRpcConnectionWorkQueueCount;
+static int l_JsonRpcConnectionNextID;
 
 JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
     const TlsStream::Ptr& stream, ConnectionRole role)
-       : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
+       : m_ID(l_JsonRpcConnectionNextID++), m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
          m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()),
          m_NextHeartbeat(0), m_HeartbeatTimeout(0)
 {
@@ -56,6 +59,9 @@ void JsonRpcConnection::StaticInitialize(void)
        l_JsonRpcConnectionTimeoutTimer->OnTimerExpired.connect(boost::bind(&JsonRpcConnection::TimeoutTimerHandler));
        l_JsonRpcConnectionTimeoutTimer->SetInterval(15);
        l_JsonRpcConnectionTimeoutTimer->Start();
+
+       l_JsonRpcConnectionWorkQueueCount = Application::GetConcurrency();
+       l_JsonRpcConnectionWorkQueues = new WorkQueue[l_JsonRpcConnectionWorkQueueCount];
 }
 
 void JsonRpcConnection::Start(void)
@@ -128,15 +134,8 @@ void JsonRpcConnection::Disconnect(void)
        }
 }
 
-bool JsonRpcConnection::ProcessMessage(void)
+void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
 {
-       Dictionary::Ptr message;
-
-       StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false);
-
-       if (srs != StatusNewItem)
-               return false;
-
        m_Seen = Utility::GetTime();
 
        if (m_HeartbeatTimeout != 0)
@@ -147,7 +146,7 @@ bool JsonRpcConnection::ProcessMessage(void)
 
                /* ignore old messages */
                if (ts < m_Endpoint->GetRemoteLogPosition())
-                       return true;
+                       return;
 
                m_Endpoint->SetRemoteLogPosition(ts);
        }
@@ -190,6 +189,18 @@ bool JsonRpcConnection::ProcessMessage(void)
                resultMessage->Set("id", message->Get("id"));
                JsonRpc::SendMessage(m_Stream, resultMessage);
        }
+}
+
+bool JsonRpcConnection::ProcessMessage(void)
+{
+       Dictionary::Ptr message;
+
+       StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false);
+
+       if (srs != StatusNewItem)
+               return false;
+
+       l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandler, JsonRpcConnection::Ptr(this), message));
 
        return true;
 }
index 1e514e8b499189c4676d09d3a7cb9905f6203762..258eb8e8214f0c37df657b1308b7a5196b9cc525 100644 (file)
@@ -72,6 +72,7 @@ public:
        static Value HeartbeatAPIHandler(const intrusive_ptr<MessageOrigin>& origin, const Dictionary::Ptr& params);
 
 private:
+       int m_ID;
        String m_Identity;
        bool m_Authenticated;
        Endpoint::Ptr m_Endpoint;
@@ -86,6 +87,7 @@ private:
        StreamReadContext m_Context;
 
        bool ProcessMessage(void);
+       void MessageHandler(const Dictionary::Ptr& message);
        void DataAvailableHandler(void);
 
        static void StaticInitialize(void);