From: Gunnar Beutner Date: Fri, 30 Aug 2013 07:34:58 +0000 (+0200) Subject: cluster: Send heartbeats. X-Git-Tag: v0.0.3~646 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=f3638877ebd037520650c6ba8e463c03515cc182;p=icinga2 cluster: Send heartbeats. --- diff --git a/components/cluster/clustercomponent.cpp b/components/cluster/clustercomponent.cpp index c3390cd7b..7be73d26a 100644 --- a/components/cluster/clustercomponent.cpp +++ b/components/cluster/clustercomponent.cpp @@ -47,10 +47,10 @@ void ClusterComponent::Start(void) if (!GetBindPort().IsEmpty()) AddListener(GetBindPort()); - m_ReconnectTimer = boost::make_shared(); - m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&ClusterComponent::ReconnectTimerHandler, this)); - m_ReconnectTimer->SetInterval(5); - m_ReconnectTimer->Start(); + m_ClusterTimer = boost::make_shared(); + m_ClusterTimer->OnTimerExpired.connect(boost::bind(&ClusterComponent::ClusterTimerHandler, this)); + m_ClusterTimer->SetInterval(5); + m_ClusterTimer->Start(); Service::OnNewCheckResult.connect(bind(&ClusterComponent::CheckResultHandler, this, _1, _2, _3)); Service::OnNextCheckChanged.connect(bind(&ClusterComponent::NextCheckChangedHandler, this, _1, _2, _3)); @@ -226,8 +226,17 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role) endpoint->SetClient(tlsStream); } -void ClusterComponent::ReconnectTimerHandler(void) +void ClusterComponent::ClusterTimerHandler(void) { + /* broadcast a heartbeat message */ + Dictionary::Ptr message = boost::make_shared(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "cluster::HeartBeat"); + + BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { + endpoint->SendMessage(message); + } + Array::Ptr peers = GetPeers(); if (!peers) @@ -553,6 +562,11 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction endpoint->SendMessage(message); } + if (message->Get("method") == "cluster::HeartBeat") { + sender->SetSeen(Utility::GetTime()); + return; + } + Dictionary::Ptr params = message->Get("params"); if (!params) diff --git a/components/cluster/clustercomponent.h b/components/cluster/clustercomponent.h index e33bc86d2..7254feb03 100644 --- a/components/cluster/clustercomponent.h +++ b/components/cluster/clustercomponent.h @@ -68,8 +68,8 @@ private: shared_ptr m_SSLContext; String m_Identity; - Timer::Ptr m_ReconnectTimer; - void ReconnectTimerHandler(void); + Timer::Ptr m_ClusterTimer; + void ClusterTimerHandler(void); std::set m_Servers; diff --git a/components/cluster/endpoint.cpp b/components/cluster/endpoint.cpp index 7c51e6dc5..5dee9254d 100644 --- a/components/cluster/endpoint.cpp +++ b/components/cluster/endpoint.cpp @@ -126,6 +126,16 @@ String Endpoint::GetPort(void) const return m_Port; } +double Endpoint::GetSeen(void) const +{ + return m_Seen; +} + +void Endpoint::SetSeen(double ts) +{ + m_Seen = ts; +} + void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const { DynamicObject::InternalSerialize(bag, attributeTypes); @@ -134,6 +144,9 @@ void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) bag->Set("host", m_Host); bag->Set("port", m_Port); } + + if (attributeTypes & Attribute_State) + bag->Set("seen", m_Seen); } void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes) @@ -144,4 +157,7 @@ void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeType m_Host = bag->Get("host"); m_Port = bag->Get("port"); } + + if (attributeTypes & Attribute_State) + m_Seen = bag->Get("seen"); } diff --git a/components/cluster/endpoint.h b/components/cluster/endpoint.h index a7344e700..b52dba08c 100644 --- a/components/cluster/endpoint.h +++ b/components/cluster/endpoint.h @@ -53,6 +53,9 @@ public: String GetHost(void) const; String GetPort(void) const; + double GetSeen(void) const; + void SetSeen(double ts); + protected: virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const; virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes); @@ -63,6 +66,7 @@ private: String m_Port; Stream::Ptr m_Client; + double m_Seen; void MessageThreadProc(const Stream::Ptr& stream); };