]> granicus.if.org Git - icinga2/commitdiff
cluster: Send heartbeats.
authorGunnar Beutner <gunnar.beutner@netways.de>
Fri, 30 Aug 2013 07:34:58 +0000 (09:34 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Fri, 30 Aug 2013 07:34:58 +0000 (09:34 +0200)
components/cluster/clustercomponent.cpp
components/cluster/clustercomponent.h
components/cluster/endpoint.cpp
components/cluster/endpoint.h

index c3390cd7b37bcf6193a8200c9e74c0bf8c49af77..7be73d26adab81ecf73b50a48737af3cfe4fd669 100644 (file)
@@ -47,10 +47,10 @@ void ClusterComponent::Start(void)
        if (!GetBindPort().IsEmpty())
                AddListener(GetBindPort());
 
-       m_ReconnectTimer = boost::make_shared<Timer>();
-       m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&ClusterComponent::ReconnectTimerHandler, this));
-       m_ReconnectTimer->SetInterval(5);
-       m_ReconnectTimer->Start();
+       m_ClusterTimer = boost::make_shared<Timer>();
+       m_ClusterTimer->OnTimerExpired.connect(boost::bind(&ClusterComponent::ClusterTimerHandler, this));
+       m_ClusterTimer->SetInterval(5);
+       m_ClusterTimer->Start();
 
        Service::OnNewCheckResult.connect(bind(&ClusterComponent::CheckResultHandler, this, _1, _2, _3));
        Service::OnNextCheckChanged.connect(bind(&ClusterComponent::NextCheckChangedHandler, this, _1, _2, _3));
@@ -226,8 +226,17 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
        endpoint->SetClient(tlsStream);
 }
 
-void ClusterComponent::ReconnectTimerHandler(void)
+void ClusterComponent::ClusterTimerHandler(void)
 {
+       /* broadcast a heartbeat message */
+       Dictionary::Ptr message = boost::make_shared<Dictionary>();
+       message->Set("jsonrpc", "2.0");
+       message->Set("method", "cluster::HeartBeat");
+
+       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+               endpoint->SendMessage(message);
+       }
+
        Array::Ptr peers = GetPeers();
 
        if (!peers)
@@ -553,6 +562,11 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
                        endpoint->SendMessage(message);
        }
 
+       if (message->Get("method") == "cluster::HeartBeat") {
+               sender->SetSeen(Utility::GetTime());
+               return;
+       }
+
        Dictionary::Ptr params = message->Get("params");
 
        if (!params)
index e33bc86d2cbcffed943baf24f5d5fbb813e1f9b0..7254feb0348e397b72495e68d738631d545f2711 100644 (file)
@@ -68,8 +68,8 @@ private:
        shared_ptr<SSL_CTX> m_SSLContext;
        String m_Identity;
 
-       Timer::Ptr m_ReconnectTimer;
-       void ReconnectTimerHandler(void);
+       Timer::Ptr m_ClusterTimer;
+       void ClusterTimerHandler(void);
 
        std::set<TcpSocket::Ptr> m_Servers;
 
index 7c51e6dc5ae638e74dcf9562edd131a4b67f2cc4..5dee9254de2dfdf9bfdc280ec59922c955f11d09 100644 (file)
@@ -126,6 +126,16 @@ String Endpoint::GetPort(void) const
        return m_Port;
 }
 
+double Endpoint::GetSeen(void) const
+{
+       return m_Seen;
+}
+
+void Endpoint::SetSeen(double ts)
+{
+       m_Seen = ts;
+}
+
 void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
 {
        DynamicObject::InternalSerialize(bag, attributeTypes);
@@ -134,6 +144,9 @@ void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes)
                bag->Set("host", m_Host);
                bag->Set("port", m_Port);
        }
+
+       if (attributeTypes & Attribute_State)
+               bag->Set("seen", m_Seen);
 }
 
 void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes)
@@ -144,4 +157,7 @@ void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeType
                m_Host = bag->Get("host");
                m_Port = bag->Get("port");
        }
+
+       if (attributeTypes & Attribute_State)
+               m_Seen = bag->Get("seen");
 }
index a7344e7006e202d6da59b1099ec528942d402a52..b52dba08ce5b58e991ad2a904cfd5df76b33f308 100644 (file)
@@ -53,6 +53,9 @@ public:
        String GetHost(void) const;
        String GetPort(void) const;
 
+       double GetSeen(void) const;
+       void SetSeen(double ts);
+
 protected:
        virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const;
        virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes);
@@ -63,6 +66,7 @@ private:
        String m_Port;
 
        Stream::Ptr m_Client;
+       double m_Seen;
 
        void MessageThreadProc(const Stream::Ptr& stream);
 };