Utility::QueueAsyncCallback(boost::bind(&ClusterListener::NewClientHandler, this, client, TlsRoleClient));
}
-void ClusterListener::AsyncRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
+void ClusterListener::AsyncRelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent) /* Queues a RelayMessage() call on m_RelayQueue instead of relaying inline; the new destination argument is passed through unchanged. */
{
- m_RelayQueue.Enqueue(boost::bind(&ClusterListener::RelayMessage, this, source, message, persistent));
+ m_RelayQueue.Enqueue(boost::bind(&ClusterListener::RelayMessage, this, source, destination, message, persistent)); /* binds this plus all four arguments; the bound call is handed to m_RelayQueue */
}
void ClusterListener::PersistMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message)
}
}
-void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
+void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent)
{
double ts = Utility::GetTime();
message->Set("ts", ts);
privs = security->Get("privs");
}
+ double now = Utility::GetTime();
+
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- if (!persistent && !endpoint->IsConnected())
+ if (!endpoint->IsConnected())
+ continue;
+
+ if (destination && endpoint != destination)
+ continue;
+
+ if (!destination && endpoint->GetBlockedUntil() > now)
continue;
if (endpoint == source)
shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
String identity = GetCertificateCN(cert);
- Log(LogInformation, "cluster", "New client connection for identity '" + identity + "'");
-
Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
if (!endpoint) {
return;
}
+ if (endpoint->GetClient()) {
+ tlsStream->Close();
+ return;
+ }
+
+ Log(LogInformation, "cluster", "New client connection for identity '" + identity + "'");
+
{
ObjectLock olock(endpoint);
ReplayLog(endpoint, tlsStream);
}
+void ClusterListener::UpdateLinks(void) /* Builds a set of candidate links from m_VisibleEndpoints, sorts it, and sends cluster::BlockLink to a direct peer when both ends of a link were already covered by earlier links. */
+{
+ ObjectLock olock(this); /* held while m_VisibleEndpoints is read; released by olock.Unlock() after the loop */
+ /* build a set of potential routes */
+ std::set<ClusterLink> links;
+ std::pair<String, EndpointPeerInfo> kv; /* declared up front because the pair type contains a comma and cannot be written inside BOOST_FOREACH */
+ BOOST_FOREACH(kv, m_VisibleEndpoints) {
+ String endpoint = kv.first;
+ const EndpointPeerInfo& epi = kv.second;
+
+ if (GetIdentity() == endpoint) /* skip the entry recorded for this node itself */
+ continue;
+
+ if (epi.Seen > Utility::GetTime() - 30) /* entry is newer than GetTime() - 30 (presumably seconds; confirm the unit of GetTime()) */
+ links.insert(ClusterLink(GetIdentity(), endpoint)); /* candidate link from this node to that endpoint */
+
+ if (!epi.Peers) /* no peer list stored for this endpoint */
+ continue;
+
+ ObjectLock olock(epi.Peers); /* NOTE(review): shadows the outer olock until the end of this loop body */
+ BOOST_FOREACH(const String& peer, epi.Peers)
+ links.insert(ClusterLink(endpoint, peer)); /* candidate link between that endpoint and each peer in its list */
+ }
+ olock.Unlock(); /* this is the outer lock on this; the inner lock on epi.Peers ended with the loop body */
+
+ /* sort the routes by metric */
+ std::vector<ClusterLink> sortedLinks;
+ std::copy(links.begin(), links.end(), std::back_inserter(sortedLinks));
+ std::sort(sortedLinks.begin(), sortedLinks.end(), ClusterLinkMetricLessComparer()); /* ordering is defined by ClusterLinkMetricLessComparer, which is not visible here */
+
+ /* pick routes */
+ std::set<String> visitedEndpoints;
+ BOOST_FOREACH(const ClusterLink& link, sortedLinks) {
+ Endpoint::Ptr other; /* assigned only when one end of the link is this node */
+
+ if (link.From == GetIdentity())
+ other = Endpoint::GetByName(link.To);
+ else if (link.To == GetIdentity())
+ other = Endpoint::GetByName(link.From);
+
+ if (visitedEndpoints.find(link.From) != visitedEndpoints.end() && /* both ends were already inserted by earlier links in sorted order */
+ visitedEndpoints.find(link.To) != visitedEndpoints.end()) {
+ if (other) { /* act only when the link involves this node and the other endpoint object was found */
+ Log(LogInformation, "cluster", "Blocking link to '" + other->GetName() + "'");
+
+ Dictionary::Ptr message = make_shared<Dictionary>();
+ message->Set("jsonrpc", "2.0");
+ message->Set("method", "cluster::BlockLink");
+ message->Set("params", make_shared<Dictionary>()); /* params is an empty dictionary */
+
+ AsyncRelayMessage(Endpoint::Ptr(), other, message, false); /* empty source, destination = other, persistent = false */
+ }
+
+ continue; /* the link's ends are already in visitedEndpoints, nothing to add */
+ }
+
+ visitedEndpoints.insert(link.From); /* record both ends of a link that was kept */
+ visitedEndpoints.insert(link.To);
+ }
+}
+
void ClusterListener::ClusterTimerHandler(void)
{
- /* broadcast a heartbeat message */
- Dictionary::Ptr params = make_shared<Dictionary>();
- params->Set("identity", GetIdentity());
+ /* Update endpoint routes */
+ UpdateLinks();
/* Eww. */
Dictionary::Ptr features = make_shared<Dictionary>();
features->Set("checker", SupportsChecks());
features->Set("notification", SupportsNotifications());
- params->Set("features", features);
- Dictionary::Ptr message = make_shared<Dictionary>();
- message->Set("jsonrpc", "2.0");
- message->Set("method", "cluster::HeartBeat");
- message->Set("params", params);
+ /* broadcast a heartbeat message */
+ BOOST_FOREACH(const Endpoint::Ptr& destination, DynamicType::GetObjects<Endpoint>()) {
+ std::set<String> connected_endpoints;
+
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+ if (endpoint->GetName() == GetIdentity())
+ continue;
+
+ if (!endpoint->IsConnected())
+ continue;
+
+ connected_endpoints.insert(endpoint->GetName());
+ }
+
+ Array::Ptr epnames = make_shared<Array>();
+ BOOST_FOREACH(const String& name, connected_endpoints)
+ epnames->Add(name);
- Endpoint::GetByName(GetIdentity())->SetFeatures(features);
+ Dictionary::Ptr params = make_shared<Dictionary>();
+ params->Set("identity", GetIdentity());
+ params->Set("features", features);
+ params->Set("connected_endpoints", epnames);
- AsyncRelayMessage(Endpoint::Ptr(), message, false);
+ Dictionary::Ptr message = make_shared<Dictionary>();
+ message->Set("jsonrpc", "2.0");
+ message->Set("method", "cluster::HeartBeat");
+ message->Set("params", params);
+
+ Endpoint::GetByName(GetIdentity())->SetFeatures(features);
+
+ AsyncRelayMessage(Endpoint::Ptr(), destination, message, false);
+ }
{
ObjectLock olock(this);
if (endpoint->GetSeen() > Utility::GetTime() - 60)
continue;
+ m_VisibleEndpoints.erase(endpoint->GetName());
+
Stream::Ptr client = endpoint->GetClient();
if (client) {
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::NextCheckChangedHandler(const Checkable::Ptr& checkable, double nextCheck, const String& authority)
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority)
SetSecurityInfo(message, notification->GetCheckable(), DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::ForceNextCheckChangedHandler(const Checkable::Ptr& checkable, bool forced, const String& authority)
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::ForceNextNotificationChangedHandler(const Checkable::Ptr& checkable, bool forced, const String& authority)
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::EnableActiveChecksChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::EnablePassiveChecksChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::EnableNotificationsChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::EnableFlappingChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::CommentAddedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const String& authority)
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::CommentRemovedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const String& authority)
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::DowntimeAddedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const String& authority)
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::DowntimeRemovedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const String& authority)
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::AcknowledgementSetHandler(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority)
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const String& authority)
SetSecurityInfo(message, checkable, DomainPrivRead);
- AsyncRelayMessage(Endpoint::Ptr(), message, true);
+ AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::AsyncMessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
sender->SendMessage(lmessage);
+ Log(LogInformation, "cluster", "Acknowledging log position for identity '" + sender->GetName() + "': " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
sender->SetRemoteLogPosition(message->Get("ts"));
- Log(LogInformation, "cluster", "Acknowledging log position for identity '" + sender->GetName() + "': " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
+ ObjectLock olock(this);
+ const EndpointPeerInfo& epi = m_VisibleEndpoints[sender->GetName()];
+
+ if (epi.Peers) {
+ ObjectLock olock(epi.Peers);
+ BOOST_FOREACH(const String& epname, epi.Peers) {
+ if (epname == GetIdentity())
+ continue;
+
+ Endpoint::Ptr peer_endpoint = Endpoint::GetByName(epname);
+
+ if (!peer_endpoint)
+ continue;
+
+ Log(LogInformation, "cluster", "Acknowledging log position for identity '" + peer_endpoint->GetName() + "' (via '" + sender->GetName() + "'): " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
+ peer_endpoint->SetRemoteLogPosition(message->Get("ts"));
+ }
+ }
}
}
String identity = params->Get("identity");
+ {
+ ObjectLock olock(this);
+ EndpointPeerInfo epi;
+ epi.Seen = Utility::GetTime();
+ epi.Peers = params->Get("connected_endpoints");
+ m_VisibleEndpoints[identity] = epi;
+ }
+
Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
if (endpoint) {
endpoint->SetFeatures(params->Get("features"));
}
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, false);
+ } else if (message->Get("method") == "cluster::BlockLink") {
+ Log(LogDebug, "cluster", "Got cluster::BlockLink message. Blocking direct link for '" + sender->GetName() + "'");
+ sender->SetBlockedUntil(Utility::GetTime() + 30);
} else if (message->Get("method") == "cluster::CheckResult") {
if (!params)
return;
Checkable::Ptr checkable;
if (type == "Host")
- checkable = DynamicObject::GetObject<Host>(chk);
+ checkable = Host::GetByName(chk);
else if (type == "Service")
- checkable = DynamicObject::GetObject<Service>(chk);
+ checkable = Service::GetByName(chk);
else
return;
checkable->ProcessCheckResult(cr, sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetNextCheck") {
if (!params)
return;
checkable->SetNextCheck(nextCheck, sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetForceNextCheck") {
if (!params)
return;
checkable->SetForceNextCheck(forced, sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetForceNextNotification") {
if (!params)
return;
checkable->SetForceNextNotification(forced, sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetEnableActiveChecks") {
if (!params)
return;
checkable->SetEnableActiveChecks(enabled, sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetEnablePassiveChecks") {
if (!params)
return;
checkable->SetEnablePassiveChecks(enabled, sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetEnableNotifications") {
if (!params)
return;
checkable->SetEnableNotifications(enabled, sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetEnableFlapping") {
if (!params)
return;
checkable->SetEnableFlapping(enabled, sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetNextNotification") {
if (!params)
return;
notification->SetNextNotification(nextNotification, sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::AddComment") {
if (!params)
return;
checkable->AddComment(comment->GetEntryType(), comment->GetAuthor(),
comment->GetText(), comment->GetExpireTime(), comment->GetId(), sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::RemoveComment") {
if (!params)
return;
checkable->RemoveComment(id, sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::AddDowntime") {
if (!params)
return;
downtime->GetDuration(), downtime->GetScheduledBy(),
downtime->GetId(), sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::RemoveDowntime") {
if (!params)
return;
checkable->RemoveDowntime(id, false, sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetAcknowledgement") {
if (!params)
return;
checkable->AcknowledgeProblem(author, comment, static_cast<AcknowledgementType>(acktype), expiry, sender->GetName());
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::ClearAcknowledgement") {
if (!params)
return;
checkable->ClearAcknowledgement(sender->GetName());
}
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetLogPosition") {
if (!params)
return;
Application::RequestRestart();
}
- AsyncRelayMessage(sender, message, true);
+ AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
}
}
bool ClusterListener::IsAuthority(const DynamicObject::Ptr& object, const String& type)
{
+ double now = Utility::GetTime();
+
Array::Ptr authorities = object->GetAuthorities();
std::vector<String> endpoints;
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
bool match = false;
- if ((!endpoint->IsConnected() && endpoint->GetName() != GetIdentity()) || !endpoint->HasFeature(type))
+ if ((endpoint->GetSeen() < now - 30 && endpoint->GetName() != GetIdentity()) || !endpoint->HasFeature(type))
continue;
if (authorities) {
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
count_endpoints++;
- if(!endpoint->IsConnected() && endpoint->GetName() != GetIdentity())
+ if(!endpoint->IsAvailable() && endpoint->GetName() != GetIdentity())
not_connected_endpoints->Add(endpoint->GetName());
- else if(endpoint->IsConnected() && endpoint->GetName() != GetIdentity())
+ else if(endpoint->IsAvailable() && endpoint->GetName() != GetIdentity())
connected_endpoints->Add(endpoint->GetName());
}