From a49d298b9c2fcd089b69a943cf238dc635279793 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Wed, 28 Aug 2013 14:59:41 +0200 Subject: [PATCH] Implement cluster events for comments. --- components/cluster/clustercomponent.cpp | 95 +++++++++++++++++++++++++ components/cluster/clustercomponent.h | 3 + components/cluster/jsonrpc.cpp | 4 +- lib/icinga/notification.cpp | 6 +- lib/icinga/notification.h | 4 +- lib/icinga/service-comment.cpp | 87 +++++++++++----------- lib/icinga/service.h | 19 ++--- lib/ido/servicedbobject.cpp | 47 ++++-------- lib/ido/servicedbobject.h | 4 +- 9 files changed, 173 insertions(+), 96 deletions(-) diff --git a/components/cluster/clustercomponent.cpp b/components/cluster/clustercomponent.cpp index bcb2572eb..33cb172e4 100644 --- a/components/cluster/clustercomponent.cpp +++ b/components/cluster/clustercomponent.cpp @@ -54,9 +54,12 @@ void ClusterComponent::Start(void) Service::OnNewCheckResult.connect(bind(&ClusterComponent::CheckResultHandler, this, _1, _2, _3)); Service::OnNextCheckChanged.connect(bind(&ClusterComponent::NextCheckChangedHandler, this, _1, _2, _3)); + Notification::OnNextNotificationChanged.connect(bind(&ClusterComponent::NextNotificationChangedHandler, this, _1, _2, _3)); Service::OnForceNextCheckChanged.connect(bind(&ClusterComponent::ForceNextCheckChangedHandler, this, _1, _2, _3)); Service::OnEnableActiveChecksChanged.connect(bind(&ClusterComponent::EnableActiveChecksChangedHandler, this, _1, _2, _3)); Service::OnEnablePassiveChecksChanged.connect(bind(&ClusterComponent::EnablePassiveChecksChangedHandler, this, _1, _2, _3)); + Service::OnCommentAdded.connect(bind(&ClusterComponent::CommentAddedHandler, this, _1, _2, _3)); + Service::OnCommentRemoved.connect(bind(&ClusterComponent::CommentRemovedHandler, this, _1, _2, _3)); Endpoint::OnMessageReceived.connect(bind(&ClusterComponent::MessageHandler, this, _1, _2)); } @@ -287,6 +290,25 @@ void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, doub } } +void ClusterComponent::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority) +{ + if (!authority.IsEmpty() && authority != GetIdentity()) + return; + + Dictionary::Ptr params = boost::make_shared(); + params->Set("notification", notification->GetName()); + params->Set("next_notification", nextNotification); + + Dictionary::Ptr message = boost::make_shared(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "cluster::SetNextNotification"); + message->Set("params", params); + + BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { + endpoint->SendMessage(message); + } +} + void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) @@ -344,6 +366,44 @@ void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& ser } } +void ClusterComponent::CommentAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority) +{ + if (!authority.IsEmpty() && authority != GetIdentity()) + return; + + Dictionary::Ptr params = boost::make_shared(); + params->Set("service", service->GetName()); + params->Set("comment", comment); + + Dictionary::Ptr message = boost::make_shared(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "cluster::AddComment"); + message->Set("params", params); + + BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { + endpoint->SendMessage(message); + } +} + +void ClusterComponent::CommentRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority) +{ + if (!authority.IsEmpty() && authority != GetIdentity()) + return; + + Dictionary::Ptr params = boost::make_shared(); + params->Set("service", service->GetName()); + params->Set("id", comment->Get("id")); + + Dictionary::Ptr message = boost::make_shared(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "cluster::RemoveComment"); + message->Set("params", params); + + BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { + endpoint->SendMessage(message); + } +} + void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message) { BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { @@ -414,6 +474,41 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction bool enabled = params->Get("enabled"); service->SetEnablePassiveChecks(enabled, sender->GetName()); + } else if (message->Get("method") == "cluster::SetNextNotification") { + String nfc = params->Get("notification"); + + Notification::Ptr notification = Notification::GetByName(nfc); + + if (!notification) + return; + + bool nextNotification = params->Get("next_notification"); + + notification->SetNextNotification(nextNotification, sender->GetName()); + } else if (message->Get("method") == "cluster::AddComment") { + String svc = params->Get("service"); + + Service::Ptr service = Service::GetByName(svc); + + if (!service) + return; + + Dictionary::Ptr comment = params->Get("comment"); + + long type = static_cast(comment->Get("entry_type")); + service->AddComment(static_cast(type), comment->Get("author"), + comment->Get("text"), comment->Get("expire_time"), comment->Get("id"), sender->GetName()); + } else if (message->Get("method") == "cluster::RemoveComment") { + String svc = params->Get("service"); + + Service::Ptr service = Service::GetByName(svc); + + if (!service) + return; + + String id = params->Get("id"); + + service->RemoveComment(id, sender->GetName()); } } diff --git a/components/cluster/clustercomponent.h b/components/cluster/clustercomponent.h index 58533ce75..1abee36e5 100644 --- a/components/cluster/clustercomponent.h +++ b/components/cluster/clustercomponent.h @@ -81,9 +81,12 @@ private: void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority); void NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority); + void NextNotificationChangedHandler(const Notification::Ptr& notification, double nextCheck, const String& authority); void ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority); void EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority); void EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority); + void CommentAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority); + void CommentRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority); void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message); }; diff --git a/components/cluster/jsonrpc.cpp b/components/cluster/jsonrpc.cpp index 2cc19f828..edd84d01b 100644 --- a/components/cluster/jsonrpc.cpp +++ b/components/cluster/jsonrpc.cpp @@ -34,7 +34,7 @@ using namespace icinga; void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message) { String json = Value(message).Serialize(); - //std::cerr << ">> " << json << std::endl; + std::cerr << ">> " << json << std::endl; NetString::WriteStringToStream(stream, json); } @@ -44,7 +44,7 @@ Dictionary::Ptr JsonRpc::ReadMessage(const Stream::Ptr& stream) if (!NetString::ReadStringFromStream(stream, &jsonString)) BOOST_THROW_EXCEPTION(std::runtime_error("ReadStringFromStream signalled EOF.")); - //std::cerr << "<< " << jsonString << std::endl; + std::cerr << "<< " << jsonString << std::endl; Value value = Value::Deserialize(jsonString); if (!value.IsObjectType()) { diff --git a/lib/icinga/notification.cpp b/lib/icinga/notification.cpp index a8370c53e..8b19d2203 100644 --- a/lib/icinga/notification.cpp +++ b/lib/icinga/notification.cpp @@ -34,6 +34,8 @@ using namespace icinga; REGISTER_TYPE(Notification); +boost::signals2::signal Notification::OnNextNotificationChanged; + void Notification::Start(void) { DynamicObject::Start(); @@ -182,9 +184,11 @@ double Notification::GetNextNotification(void) const * Sets the timestamp when the next periodical notification should be sent. * This does not affect notifications that are sent for state changes. */ -void Notification::SetNextNotification(double time) +void Notification::SetNextNotification(double time, const String& authority) { m_NextNotification = time; + + Utility::QueueAsyncCallback(bind(boost::ref(OnNextNotificationChanged), GetSelf(), time, authority)); } long Notification::GetNotificationNumber(void) const diff --git a/lib/icinga/notification.h b/lib/icinga/notification.h index 77a05fa26..6be3d23d7 100644 --- a/lib/icinga/notification.h +++ b/lib/icinga/notification.h @@ -77,7 +77,7 @@ public: void SetLastNotification(double time); double GetNextNotification(void) const; - void SetNextNotification(double time); + void SetNextNotification(double time, const String& authority = String()); long GetNotificationNumber(void) const; void UpdateNotificationNumber(void); @@ -89,6 +89,8 @@ public: virtual bool ResolveMacro(const String& macro, const Dictionary::Ptr& cr, String *result) const; + static boost::signals2::signal OnNextNotificationChanged; + protected: virtual void Start(void); virtual void Stop(void); diff --git a/lib/icinga/service-comment.cpp b/lib/icinga/service-comment.cpp index c1fef430f..92807a1af 100644 --- a/lib/icinga/service-comment.cpp +++ b/lib/icinga/service-comment.cpp @@ -33,11 +33,10 @@ static int l_NextCommentID = 1; static boost::mutex l_CommentMutex; static std::map l_LegacyCommentsCache; static std::map l_CommentsCache; -//static bool l_CommentsCacheNeedsUpdate = false; -//static Timer::Ptr l_CommentsCacheTimer; static Timer::Ptr l_CommentsExpireTimer; -boost::signals2::signal Service::OnCommentsChanged; +boost::signals2::signal Service::OnCommentAdded; +boost::signals2::signal Service::OnCommentRemoved; int Service::GetNextCommentID(void) { @@ -52,9 +51,17 @@ Dictionary::Ptr Service::GetComments(void) const } String Service::AddComment(CommentType entryType, const String& author, - const String& text, double expireTime) + const String& text, double expireTime, const String& id, const String& authority) { + String uid; + + if (id.IsEmpty()) + uid = Utility::NewUniqueID(); + else + uid = id; + Dictionary::Ptr comment = boost::make_shared(); + comment->Set("id", uid); comment->Set("entry_time", Utility::GetTime()); comment->Set("entry_type", entryType); comment->Set("author", author); @@ -83,39 +90,43 @@ String Service::AddComment(CommentType entryType, const String& author, m_Comments = comments; } - String id = Utility::NewUniqueID(); - { ObjectLock olock(this); - comments->Set(id, comment); + comments->Set(uid, comment); } { boost::mutex::scoped_lock lock(l_CommentMutex); - l_LegacyCommentsCache[legacy_id] = id; - l_CommentsCache[id] = GetSelf(); + l_LegacyCommentsCache[legacy_id] = uid; + l_CommentsCache[uid] = GetSelf(); } - OnCommentsChanged(GetSelf(), id, CommentChangedAdded); + Utility::QueueAsyncCallback(bind(boost::ref(OnCommentAdded), GetSelf(), comment, authority)); - return id; + return uid; } void Service::RemoveAllComments(void) { - m_Comments = Empty; + std::vector ids; + Dictionary::Ptr comments = m_Comments; - { - boost::mutex::scoped_lock lock(l_CommentMutex); - l_LegacyCommentsCache.clear(); - l_CommentsCache.clear(); + if (!comments) + return; + + ObjectLock olock(comments); + String id; + BOOST_FOREACH(boost::tie(id, boost::tuples::ignore), comments) { + ids.push_back(id); } - OnCommentsChanged(GetSelf(), Empty, CommentChangedDeleted); + BOOST_FOREACH(id, ids) { + RemoveComment(id); + } } -void Service::RemoveComment(const String& id) +void Service::RemoveComment(const String& id, const String& authority) { Service::Ptr owner = GetOwnerByCommentID(id); @@ -124,26 +135,27 @@ void Service::RemoveComment(const String& id) Dictionary::Ptr comments = owner->GetComments(); - if (comments) { - ObjectLock olock(owner); + if (!comments) + return; - Dictionary::Ptr comment = comments->Get(id); + ObjectLock olock(owner); - if (!comment) - return; + Dictionary::Ptr comment = comments->Get(id); - int legacy_id = comment->Get("legacy_id"); + if (!comment) + return; - comments->Remove(id); + int legacy_id = comment->Get("legacy_id"); - { - boost::mutex::scoped_lock lock(l_CommentMutex); - l_LegacyCommentsCache.erase(legacy_id); - l_CommentsCache.erase(id); - } + comments->Remove(id); - OnCommentsChanged(owner, id, CommentChangedDeleted); + { + boost::mutex::scoped_lock lock(l_CommentMutex); + l_LegacyCommentsCache.erase(legacy_id); + l_CommentsCache.erase(id); } + + Utility::QueueAsyncCallback(bind(boost::ref(OnCommentRemoved), owner, comment, authority)); } String Service::GetCommentIDFromLegacyID(int id) @@ -233,10 +245,8 @@ void Service::RemoveCommentsByType(int type) } } - if (!removedComments.empty()) { - BOOST_FOREACH(const String& id, removedComments) { - RemoveComment(id); - } + BOOST_FOREACH(const String& id, removedComments) { + RemoveComment(id); } } @@ -260,12 +270,9 @@ void Service::RemoveExpiredComments(void) } } - if (!expiredComments.empty()) { - BOOST_FOREACH(const String& id, expiredComments) { - RemoveComment(id); - } + BOOST_FOREACH(const String& id, expiredComments) { + RemoveComment(id); } - } void Service::CommentsExpireTimerHandler(void) diff --git a/lib/icinga/service.h b/lib/icinga/service.h index be7eab30d..f835ac26a 100644 --- a/lib/icinga/service.h +++ b/lib/icinga/service.h @@ -84,18 +84,6 @@ enum FlappingState FlappingEnabled = 3 }; -/** - * The state of a changed comment - * - * @ingroup icinga - */ -enum CommentChangedType -{ - CommentChangedAdded = 0, - CommentChangedUpdated = 1, - CommentChangedDeleted = 2 -}; - /** * The state of a changed downtime * @@ -255,7 +243,8 @@ public: static boost::signals2::signal OnNotificationSentChanged; static boost::signals2::signal OnDowntimeChanged; static boost::signals2::signal OnFlappingChanged; - static boost::signals2::signal OnCommentsChanged; + static boost::signals2::signal OnCommentAdded; + static boost::signals2::signal OnCommentRemoved; static boost::signals2::signal OnDowntimesChanged; virtual bool ResolveMacro(const String& macro, const Dictionary::Ptr& cr, String *result) const; @@ -291,11 +280,11 @@ public: Dictionary::Ptr GetComments(void) const; String AddComment(CommentType entryType, const String& author, - const String& text, double expireTime); + const String& text, double expireTime, const String& id = String(), const String& authority = String()); void RemoveAllComments(void); void RemoveCommentsByType(int type); - static void RemoveComment(const String& id); + static void RemoveComment(const String& id, const String& authority = String()); static String GetCommentIDFromLegacyID(int id); static Service::Ptr GetOwnerByCommentID(const String& id); diff --git a/lib/ido/servicedbobject.cpp b/lib/ido/servicedbobject.cpp index 644a902df..5e6e773b3 100644 --- a/lib/ido/servicedbobject.cpp +++ b/lib/ido/servicedbobject.cpp @@ -39,7 +39,8 @@ INITIALIZE_ONCE(ServiceDbObject, &ServiceDbObject::StaticInitialize); void ServiceDbObject::StaticInitialize(void) { - Service::OnCommentsChanged.connect(boost::bind(&ServiceDbObject::CommentsChangedHandler, _1, _2, _3)); + Service::OnCommentAdded.connect(boost::bind(&ServiceDbObject::AddComment, _1, _2)); + Service::OnCommentRemoved.connect(boost::bind(&ServiceDbObject::RemoveComment, _1, _2)); Service::OnDowntimesChanged.connect(boost::bind(&ServiceDbObject::DowntimesChangedHandler, _1, _2, _3)); } @@ -248,7 +249,7 @@ void ServiceDbObject::OnConfigUpdate(void) } /* update comments and downtimes on config change */ - CommentsChangedHandler(service, Empty, CommentChangedUpdated); + AddComments(service); DowntimesChangedHandler(service, Empty, DowntimeChangedUpdated); /* service host config update */ @@ -288,35 +289,6 @@ void ServiceDbObject::OnStatusUpdate(void) dbobj->SendStatusUpdate(); } -void ServiceDbObject::CommentsChangedHandler(const Service::Ptr& svcfilter, const String& id, CommentChangedType type) -{ - if (type == CommentChangedUpdated || type == CommentChangedDeleted) { - /* we cannot determine which comment id is deleted - * id cache may not be in sync - */ - BOOST_FOREACH(const Service::Ptr& service, DynamicType::GetObjects()) { - if (svcfilter && svcfilter != service) - continue; - - Host::Ptr host = service->GetHost(); - - if (!host) - continue; - - /* delete all comments associated for this host/service */ - DeleteComments(service); - - /* dump all comments */ - AddComments(service); - } - } else if (type == CommentChangedAdded) { - Dictionary::Ptr comment = Service::GetCommentByID(id); - AddComment(svcfilter, comment); - } else { - Log(LogDebug, "ido", "invalid comment change type: " + type); - } -} - void ServiceDbObject::AddComments(const Service::Ptr& service) { /* dump all comments */ @@ -401,10 +373,10 @@ void ServiceDbObject::AddCommentByType(const DynamicObject::Ptr& object, const D OnQuery(query1); } -void ServiceDbObject::DeleteComments(const Service::Ptr& service) +void ServiceDbObject::RemoveComments(const Service::Ptr& service) { - /* delete all comments associated for this host/service */ - Log(LogDebug, "ido", "delete comments for '" + service->GetName() + "'"); + /* remove all comments associated for this host/service */ + Log(LogDebug, "ido", "remove comments for '" + service->GetName() + "'"); Host::Ptr host = service->GetHost(); @@ -418,7 +390,7 @@ void ServiceDbObject::DeleteComments(const Service::Ptr& service) query1.WhereCriteria->Set("object_id", service); OnQuery(query1); - /* delete hostcheck service's host comments */ + /* remove hostcheck service's host comments */ if (host->GetHostCheckService() == service) { DbQuery query2; query2.Table = "comments"; @@ -429,6 +401,11 @@ void ServiceDbObject::DeleteComments(const Service::Ptr& service) } } +void ServiceDbObject::RemoveComment(const Service::Ptr& service, const Dictionary::Ptr& comment) +{ + /* TODO: implement */ +} + void ServiceDbObject::DowntimesChangedHandler(const Service::Ptr& svcfilter, const String& id, DowntimeChangedType type) { if (type == DowntimeChangedUpdated || type == DowntimeChangedDeleted) { diff --git a/lib/ido/servicedbobject.h b/lib/ido/servicedbobject.h index 080cd9079..8691ba079 100644 --- a/lib/ido/servicedbobject.h +++ b/lib/ido/servicedbobject.h @@ -51,11 +51,11 @@ protected: virtual void OnStatusUpdate(void); private: - static void CommentsChangedHandler(const Service::Ptr& service, const String& id, CommentChangedType type); static void AddComments(const Service::Ptr& service); static void AddComment(const Service::Ptr& service, const Dictionary::Ptr& comment); static void AddCommentByType(const DynamicObject::Ptr& object, const Dictionary::Ptr& comment); - static void DeleteComments(const Service::Ptr& service); + static void RemoveComments(const Service::Ptr& service); + static void RemoveComment(const Service::Ptr& service, const Dictionary::Ptr& comment); static void DowntimesChangedHandler(const Service::Ptr& service, const String& id, DowntimeChangedType type); static void AddDowntimes(const Service::Ptr& service); -- 2.40.0