]> granicus.if.org Git - icinga2/commitdiff
Implement some cluster events.
authorGunnar Beutner <gunnar.beutner@netways.de>
Wed, 28 Aug 2013 09:12:20 +0000 (11:12 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Wed, 28 Aug 2013 09:12:20 +0000 (11:12 +0200)
components/cluster/clustercomponent.cpp
components/cluster/clustercomponent.h
icinga-app/Makefile.am
lib/icinga/host.cpp
lib/icinga/service-check.cpp
lib/icinga/service-notification.cpp
lib/icinga/service.h

index 790e9d3407595b8c69dd4208c6fc00f16d69f72d..bcb2572eb3a523fff8380312fdbfce77de97982e 100644 (file)
@@ -52,7 +52,12 @@ void ClusterComponent::Start(void)
        m_ReconnectTimer->SetInterval(5);
        m_ReconnectTimer->Start();
 
-       Service::OnNewCheckResult.connect(bind(&ClusterComponent::CheckResultHandler, this, _1, _2));
+       Service::OnNewCheckResult.connect(bind(&ClusterComponent::CheckResultHandler, this, _1, _2, _3));
+       Service::OnNextCheckChanged.connect(bind(&ClusterComponent::NextCheckChangedHandler, this, _1, _2, _3));
+       Service::OnForceNextCheckChanged.connect(bind(&ClusterComponent::ForceNextCheckChangedHandler, this, _1, _2, _3));
+       Service::OnEnableActiveChecksChanged.connect(bind(&ClusterComponent::EnableActiveChecksChangedHandler, this, _1, _2, _3));
+       Service::OnEnablePassiveChecksChanged.connect(bind(&ClusterComponent::EnablePassiveChecksChangedHandler, this, _1, _2, _3));
+
        Endpoint::OnMessageReceived.connect(bind(&ClusterComponent::MessageHandler, this, _1, _2));
 }
 
@@ -244,13 +249,11 @@ void ClusterComponent::ReconnectTimerHandler(void)
        }
 }
 
-void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr)
+void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority)
 {
-       if (cr->Contains("source") && cr->Get("source") != GetIdentity())
+       if (!authority.IsEmpty() && authority != GetIdentity())
                return;
 
-       cr->Set("source", GetIdentity());
-
        Dictionary::Ptr params = boost::make_shared<Dictionary>();
        params->Set("service", service->GetName());
        params->Set("check_result", cr);
@@ -265,12 +268,100 @@ void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dic
        }
 }
 
-void ClusterComponent::MessageHandler(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
+void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority)
+{
+       if (!authority.IsEmpty() && authority != GetIdentity())
+               return;
+
+       Dictionary::Ptr params = boost::make_shared<Dictionary>();
+       params->Set("service", service->GetName());
+       params->Set("next_check", nextCheck);
+
+       Dictionary::Ptr message = boost::make_shared<Dictionary>();
+       message->Set("jsonrpc", "2.0");
+       message->Set("method", "cluster::SetNextCheck");
+       message->Set("params", params);
+
+       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+               endpoint->SendMessage(message);
+       }
+}
+
+void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
+{
+       if (!authority.IsEmpty() && authority != GetIdentity())
+               return;
+
+       Dictionary::Ptr params = boost::make_shared<Dictionary>();
+       params->Set("service", service->GetName());
+       params->Set("forced", forced);
+
+       Dictionary::Ptr message = boost::make_shared<Dictionary>();
+       message->Set("jsonrpc", "2.0");
+       message->Set("method", "cluster::SetForceNextCheck");
+       message->Set("params", params);
+
+       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+               endpoint->SendMessage(message);
+       }
+}
+
+void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
+{
+       if (!authority.IsEmpty() && authority != GetIdentity())
+               return;
+
+       Dictionary::Ptr params = boost::make_shared<Dictionary>();
+       params->Set("service", service->GetName());
+       params->Set("enabled", enabled);
+
+       Dictionary::Ptr message = boost::make_shared<Dictionary>();
+       message->Set("jsonrpc", "2.0");
+       message->Set("method", "cluster::SetEnableActiveChecks");
+       message->Set("params", params);
+
+       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+               endpoint->SendMessage(message);
+       }
+}
+
+void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
+{
+       if (!authority.IsEmpty() && authority != GetIdentity())
+               return;
+
+       Dictionary::Ptr params = boost::make_shared<Dictionary>();
+       params->Set("service", service->GetName());
+       params->Set("enabled", enabled);
+
+       Dictionary::Ptr message = boost::make_shared<Dictionary>();
+       message->Set("jsonrpc", "2.0");
+       message->Set("method", "cluster::SetEnablePassiveChecks");
+       message->Set("params", params);
+
+       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+               endpoint->SendMessage(message);
+       }
+}
+
+void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
 {
+       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+               if (sender != endpoint)
+                       endpoint->SendMessage(message);
+       }
+
+       Dictionary::Ptr params = message->Get("params");
+
+       if (!params)
+               return;
+
        if (message->Get("method") == "cluster::CheckResult") {
-               Dictionary::Ptr params = message->Get("params");
+               String svc = params->Get("service");
 
-               if (!params)
+               Service::Ptr service = Service::GetByName(svc);
+
+               if (!service)
                        return;
 
                Dictionary::Ptr cr = params->Get("check_result");
@@ -278,6 +369,41 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& endpoint, const Dicti
                if (!cr)
                        return;
 
+               service->ProcessCheckResult(cr, sender->GetName());
+       } else if (message->Get("method") == "cluster::SetNextCheck") {
+               String svc = params->Get("service");
+
+               Service::Ptr service = Service::GetByName(svc);
+
+               if (!service)
+                       return;
+
+               double nextCheck = params->Get("next_check");
+
+               service->SetNextCheck(nextCheck, sender->GetName());
+       } else if (message->Get("method") == "cluster::SetForceNextCheck") {
+               String svc = params->Get("service");
+
+               Service::Ptr service = Service::GetByName(svc);
+
+               if (!service)
+                       return;
+
+               bool forced = params->Get("forced");
+
+               service->SetForceNextCheck(forced, sender->GetName());
+       } else if (message->Get("method") == "cluster::SetEnableActiveChecks") {
+               String svc = params->Get("service");
+
+               Service::Ptr service = Service::GetByName(svc);
+
+               if (!service)
+                       return;
+
+               bool enabled = params->Get("enabled");
+
+               service->SetEnableActiveChecks(enabled, sender->GetName());
+       } else if (message->Get("method") == "cluster::SetEnablePassiveChecks") {
                String svc = params->Get("service");
 
                Service::Ptr service = Service::GetByName(svc);
@@ -285,12 +411,9 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& endpoint, const Dicti
                if (!service)
                        return;
 
-               service->ProcessCheckResult(cr);
+               bool enabled = params->Get("enabled");
 
-               /* Reschedule the next check. The side effect of this is that for as long
-                * as we receive results for a service we won't execute any
-                * active checks. */
-               service->SetNextCheck(Utility::GetTime() + service->GetCheckInterval());
+               service->SetEnablePassiveChecks(enabled, sender->GetName());
        }
 }
 
index 06b752c97baefe115ebb2d95dd5fe1acdef6bd75..58533ce7588a1d572438f78242e1dbebfc9029e7 100644 (file)
@@ -79,8 +79,12 @@ private:
        void NewClientHandler(const Socket::Ptr& client, TlsRole role);
        void ListenerThreadProc(const Socket::Ptr& server);
 
-       void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr);
-       void MessageHandler(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message);
+       void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority);
+       void NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority);
+       void ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority);
+       void EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority);
+       void EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority);
+       void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message);
 
 };
 
index 13fccb178f71580d529997fb5bf1947bc63c9189..28be48fbf93c092f81ee1cd5f7239b8f44e2cb78 100644 (file)
@@ -27,6 +27,7 @@ icinga2_LDADD = \
        ${top_builddir}/lib/config/libconfig.la \
        -dlopen ${top_builddir}/lib/icinga/libicinga.la \
        -dlopen ${top_builddir}/components/checker/libchecker.la \
+       -dlopen ${top_builddir}/components/cluster/libcluster.la \
        -dlopen ${top_builddir}/components/compat/libcompat.la \
        -dlopen ${top_builddir}/components/demo/libdemo.la \
        -dlopen ${top_builddir}/components/livestatus/liblivestatus.la \
index e5bd63295ca984890fde875b94e007a83493e39b..8e67483fae67b241958fceec110a0eea3a803c1c 100644 (file)
@@ -243,6 +243,7 @@ void Host::UpdateSlaveServices(void)
 
                ConfigItem::Ptr serviceItem = builder->Compile();
                DynamicObject::Ptr dobj = serviceItem->Commit();
+               dobj->Start();
        }
 }
 
index 6c01126262df1738dc2c1d65cfd75cf3e557709a..00f51268b08376c62018352fb57832675fad2a38 100644 (file)
@@ -37,9 +37,12 @@ const int Service::DefaultMaxCheckAttempts = 3;
 const double Service::DefaultCheckInterval = 5 * 60;
 const double Service::CheckIntervalDivisor = 5.0;
 
-boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&)> Service::OnNewCheckResult;
+boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&, const String&)> Service::OnNewCheckResult;
 boost::signals2::signal<void (const Service::Ptr&, NotificationType, const Dictionary::Ptr&, const String&, const String&)> Service::OnNotificationsRequested;
-boost::signals2::signal<void (const Service::Ptr&)> Service::OnNextCheckChanged;
+boost::signals2::signal<void (const Service::Ptr&, double, const String&)> Service::OnNextCheckChanged;
+boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> Service::OnForceNextCheckChanged;
+boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> Service::OnEnableActiveChecksChanged;
+boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> Service::OnEnablePassiveChecksChanged;
 boost::signals2::signal<void (const Service::Ptr&, FlappingState)> Service::OnFlappingChanged;
 
 CheckCommand::Ptr Service::GetCheckCommand(void) const
@@ -91,11 +94,11 @@ long Service::GetSchedulingOffset(void)
        return m_SchedulingOffset;
 }
 
-void Service::SetNextCheck(double nextCheck)
+void Service::SetNextCheck(double nextCheck, const String& authority)
 {
        m_NextCheck = nextCheck;
 
-       Utility::QueueAsyncCallback(bind(boost::ref(Service::OnNextCheckChanged), GetSelf()));
+       Utility::QueueAsyncCallback(bind(boost::ref(Service::OnNextCheckChanged), GetSelf(), nextCheck, authority));
 }
 
 double Service::GetNextCheck(void)
@@ -401,9 +404,11 @@ bool Service::GetEnableActiveChecks(void) const
                return m_EnableActiveChecks;
 }
 
-void Service::SetEnableActiveChecks(bool enabled)
+void Service::SetEnableActiveChecks(bool enabled, const String& authority)
 {
        m_EnableActiveChecks = enabled ? 1 : 0;
+
+       Utility::QueueAsyncCallback(bind(boost::ref(OnEnableActiveChecksChanged), GetSelf(), enabled, authority));
 }
 
 bool Service::GetEnablePassiveChecks(void) const
@@ -414,9 +419,11 @@ bool Service::GetEnablePassiveChecks(void) const
                return m_EnablePassiveChecks;
 }
 
-void Service::SetEnablePassiveChecks(bool enabled)
+void Service::SetEnablePassiveChecks(bool enabled, const String& authority)
 {
        m_EnablePassiveChecks = enabled ? 1 : 0;
+
+       Utility::QueueAsyncCallback(bind(boost::ref(OnEnablePassiveChecksChanged), GetSelf(), enabled, authority));
 }
 
 bool Service::GetForceNextCheck(void) const
@@ -427,12 +434,14 @@ bool Service::GetForceNextCheck(void) const
        return static_cast<bool>(m_ForceNextCheck);
 }
 
-void Service::SetForceNextCheck(bool forced)
+void Service::SetForceNextCheck(bool forced, const String& authority)
 {
        m_ForceNextCheck = forced ? 1 : 0;
+
+       Utility::QueueAsyncCallback(bind(boost::ref(OnForceNextCheckChanged), GetSelf(), forced, authority));
 }
 
-void Service::ProcessCheckResult(const Dictionary::Ptr& cr)
+void Service::ProcessCheckResult(const Dictionary::Ptr& cr, const String& authority)
 {
        double now = Utility::GetTime();
 
@@ -607,7 +616,7 @@ void Service::ProcessCheckResult(const Dictionary::Ptr& cr)
                        " threshold: " + Convert::ToString(GetFlappingThreshold()) +
                        "% current: " + Convert::ToString(GetFlappingCurrent()) + "%.");
 
-       OnNewCheckResult(GetSelf(), cr);
+       OnNewCheckResult(GetSelf(), cr, authority);
        OnStateChanged(GetSelf());
 
        if (call_eventhandler)
index d4f1e81a680ede6b41eb26f9416b94037e9b43c0..a2ada86b78a6c55cbc59ddccebeb6cf80161f217 100644 (file)
@@ -192,7 +192,8 @@ void Service::UpdateSlaveNotifications(void)
                        builder->AddExpressionList(nfc_exprl);
 
                        ConfigItem::Ptr notificationItem = builder->Compile();
-                       notificationItem->Commit();
+                       DynamicObject::Ptr dobj = notificationItem->Commit();
+                       dobj->Start();
                }
        }
 }
index b0347917d06008529d8281afa99fba6280240731..be7eab30d2eaf0c33e0367bdf8fa749b93ab66e8 100644 (file)
@@ -160,7 +160,7 @@ public:
        long GetSchedulingOffset(void);
        void SetSchedulingOffset(long offset);
 
-       void SetNextCheck(double nextCheck);
+       void SetNextCheck(double nextCheck, const String& authority = String());
        double GetNextCheck(void);
        void UpdateNextCheck(void);
 
@@ -218,13 +218,13 @@ public:
        bool GetLastReachable(void) const;
 
        bool GetEnableActiveChecks(void) const;
-       void SetEnableActiveChecks(bool enabled);
+       void SetEnableActiveChecks(bool enabled, const String& authority = String());
 
        bool GetEnablePassiveChecks(void) const;
-       void SetEnablePassiveChecks(bool enabled);
+       void SetEnablePassiveChecks(bool enabled, const String& authority = String());
 
        bool GetForceNextCheck(void) const;
-       void SetForceNextCheck(bool forced);
+       void SetForceNextCheck(bool forced, const String& authority = String());
 
        double GetAcknowledgementExpiry(void) const;
        void SetAcknowledgementExpiry(double timestamp);
@@ -235,7 +235,7 @@ public:
        void ClearAcknowledgement(void);
 
        void ExecuteCheck(void);
-       void ProcessCheckResult(const Dictionary::Ptr& cr);
+       void ProcessCheckResult(const Dictionary::Ptr& cr, const String& authority = String());
 
        static double CalculateExecutionTime(const Dictionary::Ptr& cr);
        static double CalculateLatency(const Dictionary::Ptr& cr);
@@ -246,8 +246,11 @@ public:
        static StateType StateTypeFromString(const String& state);
        static String StateTypeToString(StateType state);
 
-       static boost::signals2::signal<void (const Service::Ptr&)> OnNextCheckChanged;
-       static boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&)> OnNewCheckResult;
+       static boost::signals2::signal<void (const Service::Ptr&, double, const String&)> OnNextCheckChanged;
+       static boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> OnForceNextCheckChanged;
+       static boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> OnEnableActiveChecksChanged;
+       static boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> OnEnablePassiveChecksChanged;
+       static boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&, const String&)> OnNewCheckResult;
        static boost::signals2::signal<void (const Service::Ptr&, NotificationType, const Dictionary::Ptr&, const String&, const String&)> OnNotificationsRequested;
        static boost::signals2::signal<void (const Service::Ptr&, const User::Ptr&, const NotificationType&, const Dictionary::Ptr&, const String&, const String&)> OnNotificationSentChanged;
        static boost::signals2::signal<void (const Service::Ptr&, DowntimeState)> OnDowntimeChanged;