]> granicus.if.org Git - icinga2/commitdiff
Decode cluster messages in the WorkQueue threads
authorGunnar Beutner <gunnar@beutner.name>
Wed, 27 Jan 2016 15:43:23 +0000 (16:43 +0100)
committerGunnar Beutner <gunnar@beutner.name>
Wed, 27 Jan 2016 15:52:01 +0000 (16:52 +0100)
refs #11014

lib/cli/pkiutility.cpp
lib/remote/jsonrpc.cpp
lib/remote/jsonrpc.hpp
lib/remote/jsonrpcconnection.cpp
lib/remote/jsonrpcconnection.hpp

index 60146939150e568e1d13a60fba61c350fdc3efc5..3fc38c9e690f1d6bbc2ae909b3f76719655c1b4e 100644 (file)
@@ -232,11 +232,12 @@ int PkiUtility::RequestCertificate(const String& host, const String& port, const
 
        JsonRpc::SendMessage(stream, request);
 
+       String jsonString;
        Dictionary::Ptr response;
        StreamReadContext src;
 
        for (;;) {
-               StreamReadStatus srs = JsonRpc::ReadMessage(stream, &response, src);
+               StreamReadStatus srs = JsonRpc::ReadMessage(stream, &jsonString, src);
 
                if (srs == StatusEof)
                        break;
@@ -244,6 +245,8 @@ int PkiUtility::RequestCertificate(const String& host, const String& port, const
                if (srs != StatusNewItem)
                        continue;
 
+               response = JsonRpc::DecodeMessage(jsonString);
+
                if (response && response->Contains("error")) {
                        Log(LogCritical, "cli", "Could not fetch valid response. Please check the master log (notice or debug).");
 #ifdef I2_DEBUG
index e4b852825d1b03756320c53d216731299c3266fd..efba6dbfaf326146e7b48fd4c2d5a90037af0c38 100644 (file)
@@ -34,7 +34,7 @@ void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& mess
        NetString::WriteStringToStream(stream, json);
 }
 
-StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src, bool may_wait)
+StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait)
 {
        String jsonString;
        StreamReadStatus srs = NetString::ReadStringFromStream(stream, &jsonString, src, may_wait);
@@ -42,15 +42,19 @@ StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr
        if (srs != StatusNewItem)
                return srs;
 
-       Value value = JsonDecode(jsonString);
+       *message = jsonString;
+
+       return StatusNewItem;
+}
+
+Dictionary::Ptr JsonRpc::DecodeMessage(const String& message)
+{
+       Value value = JsonDecode(message);
 
        if (!value.IsObjectType<Dictionary>()) {
                BOOST_THROW_EXCEPTION(std::invalid_argument("JSON-RPC"
                    " message must be a dictionary."));
        }
 
-       *message = value;
-
-       return StatusNewItem;
+       return value;
 }
-
index cf8883b5ec29bdc3041933d1c7ffa265bc8d1522..cf525808d775314fe1f1d75ce53b371045760805 100644 (file)
@@ -36,7 +36,8 @@ class I2_REMOTE_API JsonRpc
 {
 public:
        static void SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message);
-       static StreamReadStatus ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src, bool may_wait = false);
+       static StreamReadStatus ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait = false);
+       static Dictionary::Ptr DecodeMessage(const String& message);
 
 private:
        JsonRpc(void);
index 8d2a6103dd4f006668635d9947af8a61b7d2dd21..00c953c444f9f6e9ba24c5510324fda54d92dbd6 100644 (file)
@@ -134,8 +134,28 @@ void JsonRpcConnection::Disconnect(void)
        }
 }
 
-void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
+void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString)
 {
+       if (m_Stream->IsEof())
+               return;
+
+       try {
+               MessageHandler(jsonString);
+       } catch (const std::exception& ex) {
+               Log(LogWarning, "JsonRpcConnection")
+                   << "Error while reading JSON-RPC message for identity '" << m_Identity
+                   << "': " << DiagnosticInformation(ex);
+
+               Disconnect();
+
+               return;
+       }
+}
+
+void JsonRpcConnection::MessageHandler(const String& jsonString)
+{
+       Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
+
        m_Seen = Utility::GetTime();
 
        if (m_HeartbeatTimeout != 0)
@@ -193,14 +213,14 @@ void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
 
 bool JsonRpcConnection::ProcessMessage(void)
 {
-       Dictionary::Ptr message;
+       String message;
 
        StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false);
 
        if (srs != StatusNewItem)
                return false;
 
-       l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandler, JsonRpcConnection::Ptr(this), message));
+       l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message));
 
        return true;
 }
index 258eb8e8214f0c37df657b1308b7a5196b9cc525..13d43691b7ff6c5064b45e053310528c31c827b0 100644 (file)
@@ -87,7 +87,8 @@ private:
        StreamReadContext m_Context;
 
        bool ProcessMessage(void);
-       void MessageHandler(const Dictionary::Ptr& message);
+       void MessageHandlerWrapper(const String& jsonString);
+       void MessageHandler(const String& jsonString);
        void DataAvailableHandler(void);
 
        static void StaticInitialize(void);