From d1b705613de8f37cd055047168531473557b978d Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Wed, 27 Jan 2016 15:45:58 +0100 Subject: [PATCH] Use multiple WorkQueues to process cluster messages refs #11014 --- lib/remote/jsonrpcconnection.cpp | 31 +++++++++++++++++++++---------- lib/remote/jsonrpcconnection.hpp | 2 ++ 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index a209788eb..cce2b0e28 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -37,10 +37,13 @@ REGISTER_APIFUNCTION(RequestCertificate, pki, &RequestCertificateHandler); static boost::once_flag l_JsonRpcConnectionOnceFlag = BOOST_ONCE_INIT; static Timer::Ptr l_JsonRpcConnectionTimeoutTimer; +static WorkQueue *l_JsonRpcConnectionWorkQueues; +static size_t l_JsonRpcConnectionWorkQueueCount; +static int l_JsonRpcConnectionNextID; JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, const TlsStream::Ptr& stream, ConnectionRole role) - : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), + : m_ID(l_JsonRpcConnectionNextID++), m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_HeartbeatTimeout(0) { @@ -56,6 +59,9 @@ void JsonRpcConnection::StaticInitialize(void) l_JsonRpcConnectionTimeoutTimer->OnTimerExpired.connect(boost::bind(&JsonRpcConnection::TimeoutTimerHandler)); l_JsonRpcConnectionTimeoutTimer->SetInterval(15); l_JsonRpcConnectionTimeoutTimer->Start(); + + l_JsonRpcConnectionWorkQueueCount = Application::GetConcurrency(); + l_JsonRpcConnectionWorkQueues = new WorkQueue[l_JsonRpcConnectionWorkQueueCount]; } void JsonRpcConnection::Start(void) @@ -128,15 +134,8 @@ void JsonRpcConnection::Disconnect(void) } } -bool JsonRpcConnection::ProcessMessage(void) +void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message) { - Dictionary::Ptr message; - - StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false); - - if (srs != StatusNewItem) - return false; - m_Seen = Utility::GetTime(); if (m_HeartbeatTimeout != 0) @@ -147,7 +146,7 @@ bool JsonRpcConnection::ProcessMessage(void) /* ignore old messages */ if (ts < m_Endpoint->GetRemoteLogPosition()) - return true; + return; m_Endpoint->SetRemoteLogPosition(ts); } @@ -190,6 +189,18 @@ bool JsonRpcConnection::ProcessMessage(void) resultMessage->Set("id", message->Get("id")); JsonRpc::SendMessage(m_Stream, resultMessage); } +} + +bool JsonRpcConnection::ProcessMessage(void) +{ + Dictionary::Ptr message; + + StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false); + + if (srs != StatusNewItem) + return false; + + l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandler, JsonRpcConnection::Ptr(this), message)); return true; } diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 1e514e8b4..258eb8e82 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -72,6 +72,7 @@ public: static Value HeartbeatAPIHandler(const intrusive_ptr& origin, const Dictionary::Ptr& params); private: + int m_ID; String m_Identity; bool m_Authenticated; Endpoint::Ptr m_Endpoint; @@ -86,6 +87,7 @@ private: StreamReadContext m_Context; bool ProcessMessage(void); + void MessageHandler(const Dictionary::Ptr& message); void DataAvailableHandler(void); static void StaticInitialize(void); -- 2.40.0