Utility::QueueAsyncCallback(boost::bind(&ClusterComponent::NewClientHandler, this, client, TlsRoleClient));
}
+void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent)
+{
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+ if (!persistent && !endpoint->IsConnected())
+ continue;
+
+ if (endpoint == except)
+ continue;
+
+ if (endpoint->GetName() == GetIdentity())
+ continue;
+
+ endpoint->SendMessage(message);
+ }
+}
+
/**
* Processes a new client connection.
*
message->Set("jsonrpc", "2.0");
message->Set("method", "cluster::HeartBeat");
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, false);
Array::Ptr peers = GetPeers();
message->Set("method", "cluster::CheckResult");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority)
message->Set("method", "cluster::SetNextCheck");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority)
message->Set("method", "cluster::SetNextNotification");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
message->Set("method", "cluster::SetForceNextCheck");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::ForceNextNotificationChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
message->Set("method", "cluster::SetForceNextNotification");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
message->Set("method", "cluster::SetEnableActiveChecks");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
message->Set("method", "cluster::SetEnablePassiveChecks");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::EnableNotificationsChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
message->Set("method", "cluster::SetEnableNotifications");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::EnableFlappingChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
message->Set("method", "cluster::SetEnableFlapping");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::CommentAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
message->Set("method", "cluster::AddComment");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::CommentRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
message->Set("method", "cluster::RemoveComment");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::DowntimeAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
message->Set("method", "cluster::AddDowntime");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::DowntimeRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
message->Set("method", "cluster::RemoveDowntime");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority)
message->Set("method", "cluster::SetAcknowledgement");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority)
message->Set("method", "cluster::ClearAcknowledgement");
message->Set("params", params);
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- endpoint->SendMessage(message);
- }
+ RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
{
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- if (sender != endpoint)
- endpoint->SendMessage(message);
- }
+ RelayMessage(sender, message, true);
if (message->Get("method") == "cluster::HeartBeat") {
sender->SetSeen(Utility::GetTime());
#include "base/logger_fwd.h"
#include "config/configitembuilder.h"
#include <boost/smart_ptr/make_shared.hpp>
+#include <fstream>
using namespace icinga;
boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> Endpoint::OnMessageReceived;
+void Endpoint::OnConfigLoaded(void)
+{
+ ObjectLock olock(this);
+
+ OpenSpoolFile();
+}
+
+void Endpoint::Stop(void)
+{
+ ObjectLock olock(this);
+
+ CloseSpoolFile();
+}
+
+void Endpoint::OpenSpoolFile(void)
+{
+ ASSERT(OwnsLock());
+
+ String path = GetSpoolPath();
+ std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
+
+ if (!fp->good())
+ Log(LogWarning, "cluster", "Could not open spool file: " + path);
+
+ m_SpoolFile = boost::make_shared<StdioStream>(fp, true);
+}
+
+void Endpoint::CloseSpoolFile(void)
+{
+ ASSERT(OwnsLock());
+
+ m_SpoolFile->Close();
+ m_SpoolFile.reset();
+}
+
/**
* Checks whether this endpoint is connected.
*
{
ObjectLock olock(this);
+ CloseSpoolFile();
+
+ String path = GetSpoolPath();
+ std::ifstream *fp = new std::ifstream(path.CStr(), std::ifstream::in);
+
+ while (fp && !fp->eof()) {
+ char data[1024];
+ fp->read(data, sizeof(data));
+ client->Write(data, fp->gcount());
+ }
+
+ fp->close();
+ delete fp;
+
+ (void) unlink(path.CStr());
+
+ OpenSpoolFile();
+
m_Client = client;
}
void Endpoint::SendMessage(const Dictionary::Ptr& message)
{
- if (!IsConnected()) {
- // TODO: persist the message
- return;
- }
+ Stream::Ptr destination;
+
+ if (!IsConnected())
+ destination = m_SpoolFile;
+ else
+ destination = GetClient();
try {
- JsonRpc::SendMessage(GetClient(), message);
+ JsonRpc::SendMessage(destination, message);
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
m_Seen = ts;
}
+String Endpoint::GetSpoolPath(void) const
+{
+ return Application::GetLocalStateDir() + "/spool/icinga2/cluster/" + GetName() + ".ns";
+}
+
void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
{
DynamicObject::InternalSerialize(bag, attributeTypes);