]> granicus.if.org Git - icinga2/commitdiff
Implement cluster events for comments.
authorGunnar Beutner <gunnar.beutner@netways.de>
Wed, 28 Aug 2013 12:59:41 +0000 (14:59 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Wed, 28 Aug 2013 12:59:41 +0000 (14:59 +0200)
components/cluster/clustercomponent.cpp
components/cluster/clustercomponent.h
components/cluster/jsonrpc.cpp
lib/icinga/notification.cpp
lib/icinga/notification.h
lib/icinga/service-comment.cpp
lib/icinga/service.h
lib/ido/servicedbobject.cpp
lib/ido/servicedbobject.h

index bcb2572eb3a523fff8380312fdbfce77de97982e..33cb172e4787eb8931afdc60c8cfef5253df1c31 100644 (file)
@@ -54,9 +54,12 @@ void ClusterComponent::Start(void)
 
        Service::OnNewCheckResult.connect(bind(&ClusterComponent::CheckResultHandler, this, _1, _2, _3));
        Service::OnNextCheckChanged.connect(bind(&ClusterComponent::NextCheckChangedHandler, this, _1, _2, _3));
+       Notification::OnNextNotificationChanged.connect(bind(&ClusterComponent::NextNotificationChangedHandler, this, _1, _2, _3));
        Service::OnForceNextCheckChanged.connect(bind(&ClusterComponent::ForceNextCheckChangedHandler, this, _1, _2, _3));
        Service::OnEnableActiveChecksChanged.connect(bind(&ClusterComponent::EnableActiveChecksChangedHandler, this, _1, _2, _3));
        Service::OnEnablePassiveChecksChanged.connect(bind(&ClusterComponent::EnablePassiveChecksChangedHandler, this, _1, _2, _3));
+       Service::OnCommentAdded.connect(bind(&ClusterComponent::CommentAddedHandler, this, _1, _2, _3));
+       Service::OnCommentRemoved.connect(bind(&ClusterComponent::CommentRemovedHandler, this, _1, _2, _3));
 
        Endpoint::OnMessageReceived.connect(bind(&ClusterComponent::MessageHandler, this, _1, _2));
 }
@@ -287,6 +290,25 @@ void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, doub
        }
 }
 
+void ClusterComponent::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority)
+{
+       if (!authority.IsEmpty() && authority != GetIdentity())
+               return;
+
+       Dictionary::Ptr params = boost::make_shared<Dictionary>();
+       params->Set("notification", notification->GetName());
+       params->Set("next_notification", nextNotification);
+
+       Dictionary::Ptr message = boost::make_shared<Dictionary>();
+       message->Set("jsonrpc", "2.0");
+       message->Set("method", "cluster::SetNextNotification");
+       message->Set("params", params);
+
+       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+               endpoint->SendMessage(message);
+       }
+}
+
 void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
 {
        if (!authority.IsEmpty() && authority != GetIdentity())
@@ -344,6 +366,44 @@ void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& ser
        }
 }
 
+void ClusterComponent::CommentAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
+{
+       if (!authority.IsEmpty() && authority != GetIdentity())
+               return;
+
+       Dictionary::Ptr params = boost::make_shared<Dictionary>();
+       params->Set("service", service->GetName());
+       params->Set("comment", comment);
+
+       Dictionary::Ptr message = boost::make_shared<Dictionary>();
+       message->Set("jsonrpc", "2.0");
+       message->Set("method", "cluster::AddComment");
+       message->Set("params", params);
+
+       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+               endpoint->SendMessage(message);
+       }
+}
+
+void ClusterComponent::CommentRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
+{
+       if (!authority.IsEmpty() && authority != GetIdentity())
+               return;
+
+       Dictionary::Ptr params = boost::make_shared<Dictionary>();
+       params->Set("service", service->GetName());
+       params->Set("id", comment->Get("id"));
+
+       Dictionary::Ptr message = boost::make_shared<Dictionary>();
+       message->Set("jsonrpc", "2.0");
+       message->Set("method", "cluster::RemoveComment");
+       message->Set("params", params);
+
+       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+               endpoint->SendMessage(message);
+       }
+}
+
 void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
 {
        BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
@@ -414,6 +474,41 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
                bool enabled = params->Get("enabled");
 
                service->SetEnablePassiveChecks(enabled, sender->GetName());
+       } else if (message->Get("method") == "cluster::SetNextNotification") {
+               String nfc = params->Get("notification");
+
+               Notification::Ptr notification = Notification::GetByName(nfc);
+
+               if (!notification)
+                       return;
+
+               bool nextNotification = params->Get("next_notification");
+
+               notification->SetNextNotification(nextNotification, sender->GetName());
+       } else if (message->Get("method") == "cluster::AddComment") {
+               String svc = params->Get("service");
+
+               Service::Ptr service = Service::GetByName(svc);
+
+               if (!service)
+                       return;
+
+               Dictionary::Ptr comment = params->Get("comment");
+
+               long type = static_cast<long>(comment->Get("entry_type"));
+               service->AddComment(static_cast<CommentType>(type), comment->Get("author"),
+                   comment->Get("text"), comment->Get("expire_time"), comment->Get("id"), sender->GetName());
+       } else if (message->Get("method") == "cluster::RemoveComment") {
+               String svc = params->Get("service");
+
+               Service::Ptr service = Service::GetByName(svc);
+
+               if (!service)
+                       return;
+
+               String id = params->Get("id");
+
+               service->RemoveComment(id, sender->GetName());
        }
 }
 
index 58533ce7588a1d572438f78242e1dbebfc9029e7..1abee36e54e6bc230ae2c0820e470e0e3be4dba1 100644 (file)
@@ -81,9 +81,12 @@ private:
 
        void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority);
        void NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority);
+       void NextNotificationChangedHandler(const Notification::Ptr& notification, double nextCheck, const String& authority);
        void ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority);
        void EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority);
        void EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority);
+       void CommentAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority);
+       void CommentRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority);
        void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message);
 
 };
index 2cc19f8288b0f3ef65ac221621e7086413a78567..edd84d01b50aa210ff73984b5f85bd9d16832893 100644 (file)
@@ -34,7 +34,7 @@ using namespace icinga;
 void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message)
 {
        String json = Value(message).Serialize();
-       //std::cerr << ">> " << json << std::endl;
+       std::cerr << ">> " << json << std::endl;
        NetString::WriteStringToStream(stream, json);
 }
 
@@ -44,7 +44,7 @@ Dictionary::Ptr JsonRpc::ReadMessage(const Stream::Ptr& stream)
        if (!NetString::ReadStringFromStream(stream, &jsonString))
                BOOST_THROW_EXCEPTION(std::runtime_error("ReadStringFromStream signalled EOF."));
 
-       //std::cerr << "<< " << jsonString << std::endl;
+       std::cerr << "<< " << jsonString << std::endl;
        Value value = Value::Deserialize(jsonString);
 
        if (!value.IsObjectType<Dictionary>()) {
index a8370c53e94e06c88512033253a0e8dd9db828ea..8b19d22038bc8da430f0bdd4c4a22aae7cf4b4a4 100644 (file)
@@ -34,6 +34,8 @@ using namespace icinga;
 
 REGISTER_TYPE(Notification);
 
+boost::signals2::signal<void (const Notification::Ptr&, double, const String&)> Notification::OnNextNotificationChanged;
+
 void Notification::Start(void)
 {
        DynamicObject::Start();
@@ -182,9 +184,11 @@ double Notification::GetNextNotification(void) const
  * Sets the timestamp when the next periodical notification should be sent.
  * This does not affect notifications that are sent for state changes.
  */
-void Notification::SetNextNotification(double time)
+void Notification::SetNextNotification(double time, const String& authority)
 {
        m_NextNotification = time;
+
+       Utility::QueueAsyncCallback(bind(boost::ref(OnNextNotificationChanged), GetSelf(), time, authority));
 }
 
 long Notification::GetNotificationNumber(void) const
index 77a05fa2618561bc9468e842f1d2ad674a5def92..6be3d23d76a5bdffe1cefce091e964bd0649402f 100644 (file)
@@ -77,7 +77,7 @@ public:
        void SetLastNotification(double time);
 
        double GetNextNotification(void) const;
-       void SetNextNotification(double time);
+       void SetNextNotification(double time, const String& authority = String());
 
        long GetNotificationNumber(void) const;
        void UpdateNotificationNumber(void);
@@ -89,6 +89,8 @@ public:
 
        virtual bool ResolveMacro(const String& macro, const Dictionary::Ptr& cr, String *result) const;
 
+       static boost::signals2::signal<void (const Notification::Ptr&, double, const String&)> OnNextNotificationChanged;
+
 protected:
        virtual void Start(void);
        virtual void Stop(void);
index c1fef430fd9feb827af5b89937171db280206c78..92807a1aff625ecacc29702fda09eaf6a0d153cb 100644 (file)
@@ -33,11 +33,10 @@ static int l_NextCommentID = 1;
 static boost::mutex l_CommentMutex;
 static std::map<int, String> l_LegacyCommentsCache;
 static std::map<String, Service::WeakPtr> l_CommentsCache;
-//static bool l_CommentsCacheNeedsUpdate = false;
-//static Timer::Ptr l_CommentsCacheTimer;
 static Timer::Ptr l_CommentsExpireTimer;
 
-boost::signals2::signal<void (const Service::Ptr&, const String&, CommentChangedType)> Service::OnCommentsChanged;
+boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&, const String&)> Service::OnCommentAdded;
+boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&, const String&)> Service::OnCommentRemoved;
 
 int Service::GetNextCommentID(void)
 {
@@ -52,9 +51,17 @@ Dictionary::Ptr Service::GetComments(void) const
 }
 
 String Service::AddComment(CommentType entryType, const String& author,
-    const String& text, double expireTime)
+    const String& text, double expireTime, const String& id, const String& authority)
 {
+       String uid;
+
+       if (id.IsEmpty())
+               uid = Utility::NewUniqueID();
+       else
+               uid = id;
+
        Dictionary::Ptr comment = boost::make_shared<Dictionary>();
+       comment->Set("id", uid);
        comment->Set("entry_time", Utility::GetTime());
        comment->Set("entry_type", entryType);
        comment->Set("author", author);
@@ -83,39 +90,43 @@ String Service::AddComment(CommentType entryType, const String& author,
                m_Comments = comments;
        }
 
-       String id = Utility::NewUniqueID();
-
        {
                ObjectLock olock(this);
 
-               comments->Set(id, comment);
+               comments->Set(uid, comment);
        }
 
        {
                boost::mutex::scoped_lock lock(l_CommentMutex);
-               l_LegacyCommentsCache[legacy_id] = id;
-               l_CommentsCache[id] = GetSelf();
+               l_LegacyCommentsCache[legacy_id] = uid;
+               l_CommentsCache[uid] = GetSelf();
        }
 
-       OnCommentsChanged(GetSelf(), id, CommentChangedAdded);
+       Utility::QueueAsyncCallback(bind(boost::ref(OnCommentAdded), GetSelf(), comment, authority));
 
-       return id;
+       return uid;
 }
 
 void Service::RemoveAllComments(void)
 {
-       m_Comments = Empty;
+       std::vector<String> ids;
+       Dictionary::Ptr comments = m_Comments;
 
-       {
-               boost::mutex::scoped_lock lock(l_CommentMutex);
-               l_LegacyCommentsCache.clear();
-               l_CommentsCache.clear();
+       if (!comments)
+               return;
+
+       ObjectLock olock(comments);
+       String id;
+       BOOST_FOREACH(boost::tie(id, boost::tuples::ignore), comments) {
+               ids.push_back(id);
        }
 
-       OnCommentsChanged(GetSelf(), Empty, CommentChangedDeleted);
+       BOOST_FOREACH(id, ids) {
+               RemoveComment(id);
+       }
 }
 
-void Service::RemoveComment(const String& id)
+void Service::RemoveComment(const String& id, const String& authority)
 {
        Service::Ptr owner = GetOwnerByCommentID(id);
 
@@ -124,26 +135,27 @@ void Service::RemoveComment(const String& id)
 
        Dictionary::Ptr comments = owner->GetComments();
 
-       if (comments) {
-               ObjectLock olock(owner);
+       if (!comments)
+               return;
 
-               Dictionary::Ptr comment = comments->Get(id);
+       ObjectLock olock(owner);
 
-               if (!comment)
-                       return;
+       Dictionary::Ptr comment = comments->Get(id);
 
-               int legacy_id = comment->Get("legacy_id");
+       if (!comment)
+               return;
 
-               comments->Remove(id);
+       int legacy_id = comment->Get("legacy_id");
 
-               {
-                       boost::mutex::scoped_lock lock(l_CommentMutex);
-                       l_LegacyCommentsCache.erase(legacy_id);
-                       l_CommentsCache.erase(id);
-               }
+       comments->Remove(id);
 
-               OnCommentsChanged(owner, id, CommentChangedDeleted);
+       {
+               boost::mutex::scoped_lock lock(l_CommentMutex);
+               l_LegacyCommentsCache.erase(legacy_id);
+               l_CommentsCache.erase(id);
        }
+
+       Utility::QueueAsyncCallback(bind(boost::ref(OnCommentRemoved), owner, comment, authority));
 }
 
 String Service::GetCommentIDFromLegacyID(int id)
@@ -233,10 +245,8 @@ void Service::RemoveCommentsByType(int type)
                }
        }
 
-       if (!removedComments.empty()) {
-               BOOST_FOREACH(const String& id, removedComments) {
-                       RemoveComment(id);
-               }
+       BOOST_FOREACH(const String& id, removedComments) {
+               RemoveComment(id);
        }
 }
 
@@ -260,12 +270,9 @@ void Service::RemoveExpiredComments(void)
                }
        }
 
-       if (!expiredComments.empty()) {
-               BOOST_FOREACH(const String& id, expiredComments) {
-                       RemoveComment(id);
-               }
+       BOOST_FOREACH(const String& id, expiredComments) {
+               RemoveComment(id);
        }
-
 }
 
 void Service::CommentsExpireTimerHandler(void)
index be7eab30d2eaf0c33e0367bdf8fa749b93ab66e8..f835ac26a320c87811b64111c3138ad97571b9fb 100644 (file)
@@ -84,18 +84,6 @@ enum FlappingState
        FlappingEnabled = 3
 };
 
-/**
- * The state of a changed comment
- *
- * @ingroup icinga
- */
-enum CommentChangedType
-{
-       CommentChangedAdded = 0,
-       CommentChangedUpdated = 1,
-       CommentChangedDeleted = 2
-};
-
 /**
  * The state of a changed downtime
  *
@@ -255,7 +243,8 @@ public:
        static boost::signals2::signal<void (const Service::Ptr&, const User::Ptr&, const NotificationType&, const Dictionary::Ptr&, const String&, const String&)> OnNotificationSentChanged;
        static boost::signals2::signal<void (const Service::Ptr&, DowntimeState)> OnDowntimeChanged;
        static boost::signals2::signal<void (const Service::Ptr&, FlappingState)> OnFlappingChanged;
-       static boost::signals2::signal<void (const Service::Ptr&, const String&, CommentChangedType)> OnCommentsChanged;
+       static boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&, const String&)> OnCommentAdded;
+       static boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&, const String&)> OnCommentRemoved;
        static boost::signals2::signal<void (const Service::Ptr&, const String&, DowntimeChangedType)> OnDowntimesChanged;
 
        virtual bool ResolveMacro(const String& macro, const Dictionary::Ptr& cr, String *result) const;
@@ -291,11 +280,11 @@ public:
        Dictionary::Ptr GetComments(void) const;
 
        String AddComment(CommentType entryType, const String& author,
-           const String& text, double expireTime);
+           const String& text, double expireTime, const String& id = String(), const String& authority = String());
 
        void RemoveAllComments(void);
        void RemoveCommentsByType(int type);
-       static void RemoveComment(const String& id);
+       static void RemoveComment(const String& id, const String& authority = String());
 
        static String GetCommentIDFromLegacyID(int id);
        static Service::Ptr GetOwnerByCommentID(const String& id);
index 644a902df169e72cc73c952f88af4704d0b57809..5e6e773b3924ea58eb7f9e6c0bf8295e66d9c4bd 100644 (file)
@@ -39,7 +39,8 @@ INITIALIZE_ONCE(ServiceDbObject, &ServiceDbObject::StaticInitialize);
 
 void ServiceDbObject::StaticInitialize(void)
 {
-       Service::OnCommentsChanged.connect(boost::bind(&ServiceDbObject::CommentsChangedHandler, _1, _2, _3));
+       Service::OnCommentAdded.connect(boost::bind(&ServiceDbObject::AddComment, _1, _2));
+       Service::OnCommentRemoved.connect(boost::bind(&ServiceDbObject::RemoveComment, _1, _2));
        Service::OnDowntimesChanged.connect(boost::bind(&ServiceDbObject::DowntimesChangedHandler, _1, _2, _3));
 }
 
@@ -248,7 +249,7 @@ void ServiceDbObject::OnConfigUpdate(void)
        }
 
        /* update comments and downtimes on config change */
-       CommentsChangedHandler(service, Empty, CommentChangedUpdated);
+       AddComments(service);
        DowntimesChangedHandler(service, Empty, DowntimeChangedUpdated);
 
        /* service host config update */
@@ -288,35 +289,6 @@ void ServiceDbObject::OnStatusUpdate(void)
        dbobj->SendStatusUpdate();
 }
 
-void ServiceDbObject::CommentsChangedHandler(const Service::Ptr& svcfilter, const String& id, CommentChangedType type)
-{
-       if (type == CommentChangedUpdated || type == CommentChangedDeleted) {
-               /* we cannot determine which comment id is deleted
-                * id cache may not be in sync
-                */
-               BOOST_FOREACH(const Service::Ptr& service, DynamicType::GetObjects<Service>()) {
-                       if (svcfilter && svcfilter != service)
-                               continue;
-
-                       Host::Ptr host = service->GetHost();
-
-                       if (!host)
-                               continue;
-
-                       /* delete all comments associated for this host/service */
-                       DeleteComments(service);
-
-                       /* dump all comments */
-                       AddComments(service);
-               }
-       } else if (type == CommentChangedAdded) {
-               Dictionary::Ptr comment = Service::GetCommentByID(id);
-               AddComment(svcfilter, comment);
-       } else {
-               Log(LogDebug, "ido", "invalid comment change type: " + type);
-       }
-}
-
 void ServiceDbObject::AddComments(const Service::Ptr& service)
 {
        /* dump all comments */
@@ -401,10 +373,10 @@ void ServiceDbObject::AddCommentByType(const DynamicObject::Ptr& object, const D
        OnQuery(query1);
 }
 
-void ServiceDbObject::DeleteComments(const Service::Ptr& service)
+void ServiceDbObject::RemoveComments(const Service::Ptr& service)
 {
-       /* delete all comments associated for this host/service */
-       Log(LogDebug, "ido", "delete comments for '" + service->GetName() + "'");
+       /* remove all comments associated for this host/service */
+       Log(LogDebug, "ido", "remove comments for '" + service->GetName() + "'");
 
        Host::Ptr host = service->GetHost();
 
@@ -418,7 +390,7 @@ void ServiceDbObject::DeleteComments(const Service::Ptr& service)
        query1.WhereCriteria->Set("object_id", service);
        OnQuery(query1);
 
-       /* delete hostcheck service's host comments */
+       /* remove hostcheck service's host comments */
        if (host->GetHostCheckService() == service) {
                DbQuery query2;
                query2.Table = "comments";
@@ -429,6 +401,11 @@ void ServiceDbObject::DeleteComments(const Service::Ptr& service)
        }
 }
 
+void ServiceDbObject::RemoveComment(const Service::Ptr& service, const Dictionary::Ptr& comment)
+{
+       /* TODO: implement */
+}
+
 void ServiceDbObject::DowntimesChangedHandler(const Service::Ptr& svcfilter, const String& id, DowntimeChangedType type)
 {
        if (type == DowntimeChangedUpdated || type == DowntimeChangedDeleted) {
index 080cd90799563c61f3deeebad7f9f313a19970c0..8691ba0799ace293b3f3771555528003e1bafaf6 100644 (file)
@@ -51,11 +51,11 @@ protected:
        virtual void OnStatusUpdate(void);
 
 private:
-       static void CommentsChangedHandler(const Service::Ptr& service, const String& id, CommentChangedType type);
        static void AddComments(const Service::Ptr& service);
        static void AddComment(const Service::Ptr& service, const Dictionary::Ptr& comment);
        static void AddCommentByType(const DynamicObject::Ptr& object, const Dictionary::Ptr& comment);
-       static void DeleteComments(const Service::Ptr& service);
+       static void RemoveComments(const Service::Ptr& service);
+       static void RemoveComment(const Service::Ptr& service, const Dictionary::Ptr& comment);
 
        static void DowntimesChangedHandler(const Service::Ptr& service, const String& id, DowntimeChangedType type);
        static void AddDowntimes(const Service::Ptr& service);