JsonRpc::SendMessage(stream, request);
+ String jsonString;
Dictionary::Ptr response;
StreamReadContext src;
for (;;) {
- StreamReadStatus srs = JsonRpc::ReadMessage(stream, &response, src);
+ StreamReadStatus srs = JsonRpc::ReadMessage(stream, &jsonString, src);
if (srs == StatusEof)
break;
if (srs != StatusNewItem)
continue;
+ response = JsonRpc::DecodeMessage(jsonString);
+
if (response && response->Contains("error")) {
Log(LogCritical, "cli", "Could not fetch valid response. Please check the master log (notice or debug).");
#ifdef I2_DEBUG
NetString::WriteStringToStream(stream, json);
}
-StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src, bool may_wait)
+StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait)
{
String jsonString;
StreamReadStatus srs = NetString::ReadStringFromStream(stream, &jsonString, src, may_wait);
if (srs != StatusNewItem)
return srs;
- Value value = JsonDecode(jsonString);
+ *message = jsonString;
+
+ return StatusNewItem;
+}
+
+Dictionary::Ptr JsonRpc::DecodeMessage(const String& message)
+{
+ Value value = JsonDecode(message);
if (!value.IsObjectType<Dictionary>()) {
BOOST_THROW_EXCEPTION(std::invalid_argument("JSON-RPC"
" message must be a dictionary."));
}
- *message = value;
-
- return StatusNewItem;
+ return value;
}
-
{
public:
static void SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message);
- static StreamReadStatus ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src, bool may_wait = false);
+ static StreamReadStatus ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait = false);
+ static Dictionary::Ptr DecodeMessage(const String& message);
private:
JsonRpc(void);
}
}
-void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
+void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString)
{
+ if (m_Stream->IsEof())
+ return;
+
+ try {
+ MessageHandler(jsonString);
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "JsonRpcConnection")
+ << "Error while reading JSON-RPC message for identity '" << m_Identity
+ << "': " << DiagnosticInformation(ex);
+
+ Disconnect();
+
+ return;
+ }
+}
+
+void JsonRpcConnection::MessageHandler(const String& jsonString)
+{
+ Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
+
m_Seen = Utility::GetTime();
if (m_HeartbeatTimeout != 0)
bool JsonRpcConnection::ProcessMessage(void)
{
- Dictionary::Ptr message;
+ String message;
StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false);
if (srs != StatusNewItem)
return false;
- l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandler, JsonRpcConnection::Ptr(this), message));
+ l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message));
return true;
}
StreamReadContext m_Context;
bool ProcessMessage(void);
- void MessageHandler(const Dictionary::Ptr& message);
+ void MessageHandlerWrapper(const String& jsonString);
+ void MessageHandler(const String& jsonString);
void DataAvailableHandler(void);
static void StaticInitialize(void);