From: Gunnar Beutner Date: Mon, 2 Sep 2013 13:12:20 +0000 (+0200) Subject: cluster: Implement persistent messages. X-Git-Tag: v0.0.3~620 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=754dbfb8ef6a5a560d7f22d71e5ddaed4287417b;p=icinga2 cluster: Implement persistent messages. --- diff --git a/Makefile.am b/Makefile.am index 6439f595c..61a43e237 100644 --- a/Makefile.am +++ b/Makefile.am @@ -29,6 +29,7 @@ install-data-local: $(MKDIR_P) $(DESTDIR)${localstatedir}/log/${PACKAGE}/compat/archives $(MKDIR_P) $(DESTDIR)${localstatedir}/cache/${PACKAGE} $(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/perfdata + $(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/cluster $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE} $(MKDIR_P) $(DESTDIR)${localstatedir}/run/${PACKAGE} diff --git a/components/cluster/clustercomponent.cpp b/components/cluster/clustercomponent.cpp index 2f595f9d2..bb0578fe9 100644 --- a/components/cluster/clustercomponent.cpp +++ b/components/cluster/clustercomponent.cpp @@ -190,6 +190,22 @@ void ClusterComponent::AddConnection(const String& node, const String& service) Utility::QueueAsyncCallback(boost::bind(&ClusterComponent::NewClientHandler, this, client, TlsRoleClient)); } +void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent) +{ + BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { + if (!persistent && !endpoint->IsConnected()) + continue; + + if (endpoint == except) + continue; + + if (endpoint->GetName() == GetIdentity()) + continue; + + endpoint->SendMessage(message); + } +} + /** * Processes a new client connection. * @@ -225,9 +241,7 @@ void ClusterComponent::ClusterTimerHandler(void) message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::HeartBeat"); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, false); Array::Ptr peers = GetPeers(); @@ -274,9 +288,7 @@ void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dic message->Set("method", "cluster::CheckResult"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority) @@ -293,9 +305,7 @@ void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, doub message->Set("method", "cluster::SetNextCheck"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority) @@ -312,9 +322,7 @@ void ClusterComponent::NextNotificationChangedHandler(const Notification::Ptr& n message->Set("method", "cluster::SetNextNotification"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority) @@ -331,9 +339,7 @@ void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service, message->Set("method", "cluster::SetForceNextCheck"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::ForceNextNotificationChangedHandler(const Service::Ptr& service, bool forced, const String& authority) @@ -350,9 +356,7 @@ void ClusterComponent::ForceNextNotificationChangedHandler(const Service::Ptr& s message->Set("method", "cluster::SetForceNextNotification"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority) @@ -369,9 +373,7 @@ void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& serv message->Set("method", "cluster::SetEnableActiveChecks"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority) @@ -388,9 +390,7 @@ void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& ser message->Set("method", "cluster::SetEnablePassiveChecks"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::EnableNotificationsChangedHandler(const Service::Ptr& service, bool enabled, const String& authority) @@ -407,9 +407,7 @@ void ClusterComponent::EnableNotificationsChangedHandler(const Service::Ptr& ser message->Set("method", "cluster::SetEnableNotifications"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::EnableFlappingChangedHandler(const Service::Ptr& service, bool enabled, const String& authority) @@ -426,9 +424,7 @@ void ClusterComponent::EnableFlappingChangedHandler(const Service::Ptr& service, message->Set("method", "cluster::SetEnableFlapping"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::CommentAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority) @@ -445,9 +441,7 @@ void ClusterComponent::CommentAddedHandler(const Service::Ptr& service, const Di message->Set("method", "cluster::AddComment"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::CommentRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority) @@ -464,9 +458,7 @@ void ClusterComponent::CommentRemovedHandler(const Service::Ptr& service, const message->Set("method", "cluster::RemoveComment"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::DowntimeAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority) @@ -483,9 +475,7 @@ void ClusterComponent::DowntimeAddedHandler(const Service::Ptr& service, const D message->Set("method", "cluster::AddDowntime"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::DowntimeRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority) @@ -502,9 +492,7 @@ void ClusterComponent::DowntimeRemovedHandler(const Service::Ptr& service, const message->Set("method", "cluster::RemoveDowntime"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority) @@ -524,9 +512,7 @@ void ClusterComponent::AcknowledgementSetHandler(const Service::Ptr& service, co message->Set("method", "cluster::SetAcknowledgement"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority) @@ -542,17 +528,12 @@ void ClusterComponent::AcknowledgementClearedHandler(const Service::Ptr& service message->Set("method", "cluster::ClearAcknowledgement"); message->Set("params", params); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - endpoint->SendMessage(message); - } + RelayMessage(Endpoint::Ptr(), message, true); } void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message) { - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - if (sender != endpoint) - endpoint->SendMessage(message); - } + RelayMessage(sender, message, true); if (message->Get("method") == "cluster::HeartBeat") { sender->SetSeen(Utility::GetTime()); diff --git a/components/cluster/clustercomponent.h b/components/cluster/clustercomponent.h index 7254feb03..e1242095b 100644 --- a/components/cluster/clustercomponent.h +++ b/components/cluster/clustercomponent.h @@ -79,6 +79,8 @@ private: void NewClientHandler(const Socket::Ptr& client, TlsRole role); void ListenerThreadProc(const Socket::Ptr& server); + void RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent); + void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority); void NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority); void NextNotificationChangedHandler(const Notification::Ptr& notification, double nextCheck, const String& authority); diff --git a/components/cluster/endpoint.cpp b/components/cluster/endpoint.cpp index 261c4b8af..01ba7b9d0 100644 --- a/components/cluster/endpoint.cpp +++ b/components/cluster/endpoint.cpp @@ -26,6 +26,7 @@ #include "base/logger_fwd.h" #include "config/configitembuilder.h" #include +#include using namespace icinga; @@ -34,6 +35,41 @@ REGISTER_TYPE(Endpoint); boost::signals2::signal Endpoint::OnConnected; boost::signals2::signal Endpoint::OnMessageReceived; +void Endpoint::OnConfigLoaded(void) +{ + ObjectLock olock(this); + + OpenSpoolFile(); +} + +void Endpoint::Stop(void) +{ + ObjectLock olock(this); + + CloseSpoolFile(); +} + +void Endpoint::OpenSpoolFile(void) +{ + ASSERT(OwnsLock()); + + String path = GetSpoolPath(); + std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app); + + if (!fp->good()) + Log(LogWarning, "cluster", "Could not open spool file: " + path); + + m_SpoolFile = boost::make_shared(fp, true); +} + +void Endpoint::CloseSpoolFile(void) +{ + ASSERT(OwnsLock()); + + m_SpoolFile->Close(); + m_SpoolFile.reset(); +} + /** * Checks whether this endpoint is connected. * @@ -56,6 +92,24 @@ void Endpoint::SetClient(const Stream::Ptr& client) { ObjectLock olock(this); + CloseSpoolFile(); + + String path = GetSpoolPath(); + std::ifstream *fp = new std::ifstream(path.CStr(), std::ifstream::in); + + while (fp && !fp->eof()) { + char data[1024]; + fp->read(data, sizeof(data)); + client->Write(data, fp->gcount()); + } + + fp->close(); + delete fp; + + (void) unlink(path.CStr()); + + OpenSpoolFile(); + m_Client = client; } @@ -67,13 +121,15 @@ void Endpoint::SetClient(const Stream::Ptr& client) void Endpoint::SendMessage(const Dictionary::Ptr& message) { - if (!IsConnected()) { - // TODO: persist the message - return; - } + Stream::Ptr destination; + + if (!IsConnected()) + destination = m_SpoolFile; + else + destination = GetClient(); try { - JsonRpc::SendMessage(GetClient(), message); + JsonRpc::SendMessage(destination, message); } catch (const std::exception& ex) { std::ostringstream msgbuf; msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex); @@ -136,6 +192,11 @@ void Endpoint::SetSeen(double ts) m_Seen = ts; } +String Endpoint::GetSpoolPath(void) const +{ + return Application::GetLocalStateDir() + "/spool/icinga2/cluster/" + GetName() + ".ns"; +} + void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const { DynamicObject::InternalSerialize(bag, attributeTypes); diff --git a/components/cluster/endpoint.h b/components/cluster/endpoint.h index b52dba08c..3530d671b 100644 --- a/components/cluster/endpoint.h +++ b/components/cluster/endpoint.h @@ -22,6 +22,7 @@ #include "base/dynamicobject.h" #include "base/stream.h" +#include "base/stdiostream.h" #include namespace icinga @@ -56,10 +57,15 @@ public: double GetSeen(void) const; void SetSeen(double ts); + String GetSpoolPath(void) const; + protected: virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const; virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes); + virtual void OnConfigLoaded(void); + virtual void Stop(void); + private: Dictionary::Ptr m_Subscriptions; String m_Host; @@ -67,8 +73,12 @@ private: Stream::Ptr m_Client; double m_Seen; + StdioStream::Ptr m_SpoolFile; void MessageThreadProc(const Stream::Ptr& stream); + + void OpenSpoolFile(void); + void CloseSpoolFile(void); }; }