]> granicus.if.org Git - icinga2/commitdiff
Restore the previous performance of replaying logs
authorAlexander A. Klimov <alexander.klimov@icinga.com>
Tue, 26 Feb 2019 10:13:34 +0000 (11:13 +0100)
committerAlexander A. Klimov <alexander.klimov@icinga.com>
Mon, 1 Apr 2019 11:31:16 +0000 (13:31 +0200)
lib/remote/apilistener.cpp
lib/remote/jsonrpc.cpp
lib/remote/jsonrpc.hpp
lib/remote/jsonrpcconnection.cpp
lib/remote/jsonrpcconnection.hpp

index cf4de595266cdb215b46ac8cfe5ab274b75d8689..75a676143710c36060c00f54fa849748d5bada47 100644 (file)
@@ -1277,7 +1277,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
                                }
 
                                try  {
-                                       client->SendMessage(JsonDecode(pmessage->Get("message")));
+                                       client->SendRawMessage(pmessage->Get("message"));
                                        count++;
                                } catch (const std::exception& ex) {
                                        Log(LogWarning, "ApiListener")
index 5f4b6ae9180bf2f1eb4e70bb8d68c04ee9823b8c..03f3c7d0e08a7ea0e87ba0b5660a6cc463dbe06a 100644 (file)
@@ -68,8 +68,18 @@ size_t JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& me
  */
 size_t JsonRpc::SendMessage(const std::shared_ptr<AsioTlsStream>& stream, const Dictionary::Ptr& message, boost::asio::yield_context yc)
 {
-       String json = JsonEncode(message);
+       return JsonRpc::SendRawMessage(stream, JsonEncode(message), yc);
+}
 
+/**
+ * Sends a message to the connected peer and returns the bytes sent.
+ *
+ * @param message The message.
+ *
+ * @return The amount of bytes sent.
+ */
+size_t JsonRpc::SendRawMessage(const std::shared_ptr<AsioTlsStream>& stream, const String& json, boost::asio::yield_context yc)
+{
 #ifdef I2_DEBUG
        if (GetDebugJsonRpcCached())
                std::cerr << ConsoleColorTag(Console_ForegroundBlue) << ">> " << json << ConsoleColorTag(Console_Normal) << "\n";
index 137d42a3b7b8326c84a64ddc144bf30d04e10681..faf9c07e8d212ba13fc5fe22d36fff7eee32231d 100644 (file)
@@ -23,6 +23,7 @@ class JsonRpc
 public:
        static size_t SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message);
        static size_t SendMessage(const std::shared_ptr<AsioTlsStream>& stream, const Dictionary::Ptr& message, boost::asio::yield_context yc);
+       static size_t SendRawMessage(const std::shared_ptr<AsioTlsStream>& stream, const String& json, boost::asio::yield_context yc);
        static StreamReadStatus ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait = false, ssize_t maxMessageLength = -1);
        static String ReadMessage(const std::shared_ptr<AsioTlsStream>& stream, boost::asio::yield_context yc, ssize_t maxMessageLength = -1);
        static Dictionary::Ptr DecodeMessage(const String& message);
index 16066a6aee3aced489481fafc9e5b28b9507a8a4..e54a998d08c3919be4aafd55f263336349237143 100644 (file)
@@ -6,6 +6,7 @@
 #include "remote/jsonrpc.hpp"
 #include "base/configtype.hpp"
 #include "base/io-engine.hpp"
+#include "base/json.hpp"
 #include "base/objectlock.hpp"
 #include "base/utility.hpp"
 #include "base/logger.hpp"
@@ -106,7 +107,7 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
                if (!queue.empty()) {
                        try {
                                for (auto& message : queue) {
-                                       size_t bytesSent = JsonRpc::SendMessage(m_Stream, message, yc);
+                                       size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc);
 
                                        if (m_Endpoint) {
                                                m_Endpoint->AddMessageSent(bytesSent);
@@ -163,9 +164,17 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
        m_IoStrand.post([this, message]() { SendMessageInternal(message); });
 }
 
+void JsonRpcConnection::SendRawMessage(const String& message)
+{
+       m_IoStrand.post([this, message]() {
+               m_OutgoingMessagesQueue.emplace_back(message);
+               m_OutgoingMessagesQueued.Set();
+       });
+}
+
 void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
 {
-       m_OutgoingMessagesQueue.emplace_back(message);
+       m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
        m_OutgoingMessagesQueued.Set();
 }
 
index 8f48fc4cdff688315aa66fa0cd237b1e47db2cb6..994dd7368110aa5b668b03ad959b25aafcf9acd3 100644 (file)
@@ -55,6 +55,7 @@ public:
        void Disconnect();
 
        void SendMessage(const Dictionary::Ptr& request);
+       void SendRawMessage(const String& request);
 
        static Value HeartbeatAPIHandler(const intrusive_ptr<MessageOrigin>& origin, const Dictionary::Ptr& params);
 
@@ -72,7 +73,7 @@ private:
        double m_Seen;
        double m_NextHeartbeat;
        boost::asio::io_service::strand m_IoStrand;
-       std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue;
+       std::vector<String> m_OutgoingMessagesQueue;
        AsioConditionVariable m_OutgoingMessagesQueued;
        AsioConditionVariable m_WriterDone;
        bool m_ShuttingDown;