static boost::once_flag l_JsonRpcConnectionOnceFlag = BOOST_ONCE_INIT;
static Timer::Ptr l_JsonRpcConnectionTimeoutTimer;
+static WorkQueue *l_JsonRpcConnectionWorkQueues;
+static size_t l_JsonRpcConnectionWorkQueueCount;
+static int l_JsonRpcConnectionNextID;
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
const TlsStream::Ptr& stream, ConnectionRole role)
- : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
+ : m_ID(l_JsonRpcConnectionNextID++), m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()),
m_NextHeartbeat(0), m_HeartbeatTimeout(0)
{
l_JsonRpcConnectionTimeoutTimer->OnTimerExpired.connect(boost::bind(&JsonRpcConnection::TimeoutTimerHandler));
l_JsonRpcConnectionTimeoutTimer->SetInterval(15);
l_JsonRpcConnectionTimeoutTimer->Start();
+
+ l_JsonRpcConnectionWorkQueueCount = Application::GetConcurrency();
+ l_JsonRpcConnectionWorkQueues = new WorkQueue[l_JsonRpcConnectionWorkQueueCount];
}
void JsonRpcConnection::Start(void)
}
}
-bool JsonRpcConnection::ProcessMessage(void)
+void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
{
- Dictionary::Ptr message;
-
- StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false);
-
- if (srs != StatusNewItem)
- return false;
-
m_Seen = Utility::GetTime();
if (m_HeartbeatTimeout != 0)
/* ignore old messages */
if (ts < m_Endpoint->GetRemoteLogPosition())
- return true;
+ return;
m_Endpoint->SetRemoteLogPosition(ts);
}
resultMessage->Set("id", message->Get("id"));
JsonRpc::SendMessage(m_Stream, resultMessage);
}
+}
+
+bool JsonRpcConnection::ProcessMessage(void)
+{
+ Dictionary::Ptr message;
+
+ StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false);
+
+ if (srs != StatusNewItem)
+ return false;
+
+ l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandler, JsonRpcConnection::Ptr(this), message));
return true;
}