]> granicus.if.org Git - icinga2/commitdiff
JsonRpcConnection: re-add heartbeats
authorAlexander A. Klimov <alexander.klimov@icinga.com>
Wed, 20 Feb 2019 11:28:49 +0000 (12:28 +0100)
committerAlexander A. Klimov <alexander.klimov@icinga.com>
Mon, 1 Apr 2019 11:31:16 +0000 (13:31 +0200)
lib/remote/jsonrpcconnection-heartbeat.cpp
lib/remote/jsonrpcconnection.cpp
lib/remote/jsonrpcconnection.hpp

index 91fa518915edea16c3c9fe6ac3030f3bd8158396..569ffbb1cd46ad0309933d86072f8308f5c95159 100644 (file)
@@ -7,13 +7,55 @@
 #include "base/configtype.hpp"
 #include "base/logger.hpp"
 #include "base/utility.hpp"
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/asio/spawn.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
 
 using namespace icinga;
 
 REGISTER_APIFUNCTION(Heartbeat, event, &JsonRpcConnection::HeartbeatAPIHandler);
 
+void JsonRpcConnection::HandleAndWriteHeartbeats(boost::asio::yield_context yc)
+{
+       boost::asio::deadline_timer timer (m_Stream->get_io_service());
+
+       for (;;) {
+               timer.expires_from_now(boost::posix_time::seconds(10));
+               timer.async_wait(yc);
+
+               if (m_ShuttingDown) {
+                       break;
+               }
+
+               if (m_NextHeartbeat != 0 && m_NextHeartbeat < Utility::GetTime()) {
+                       Log(LogWarning, "JsonRpcConnection")
+                               << "Client for endpoint '" << m_Endpoint->GetName() << "' has requested "
+                               << "heartbeat message but hasn't responded in time. Closing connection.";
+
+                       Disconnect();
+                       break;
+               }
+
+               m_OutgoingMessagesQueue.emplace_back(new Dictionary({
+                       { "jsonrpc", "2.0" },
+                       { "method", "event::Heartbeat" },
+                       { "params", new Dictionary({
+                               { "timeout", 120 }
+                       }) }
+               }));
+
+               m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+       }
+}
+
 Value JsonRpcConnection::HeartbeatAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
 {
+       Value vtimeout = params->Get("timeout");
+
+       if (!vtimeout.IsEmpty()) {
+               origin->FromClient->m_NextHeartbeat = Utility::GetTime() + vtimeout;
+       }
+
        return Empty;
 }
 
index ff2e67ebf5f4e91ad8ece751234d4e9f6fb68c21..4a50ea20c5217a6f4b158af7c58afb051af8fe23 100644 (file)
@@ -26,7 +26,7 @@ REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
 JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
        const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role)
        : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
-       m_Role(role), m_Timestamp(Utility::GetTime()), m_IoStrand(stream->get_io_service()),
+       m_Role(role), m_Timestamp(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(stream->get_io_service()),
        m_OutgoingMessagesQueued(stream->get_io_service()), m_WriterDone(stream->get_io_service()), m_ShuttingDown(false)
 {
        if (authenticated)
@@ -44,6 +44,7 @@ void JsonRpcConnection::Start()
 
        asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleIncomingMessages(yc); });
        asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { WriteOutgoingMessages(yc); });
+       asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
 }
 
 void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
index 51787244e992f2e511571ce1ec5c112f96ab6498..54a71b889252ae9a7be0c75310b34aa7f4e7e8bf 100644 (file)
@@ -67,6 +67,7 @@ private:
        std::shared_ptr<AsioTlsStream> m_Stream;
        ConnectionRole m_Role;
        double m_Timestamp;
+       double m_NextHeartbeat;
        boost::asio::io_service::strand m_IoStrand;
        std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue;
        boost::asio::deadline_timer m_OutgoingMessagesQueued;
@@ -75,6 +76,7 @@ private:
 
        void HandleIncomingMessages(boost::asio::yield_context yc);
        void WriteOutgoingMessages(boost::asio::yield_context yc);
+       void HandleAndWriteHeartbeats(boost::asio::yield_context yc);
 
        bool ProcessMessage();
        void MessageHandler(const String& jsonString);