#include "base/configtype.hpp"
#include "base/logger.hpp"
#include "base/utility.hpp"
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/asio/spawn.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
using namespace icinga;
REGISTER_APIFUNCTION(Heartbeat, event, &JsonRpcConnection::HeartbeatAPIHandler);
+void JsonRpcConnection::HandleAndWriteHeartbeats(boost::asio::yield_context yc)
+{
+ boost::asio::deadline_timer timer (m_Stream->get_io_service());
+
+ for (;;) {
+ timer.expires_from_now(boost::posix_time::seconds(10));
+ timer.async_wait(yc);
+
+ if (m_ShuttingDown) {
+ break;
+ }
+
+ if (m_NextHeartbeat != 0 && m_NextHeartbeat < Utility::GetTime()) {
+ Log(LogWarning, "JsonRpcConnection")
+ << "Client for endpoint '" << m_Endpoint->GetName() << "' has requested "
+ << "heartbeat message but hasn't responded in time. Closing connection.";
+
+ Disconnect();
+ break;
+ }
+
+ m_OutgoingMessagesQueue.emplace_back(new Dictionary({
+ { "jsonrpc", "2.0" },
+ { "method", "event::Heartbeat" },
+ { "params", new Dictionary({
+ { "timeout", 120 }
+ }) }
+ }));
+
+ m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+ }
+}
+
Value JsonRpcConnection::HeartbeatAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
{
+ Value vtimeout = params->Get("timeout");
+
+ if (!vtimeout.IsEmpty()) {
+ origin->FromClient->m_NextHeartbeat = Utility::GetTime() + vtimeout;
+ }
+
return Empty;
}
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
- m_Role(role), m_Timestamp(Utility::GetTime()), m_IoStrand(stream->get_io_service()),
+ m_Role(role), m_Timestamp(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(stream->get_io_service()),
m_OutgoingMessagesQueued(stream->get_io_service()), m_WriterDone(stream->get_io_service()), m_ShuttingDown(false)
{
if (authenticated)
asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleIncomingMessages(yc); });
asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { WriteOutgoingMessages(yc); });
+ asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
}
void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
std::shared_ptr<AsioTlsStream> m_Stream;
ConnectionRole m_Role;
double m_Timestamp;
+ double m_NextHeartbeat;
boost::asio::io_service::strand m_IoStrand;
std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue;
boost::asio::deadline_timer m_OutgoingMessagesQueued;
void HandleIncomingMessages(boost::asio::yield_context yc);
void WriteOutgoingMessages(boost::asio::yield_context yc);
+ void HandleAndWriteHeartbeats(boost::asio::yield_context yc);
bool ProcessMessage();
void MessageHandler(const String& jsonString);