]> granicus.if.org Git - icinga2/commitdiff
Implement loop detection for cluster links.
authorGunnar Beutner <gunnar.beutner@netways.de>
Fri, 25 Apr 2014 12:33:45 +0000 (14:33 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Fri, 25 Apr 2014 13:07:13 +0000 (15:07 +0200)
Refs #5467

12 files changed:
components/cluster/CMakeLists.txt
components/cluster/clusterlink.cpp [new file with mode: 0644]
components/cluster/clusterlink.h [new file with mode: 0644]
components/cluster/clusterlistener.cpp
components/cluster/clusterlistener.h
doc/4.3-object-types.md
lib/base/tlsstream.cpp
lib/remote/endpoint.cpp
lib/remote/endpoint.h
lib/remote/endpoint.ti
lib/remote/jsonrpc.cpp
lib/remote/remote-type.conf

index 387873957559a9fe38a2d577b62b3c5b288a0c04..e592754a0cd93188ae2ab062cbef4d5f16ee09c5 100644 (file)
@@ -20,7 +20,7 @@ mkclass_target(clusterlistener.ti clusterlistener.th)
 mkembedconfig_target(cluster-type.conf cluster-type.cpp)
 
 add_library(cluster SHARED
-  clusterchecktask.cpp clusterlistener.cpp clusterlistener.th
+  clusterchecktask.cpp clusterlink.cpp clusterlistener.cpp clusterlistener.th
   cluster-type.cpp
 )
 
diff --git a/components/cluster/clusterlink.cpp b/components/cluster/clusterlink.cpp
new file mode 100644 (file)
index 0000000..b52ad8b
--- /dev/null
@@ -0,0 +1,69 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org)    *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#include "cluster/clusterlink.h"
+
+using namespace icinga;
+
+ClusterLink::ClusterLink(const String& from, const String& to)
+{
+       if (from < to) {
+               From = from;
+               To = to;
+       } else {
+               From = to;
+               To = from;
+       }
+}
+
+int ClusterLink::GetMetric(void) const
+{
+       int metric = 0;
+
+       Endpoint::Ptr fromEp = Endpoint::GetByName(From);
+       if (fromEp)
+               metric += fromEp->GetMetric();
+
+       Endpoint::Ptr toEp = Endpoint::GetByName(To);
+       if (toEp)
+               metric += toEp->GetMetric();
+
+       return metric;
+}
+
+bool ClusterLink::operator<(const ClusterLink& other) const
+{
+       if (From < other.From)
+               return true;
+       else
+               return To < other.To;
+}
+
+bool ClusterLinkMetricLessComparer::operator()(const ClusterLink& a, const ClusterLink& b) const
+{
+       int metricA = a.GetMetric();
+       int metricB = b.GetMetric();
+
+       if (metricA < metricB)
+               return true;
+       else if (metricB > metricA)
+               return false;
+       else
+               return a < b;
+}
diff --git a/components/cluster/clusterlink.h b/components/cluster/clusterlink.h
new file mode 100644 (file)
index 0000000..9c9e744
--- /dev/null
@@ -0,0 +1,49 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org)    *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#ifndef CLUSTERLINK_H
+#define CLUSTERLINK_H
+
+#include "remote/endpoint.h"
+
+namespace icinga
+{
+
+/**
+ * @ingroup cluster
+ */
+struct ClusterLink
+{
+       String From;
+       String To;
+
+       ClusterLink(const String& from, const String& to);
+
+       int GetMetric(void) const;
+       bool operator<(const ClusterLink& other) const;
+};
+
+struct ClusterLinkMetricLessComparer
+{
+       bool operator()(const ClusterLink& a, const ClusterLink& b) const;
+};
+
+}
+
+#endif /* CLUSTERLINK_H */
index d7f2f8648afffa53a10a5fd059243da2696c7809..16501ff9b205985b64db3f26cb365175ffacf5ba 100644 (file)
@@ -222,9 +222,9 @@ void ClusterListener::AddConnection(const String& node, const String& service) {
        Utility::QueueAsyncCallback(boost::bind(&ClusterListener::NewClientHandler, this, client, TlsRoleClient));
 }
 
-void ClusterListener::AsyncRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
+void ClusterListener::AsyncRelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent)
 {
-       m_RelayQueue.Enqueue(boost::bind(&ClusterListener::RelayMessage, this, source, message, persistent));
+       m_RelayQueue.Enqueue(boost::bind(&ClusterListener::RelayMessage, this, source, destination, message, persistent));
 }
 
 void ClusterListener::PersistMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message)
@@ -256,7 +256,7 @@ void ClusterListener::PersistMessage(const Endpoint::Ptr& source, const Dictiona
        }
 }
 
-void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
+void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent)
 {
        double ts = Utility::GetTime();
        message->Set("ts", ts);
@@ -288,8 +288,16 @@ void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Dictionary
                privs = security->Get("privs");
        }
 
+       double now = Utility::GetTime();
+
        BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               if (!persistent && !endpoint->IsConnected())
+               if (!endpoint->IsConnected())
+                       continue;
+
+               if (destination && endpoint != destination)
+                       continue;
+
+               if (!destination && endpoint->GetBlockedUntil() > now)
                        continue;
 
                if (endpoint == source)
@@ -532,8 +540,6 @@ void ClusterListener::NewClientHandler(const Socket::Ptr& client, TlsRole role)
        shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
        String identity = GetCertificateCN(cert);
 
-       Log(LogInformation, "cluster", "New client connection for identity '" + identity + "'");
-
        Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
 
        if (!endpoint) {
@@ -542,6 +548,13 @@ void ClusterListener::NewClientHandler(const Socket::Ptr& client, TlsRole role)
                return;
        }
 
+       if (endpoint->GetClient()) {
+               tlsStream->Close();
+               return;
+       }
+
+       Log(LogInformation, "cluster", "New client connection for identity '" + identity + "'");
+
        {
                ObjectLock olock(endpoint);
 
@@ -593,26 +606,109 @@ void ClusterListener::NewClientHandler(const Socket::Ptr& client, TlsRole role)
        ReplayLog(endpoint, tlsStream);
 }
 
+void ClusterListener::UpdateLinks(void)
+{
+       ObjectLock olock(this);
+       /* build a set of potential routes */
+       std::set<ClusterLink> links;
+       std::pair<String, EndpointPeerInfo> kv;
+       BOOST_FOREACH(kv, m_VisibleEndpoints) {
+               String endpoint = kv.first;
+               const EndpointPeerInfo& epi = kv.second;
+
+               if (GetIdentity() == endpoint)
+                       continue;
+
+               if (epi.Seen > Utility::GetTime() - 30)
+                       links.insert(ClusterLink(GetIdentity(), endpoint));
+
+               if (!epi.Peers)
+                       continue;
+
+               ObjectLock olock(epi.Peers);
+               BOOST_FOREACH(const String& peer, epi.Peers)
+                       links.insert(ClusterLink(endpoint, peer));
+       }
+       olock.Unlock();
+
+       /* sort the routes by metric */
+       std::vector<ClusterLink> sortedLinks;
+       std::copy(links.begin(), links.end(), std::back_inserter(sortedLinks));
+       std::sort(sortedLinks.begin(), sortedLinks.end(), ClusterLinkMetricLessComparer());
+
+       /* pick routes */
+       std::set<String> visitedEndpoints;
+       BOOST_FOREACH(const ClusterLink& link, sortedLinks) {
+               Endpoint::Ptr other;
+
+               if (link.From == GetIdentity())
+                       other = Endpoint::GetByName(link.To);
+               else if (link.To == GetIdentity())
+                       other = Endpoint::GetByName(link.From);
+
+               if (visitedEndpoints.find(link.From) != visitedEndpoints.end() &&
+                   visitedEndpoints.find(link.To) != visitedEndpoints.end()) {
+                       if (other) {
+                               Log(LogInformation, "cluster", "Blocking link to '" + other->GetName() + "'");
+
+                               Dictionary::Ptr message = make_shared<Dictionary>();
+                               message->Set("jsonrpc", "2.0");
+                               message->Set("method", "cluster::BlockLink");
+                               message->Set("params", make_shared<Dictionary>());
+
+                               AsyncRelayMessage(Endpoint::Ptr(), other, message, false);
+                       }
+
+                       continue;
+               }
+
+               visitedEndpoints.insert(link.From);
+               visitedEndpoints.insert(link.To);
+       }
+}
+
 void ClusterListener::ClusterTimerHandler(void)
 {
-       /* broadcast a heartbeat message */
-       Dictionary::Ptr params = make_shared<Dictionary>();
-       params->Set("identity", GetIdentity());
+       /* Update endpoint routes */
+       UpdateLinks();
 
        /* Eww. */
        Dictionary::Ptr features = make_shared<Dictionary>();
        features->Set("checker", SupportsChecks());
        features->Set("notification", SupportsNotifications());
-       params->Set("features", features);
 
-       Dictionary::Ptr message = make_shared<Dictionary>();
-       message->Set("jsonrpc", "2.0");
-       message->Set("method", "cluster::HeartBeat");
-       message->Set("params", params);
+       /* broadcast a heartbeat message */
+       BOOST_FOREACH(const Endpoint::Ptr& destination, DynamicType::GetObjects<Endpoint>()) {
+               std::set<String> connected_endpoints;
+
+               BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+                       if (endpoint->GetName() == GetIdentity())
+                               continue;
+
+                       if (!endpoint->IsConnected())
+                               continue;
+
+                       connected_endpoints.insert(endpoint->GetName());
+               }
+
+               Array::Ptr epnames = make_shared<Array>();
+               BOOST_FOREACH(const String& name, connected_endpoints)
+                       epnames->Add(name);
 
-       Endpoint::GetByName(GetIdentity())->SetFeatures(features);
+               Dictionary::Ptr params = make_shared<Dictionary>();
+               params->Set("identity", GetIdentity());
+               params->Set("features", features);
+               params->Set("connected_endpoints", epnames);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, false);
+               Dictionary::Ptr message = make_shared<Dictionary>();
+               message->Set("jsonrpc", "2.0");
+               message->Set("method", "cluster::HeartBeat");
+               message->Set("params", params);
+
+               Endpoint::GetByName(GetIdentity())->SetFeatures(features);
+
+               AsyncRelayMessage(Endpoint::Ptr(), destination, message, false);
+       }
 
        {
                ObjectLock olock(this);
@@ -621,6 +717,8 @@ void ClusterListener::ClusterTimerHandler(void)
                        if (endpoint->GetSeen() > Utility::GetTime() - 60)
                                continue;
 
+                       m_VisibleEndpoints.erase(endpoint->GetName());
+
                        Stream::Ptr client = endpoint->GetClient();
 
                        if (client) {
@@ -727,7 +825,7 @@ void ClusterListener::CheckResultHandler(const Checkable::Ptr& checkable, const
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::NextCheckChangedHandler(const Checkable::Ptr& checkable, double nextCheck, const String& authority)
@@ -747,7 +845,7 @@ void ClusterListener::NextCheckChangedHandler(const Checkable::Ptr& checkable, d
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority)
@@ -766,7 +864,7 @@ void ClusterListener::NextNotificationChangedHandler(const Notification::Ptr& no
 
        SetSecurityInfo(message, notification->GetCheckable(), DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::ForceNextCheckChangedHandler(const Checkable::Ptr& checkable, bool forced, const String& authority)
@@ -786,7 +884,7 @@ void ClusterListener::ForceNextCheckChangedHandler(const Checkable::Ptr& checkab
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::ForceNextNotificationChangedHandler(const Checkable::Ptr& checkable, bool forced, const String& authority)
@@ -806,7 +904,7 @@ void ClusterListener::ForceNextNotificationChangedHandler(const Checkable::Ptr&
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::EnableActiveChecksChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
@@ -826,7 +924,7 @@ void ClusterListener::EnableActiveChecksChangedHandler(const Checkable::Ptr& che
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::EnablePassiveChecksChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
@@ -846,7 +944,7 @@ void ClusterListener::EnablePassiveChecksChangedHandler(const Checkable::Ptr& ch
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::EnableNotificationsChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
@@ -866,7 +964,7 @@ void ClusterListener::EnableNotificationsChangedHandler(const Checkable::Ptr& ch
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::EnableFlappingChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
@@ -886,7 +984,7 @@ void ClusterListener::EnableFlappingChangedHandler(const Checkable::Ptr& checkab
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::CommentAddedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const String& authority)
@@ -906,7 +1004,7 @@ void ClusterListener::CommentAddedHandler(const Checkable::Ptr& checkable, const
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::CommentRemovedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const String& authority)
@@ -926,7 +1024,7 @@ void ClusterListener::CommentRemovedHandler(const Checkable::Ptr& checkable, con
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::DowntimeAddedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const String& authority)
@@ -946,7 +1044,7 @@ void ClusterListener::DowntimeAddedHandler(const Checkable::Ptr& checkable, cons
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::DowntimeRemovedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const String& authority)
@@ -966,7 +1064,7 @@ void ClusterListener::DowntimeRemovedHandler(const Checkable::Ptr& checkable, co
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::AcknowledgementSetHandler(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority)
@@ -989,7 +1087,7 @@ void ClusterListener::AcknowledgementSetHandler(const Checkable::Ptr& checkable,
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const String& authority)
@@ -1008,7 +1106,7 @@ void ClusterListener::AcknowledgementClearedHandler(const Checkable::Ptr& checka
 
        SetSecurityInfo(message, checkable, DomainPrivRead);
 
-       AsyncRelayMessage(Endpoint::Ptr(), message, true);
+       AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
 }
 
 void ClusterListener::AsyncMessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
@@ -1045,9 +1143,27 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                        sender->SendMessage(lmessage);
 
+                       Log(LogInformation, "cluster", "Acknowledging log position for identity '" + sender->GetName() + "': " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
                        sender->SetRemoteLogPosition(message->Get("ts"));
 
-                       Log(LogInformation, "cluster", "Acknowledging log position for identity '" + sender->GetName() + "': " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
+                       ObjectLock olock(this);
+                       const EndpointPeerInfo& epi = m_VisibleEndpoints[sender->GetName()];
+
+                       if (epi.Peers) {
+                               ObjectLock olock(epi.Peers);
+                               BOOST_FOREACH(const String& epname, epi.Peers) {
+                                       if (epname == GetIdentity())
+                                               continue;
+
+                                       Endpoint::Ptr peer_endpoint = Endpoint::GetByName(epname);
+
+                                       if (!peer_endpoint)
+                                               continue;
+
+                                       Log(LogInformation, "cluster", "Acknowledging log position for identity '" + peer_endpoint->GetName() + "' (via '" + sender->GetName() + "'): " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
+                                       peer_endpoint->SetRemoteLogPosition(message->Get("ts"));
+                               }
+                       }
                }
        }
 
@@ -1059,6 +1175,14 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                String identity = params->Get("identity");
 
+               {
+                       ObjectLock olock(this);
+                       EndpointPeerInfo epi;
+                       epi.Seen = Utility::GetTime();
+                       epi.Peers = params->Get("connected_endpoints");
+                       m_VisibleEndpoints[identity] = epi;
+               }
+
                Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
 
                if (endpoint) {
@@ -1066,7 +1190,10 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
                        endpoint->SetFeatures(params->Get("features"));
                }
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, false);
+       } else if (message->Get("method") == "cluster::BlockLink") {
+               Log(LogDebug, "cluster", "Got cluster::BlockLink message. Blocking direct link for '" + sender->GetName() + "'");
+               sender->SetBlockedUntil(Utility::GetTime() + 30);
        } else if (message->Get("method") == "cluster::CheckResult") {
                if (!params)
                        return;
@@ -1077,9 +1204,9 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
                Checkable::Ptr checkable;
 
                if (type == "Host")
-                       checkable = DynamicObject::GetObject<Host>(chk);
+                       checkable = Host::GetByName(chk);
                else if (type == "Service")
-                       checkable = DynamicObject::GetObject<Service>(chk);
+                       checkable = Service::GetByName(chk);
                else
                        return;
 
@@ -1098,7 +1225,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                checkable->ProcessCheckResult(cr, sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::SetNextCheck") {
                if (!params)
                        return;
@@ -1127,7 +1254,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                checkable->SetNextCheck(nextCheck, sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::SetForceNextCheck") {
                if (!params)
                        return;
@@ -1156,7 +1283,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                checkable->SetForceNextCheck(forced, sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::SetForceNextNotification") {
                if (!params)
                        return;
@@ -1185,7 +1312,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                checkable->SetForceNextNotification(forced, sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::SetEnableActiveChecks") {
                if (!params)
                        return;
@@ -1214,7 +1341,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                checkable->SetEnableActiveChecks(enabled, sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::SetEnablePassiveChecks") {
                if (!params)
                        return;
@@ -1243,7 +1370,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                checkable->SetEnablePassiveChecks(enabled, sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::SetEnableNotifications") {
                if (!params)
                        return;
@@ -1272,7 +1399,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                checkable->SetEnableNotifications(enabled, sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::SetEnableFlapping") {
                if (!params)
                        return;
@@ -1301,7 +1428,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                checkable->SetEnableFlapping(enabled, sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::SetNextNotification") {
                if (!params)
                        return;
@@ -1324,7 +1451,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                notification->SetNextNotification(nextNotification, sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::AddComment") {
                if (!params)
                        return;
@@ -1354,7 +1481,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
                checkable->AddComment(comment->GetEntryType(), comment->GetAuthor(),
                    comment->GetText(), comment->GetExpireTime(), comment->GetId(), sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::RemoveComment") {
                if (!params)
                        return;
@@ -1383,7 +1510,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                checkable->RemoveComment(id, sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::AddDowntime") {
                if (!params)
                        return;
@@ -1416,7 +1543,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
                    downtime->GetDuration(), downtime->GetScheduledBy(),
                    downtime->GetId(), sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::RemoveDowntime") {
                if (!params)
                        return;
@@ -1445,7 +1572,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                checkable->RemoveDowntime(id, false, sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::SetAcknowledgement") {
                if (!params)
                        return;
@@ -1477,7 +1604,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
 
                checkable->AcknowledgeProblem(author, comment, static_cast<AcknowledgementType>(acktype), expiry, sender->GetName());
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::ClearAcknowledgement") {
                if (!params)
                        return;
@@ -1507,7 +1634,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
                        checkable->ClearAcknowledgement(sender->GetName());
                }
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        } else if (message->Get("method") == "cluster::SetLogPosition") {
                if (!params)
                        return;
@@ -1614,19 +1741,21 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
                        Application::RequestRestart();
                }
 
-               AsyncRelayMessage(sender, message, true);
+               AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
        }
 }
 
 bool ClusterListener::IsAuthority(const DynamicObject::Ptr& object, const String& type)
 {
+       double now = Utility::GetTime();
+
        Array::Ptr authorities = object->GetAuthorities();
        std::vector<String> endpoints;
 
        BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
                bool match = false;
 
-               if ((!endpoint->IsConnected() && endpoint->GetName() != GetIdentity()) || !endpoint->HasFeature(type))
+               if ((endpoint->GetSeen() < now - 30 && endpoint->GetName() != GetIdentity()) || !endpoint->HasFeature(type))
                        continue;
 
                if (authorities) {
@@ -1723,9 +1852,9 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ClusterListener::GetClusterStatus(vo
        BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
                count_endpoints++;
 
-               if(!endpoint->IsConnected() && endpoint->GetName() != GetIdentity())
+               if(!endpoint->IsAvailable() && endpoint->GetName() != GetIdentity())
                        not_connected_endpoints->Add(endpoint->GetName());
-               else if(endpoint->IsConnected() && endpoint->GetName() != GetIdentity())
+               else if(endpoint->IsAvailable() && endpoint->GetName() != GetIdentity())
                        connected_endpoints->Add(endpoint->GetName());
        }
 
index 941c1f9f68292163f553027b6c3e4e7ccb6d3d07..51d3eaa3bd3aaf6376533cbe0edf0fb19bf98252 100644 (file)
@@ -21,6 +21,7 @@
 #define CLUSTERLISTENER_H
 
 #include "cluster/clusterlistener.th"
+#include "cluster/clusterlink.h"
 #include "base/dynamicobject.h"
 #include "base/timer.h"
 #include "base/array.h"
 namespace icinga
 {
 
+/**
+ * @ingroup cluster
+ */
+struct EndpointPeerInfo
+{
+       double Seen;
+       Array::Ptr Peers;
+};
+
 /**
  * @ingroup cluster
  */
@@ -75,8 +85,12 @@ private:
        void NewClientHandler(const Socket::Ptr& client, TlsRole role);
        void ListenerThreadProc(const Socket::Ptr& server);
 
-       void AsyncRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent);
-       void RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent);
+       std::map<String, EndpointPeerInfo> m_VisibleEndpoints;
+
+       void UpdateLinks(void);
+
+       void AsyncRelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent);
+       void RelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent);
 
        void OpenLogFile(void);
        void RotateLogFile(void);
index a70916e39bb53e38aa926e5115d8527a3dd02a84..fa0563bcc93ed0de6ef1dccb61bdb48e947f043b 100644 (file)
@@ -942,6 +942,8 @@ Example:
       host = "192.168.5.46"
       port = 7777
 
+      metric = 0
+
       config_files = [ "/etc/icinga2/cluster.d/*" ]
 
       config_files_recursive = [
@@ -956,6 +958,7 @@ Attributes:
   ----------------|----------------
   host            |**Required.** The hostname/IP address of the remote Icinga 2 instance.
   port            |**Required.** The service name/port of the remote Icinga 2 instance.
+  metric          |**Optional.** The link metric for this endpoint. Defaults to 0.
   config\_files   |**Optional.** A list of configuration files sent to remote peers (wildcards possible).
   config_files_recursive |**Optional.** A list of configuration files sent to remote peers. Array elements can either be a string (in which case all files in that directory matching the pattern *.conf are included) or a dictionary with elements "path" and "pattern".
   accept\_config  |**Optional.** A list of endpoint names from which this endpoint accepts configuration files.
index 934be2cd003ae5e03233baf2972ba113765c7d21..7ec3216e6523eb19e2e7794422c1ba2d7d2b669d 100644 (file)
@@ -105,7 +105,8 @@ void TlsStream::Handshake(void)
                if (rc > 0)
                        break;
 
-               switch (SSL_get_error(m_SSL.get(), rc)) {
+               int err = SSL_get_error(m_SSL.get(), rc);
+               switch (err) {
                        case SSL_ERROR_WANT_READ:
                                m_Socket->Poll(true, false);
                                continue;
@@ -141,7 +142,8 @@ size_t TlsStream::Read(void *buffer, size_t count)
                }
 
                if (rc <= 0) {
-                       switch (SSL_get_error(m_SSL.get(), rc)) {
+                       int err = SSL_get_error(m_SSL.get(), rc);
+                       switch (err) {
                                case SSL_ERROR_WANT_READ:
                                        m_Socket->Poll(true, false);
                                        continue;
@@ -179,7 +181,8 @@ void TlsStream::Write(const void *buffer, size_t count)
                }
 
                if (rc <= 0) {
-                       switch (SSL_get_error(m_SSL.get(), rc)) {
+                       int err = SSL_get_error(m_SSL.get(), rc);
+                       switch (err) {
                                case SSL_ERROR_WANT_READ:
                                        m_Socket->Poll(true, false);
                                        continue;
index ca8ee4a8a7171d2c4ad88dddb141f0f9d0553eae..d44d9bd8f4d610520fb281fe21deb19b6ae1128a 100644 (file)
@@ -45,6 +45,11 @@ bool Endpoint::IsConnected(void) const
        return GetClient() != NULL;
 }
 
+bool Endpoint::IsAvailable(void) const
+{
+       return GetSeen() > Utility::GetTime() - 30;
+}
+
 Stream::Ptr Endpoint::GetClient(void) const
 {
        return m_Client;
@@ -52,6 +57,8 @@ Stream::Ptr Endpoint::GetClient(void) const
 
 void Endpoint::SetClient(const Stream::Ptr& client)
 {
+       SetBlockedUntil(Utility::GetTime() + 15);
+
        if (m_Client)
                m_Client->Close();
 
@@ -62,10 +69,10 @@ void Endpoint::SetClient(const Stream::Ptr& client)
                thread.detach();
 
                OnConnected(GetSelf());
-               Log(LogWarning, "remote", "Endpoint connected: " + GetName());
+               Log(LogInformation, "remote", "Endpoint connected: " + GetName());
        } else {
                OnDisconnected(GetSelf());
-               Log(LogWarning, "remote", "Endpoint disconnected: " + GetName());
+               Log(LogInformation, "remote", "Endpoint disconnected: " + GetName());
        }
 }
 
index 72e027d43b3ef7bf379b79f8823714124ead0ee2..f7ae173478c1f6667cc48007307b7859541c0049 100644 (file)
@@ -50,6 +50,7 @@ public:
        void SetClient(const Stream::Ptr& client);
 
        bool IsConnected(void) const;
+       bool IsAvailable(void) const;
 
        void SendMessage(const Dictionary::Ptr& request);
 
@@ -58,6 +59,7 @@ public:
 private:
        Stream::Ptr m_Client;
        boost::thread m_Thread;
+       Array::Ptr m_ConnectedEndpoints;
 
        void MessageThreadProc(const Stream::Ptr& stream);
 };
index 4b4e7757e348a4d990114e4a2e52253397b526e1..36d52a4d2f094d9044836d7ed7bf3d520473c69c 100644 (file)
@@ -10,15 +10,15 @@ class Endpoint : DynamicObject
        [config] Array::Ptr config_files;
        [config] Array::Ptr config_files_recursive;
        [config] Array::Ptr accept_config;
+       [config] int metric;
 
        [state] double seen;
        [state] double local_log_position;
        [state] double remote_log_position;
        [state] Dictionary::Ptr features;
 
-       bool syncing {
-               default {{{ return false; }}}
-       };
+       bool syncing;
+       double blocked_until;
 };
 
 }
index 1a3788ea5e7574ae09146a92cb49c612c54445c9..628209d060764e3f9dea08651f52ed2e915f3aca 100644 (file)
@@ -34,7 +34,7 @@ using namespace icinga;
 void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message)
 {
        String json = JsonSerialize(message);
-//     std::cerr << ">> " << json << std::endl;
+       //std::cerr << ">> " << json << std::endl;
        NetString::WriteStringToStream(stream, json);
 }
 
@@ -44,7 +44,7 @@ Dictionary::Ptr JsonRpc::ReadMessage(const Stream::Ptr& stream)
        if (!NetString::ReadStringFromStream(stream, &jsonString))
                BOOST_THROW_EXCEPTION(std::runtime_error("ReadStringFromStream signalled EOF."));
 
-//     std::cerr << "<< " << jsonString << std::endl;
+       //std::cerr << "<< " << jsonString << std::endl;
        Value value = JsonDeserialize(jsonString);
 
        if (!value.IsObjectType<Dictionary>()) {
index e007068c1cb4f63430d1c142146d57f2906d073e..e75f017fcdbe01758d773d912c16edb82b327e20 100644 (file)
@@ -24,6 +24,8 @@
        %require "port",
        %attribute %string "port",
 
+       %attribute %number "metric",
+
        %attribute %array "config_files" {
                %attribute %string "*"
        },