From fa7d0448f993d59226caaf37da8f8f1f620648fe Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Wed, 27 Jan 2016 16:43:23 +0100 Subject: [PATCH] Decode cluster messages in the WorkQueue threads refs #11014 --- lib/cli/pkiutility.cpp | 5 ++++- lib/remote/jsonrpc.cpp | 16 ++++++++++------ lib/remote/jsonrpc.hpp | 3 ++- lib/remote/jsonrpcconnection.cpp | 26 +++++++++++++++++++++++--- lib/remote/jsonrpcconnection.hpp | 3 ++- 5 files changed, 41 insertions(+), 12 deletions(-) diff --git a/lib/cli/pkiutility.cpp b/lib/cli/pkiutility.cpp index 601469391..3fc38c9e6 100644 --- a/lib/cli/pkiutility.cpp +++ b/lib/cli/pkiutility.cpp @@ -232,11 +232,12 @@ int PkiUtility::RequestCertificate(const String& host, const String& port, const JsonRpc::SendMessage(stream, request); + String jsonString; Dictionary::Ptr response; StreamReadContext src; for (;;) { - StreamReadStatus srs = JsonRpc::ReadMessage(stream, &response, src); + StreamReadStatus srs = JsonRpc::ReadMessage(stream, &jsonString, src); if (srs == StatusEof) break; @@ -244,6 +245,8 @@ int PkiUtility::RequestCertificate(const String& host, const String& port, const if (srs != StatusNewItem) continue; + response = JsonRpc::DecodeMessage(jsonString); + if (response && response->Contains("error")) { Log(LogCritical, "cli", "Could not fetch valid response. Please check the master log (notice or debug)."); #ifdef I2_DEBUG diff --git a/lib/remote/jsonrpc.cpp b/lib/remote/jsonrpc.cpp index e4b852825..efba6dbfa 100644 --- a/lib/remote/jsonrpc.cpp +++ b/lib/remote/jsonrpc.cpp @@ -34,7 +34,7 @@ void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& mess NetString::WriteStringToStream(stream, json); } -StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src, bool may_wait) +StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait) { String jsonString; StreamReadStatus srs = NetString::ReadStringFromStream(stream, &jsonString, src, may_wait); @@ -42,15 +42,19 @@ StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr if (srs != StatusNewItem) return srs; - Value value = JsonDecode(jsonString); + *message = jsonString; + + return StatusNewItem; +} + +Dictionary::Ptr JsonRpc::DecodeMessage(const String& message) +{ + Value value = JsonDecode(message); if (!value.IsObjectType()) { BOOST_THROW_EXCEPTION(std::invalid_argument("JSON-RPC" " message must be a dictionary.")); } - *message = value; - - return StatusNewItem; + return value; } - diff --git a/lib/remote/jsonrpc.hpp b/lib/remote/jsonrpc.hpp index cf8883b5e..cf525808d 100644 --- a/lib/remote/jsonrpc.hpp +++ b/lib/remote/jsonrpc.hpp @@ -36,7 +36,8 @@ class I2_REMOTE_API JsonRpc { public: static void SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message); - static StreamReadStatus ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src, bool may_wait = false); + static StreamReadStatus ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait = false); + static Dictionary::Ptr DecodeMessage(const String& message); private: JsonRpc(void); diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 8d2a6103d..00c953c44 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -134,8 +134,28 @@ void JsonRpcConnection::Disconnect(void) } } -void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message) +void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString) { + if (m_Stream->IsEof()) + return; + + try { + MessageHandler(jsonString); + } catch (const std::exception& ex) { + Log(LogWarning, "JsonRpcConnection") + << "Error while reading JSON-RPC message for identity '" << m_Identity + << "': " << DiagnosticInformation(ex); + + Disconnect(); + + return; + } +} + +void JsonRpcConnection::MessageHandler(const String& jsonString) +{ + Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); + m_Seen = Utility::GetTime(); if (m_HeartbeatTimeout != 0) @@ -193,14 +213,14 @@ void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message) bool JsonRpcConnection::ProcessMessage(void) { - Dictionary::Ptr message; + String message; StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false); if (srs != StatusNewItem) return false; - l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandler, JsonRpcConnection::Ptr(this), message)); + l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message)); return true; } diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 258eb8e82..13d43691b 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -87,7 +87,8 @@ private: StreamReadContext m_Context; bool ProcessMessage(void); - void MessageHandler(const Dictionary::Ptr& message); + void MessageHandlerWrapper(const String& jsonString); + void MessageHandler(const String& jsonString); void DataAvailableHandler(void); static void StaticInitialize(void); -- 2.40.0