From 0d24b941f5a4d9dc22d807863d2524ab28158f87 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Wed, 28 Aug 2013 11:12:20 +0200 Subject: [PATCH] Implement some cluster events. --- components/cluster/clustercomponent.cpp | 149 +++++++++++++++++++++--- components/cluster/clustercomponent.h | 8 +- icinga-app/Makefile.am | 1 + lib/icinga/host.cpp | 1 + lib/icinga/service-check.cpp | 27 +++-- lib/icinga/service-notification.cpp | 3 +- lib/icinga/service.h | 17 +-- 7 files changed, 174 insertions(+), 32 deletions(-) diff --git a/components/cluster/clustercomponent.cpp b/components/cluster/clustercomponent.cpp index 790e9d340..bcb2572eb 100644 --- a/components/cluster/clustercomponent.cpp +++ b/components/cluster/clustercomponent.cpp @@ -52,7 +52,12 @@ void ClusterComponent::Start(void) m_ReconnectTimer->SetInterval(5); m_ReconnectTimer->Start(); - Service::OnNewCheckResult.connect(bind(&ClusterComponent::CheckResultHandler, this, _1, _2)); + Service::OnNewCheckResult.connect(bind(&ClusterComponent::CheckResultHandler, this, _1, _2, _3)); + Service::OnNextCheckChanged.connect(bind(&ClusterComponent::NextCheckChangedHandler, this, _1, _2, _3)); + Service::OnForceNextCheckChanged.connect(bind(&ClusterComponent::ForceNextCheckChangedHandler, this, _1, _2, _3)); + Service::OnEnableActiveChecksChanged.connect(bind(&ClusterComponent::EnableActiveChecksChangedHandler, this, _1, _2, _3)); + Service::OnEnablePassiveChecksChanged.connect(bind(&ClusterComponent::EnablePassiveChecksChangedHandler, this, _1, _2, _3)); + Endpoint::OnMessageReceived.connect(bind(&ClusterComponent::MessageHandler, this, _1, _2)); } @@ -244,13 +249,11 @@ void ClusterComponent::ReconnectTimerHandler(void) } } -void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr) +void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority) { - if (cr->Contains("source") && cr->Get("source") != GetIdentity()) + if (!authority.IsEmpty() && authority != GetIdentity()) return; - cr->Set("source", GetIdentity()); - Dictionary::Ptr params = boost::make_shared(); params->Set("service", service->GetName()); params->Set("check_result", cr); @@ -265,12 +268,100 @@ void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dic } } -void ClusterComponent::MessageHandler(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message) +void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority) +{ + if (!authority.IsEmpty() && authority != GetIdentity()) + return; + + Dictionary::Ptr params = boost::make_shared(); + params->Set("service", service->GetName()); + params->Set("next_check", nextCheck); + + Dictionary::Ptr message = boost::make_shared(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "cluster::SetNextCheck"); + message->Set("params", params); + + BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { + endpoint->SendMessage(message); + } +} + +void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority) +{ + if (!authority.IsEmpty() && authority != GetIdentity()) + return; + + Dictionary::Ptr params = boost::make_shared(); + params->Set("service", service->GetName()); + params->Set("forced", forced); + + Dictionary::Ptr message = boost::make_shared(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "cluster::SetForceNextCheck"); + message->Set("params", params); + + BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { + endpoint->SendMessage(message); + } +} + +void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority) +{ + if (!authority.IsEmpty() && authority != GetIdentity()) + return; + + Dictionary::Ptr params = boost::make_shared(); + params->Set("service", service->GetName()); + params->Set("enabled", enabled); + + Dictionary::Ptr message = boost::make_shared(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "cluster::SetEnableActiveChecks"); + message->Set("params", params); + + BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { + endpoint->SendMessage(message); + } +} + +void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority) +{ + if (!authority.IsEmpty() && authority != GetIdentity()) + return; + + Dictionary::Ptr params = boost::make_shared(); + params->Set("service", service->GetName()); + params->Set("enabled", enabled); + + Dictionary::Ptr message = boost::make_shared(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "cluster::SetEnablePassiveChecks"); + message->Set("params", params); + + BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { + endpoint->SendMessage(message); + } +} + +void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message) { + BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { + if (sender != endpoint) + endpoint->SendMessage(message); + } + + Dictionary::Ptr params = message->Get("params"); + + if (!params) + return; + if (message->Get("method") == "cluster::CheckResult") { - Dictionary::Ptr params = message->Get("params"); + String svc = params->Get("service"); - if (!params) + Service::Ptr service = Service::GetByName(svc); + + if (!service) return; Dictionary::Ptr cr = params->Get("check_result"); @@ -278,6 +369,41 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& endpoint, const Dicti if (!cr) return; + service->ProcessCheckResult(cr, sender->GetName()); + } else if (message->Get("method") == "cluster::SetNextCheck") { + String svc = params->Get("service"); + + Service::Ptr service = Service::GetByName(svc); + + if (!service) + return; + + double nextCheck = params->Get("next_check"); + + service->SetNextCheck(nextCheck, sender->GetName()); + } else if (message->Get("method") == "cluster::SetForceNextCheck") { + String svc = params->Get("service"); + + Service::Ptr service = Service::GetByName(svc); + + if (!service) + return; + + bool forced = params->Get("forced"); + + service->SetForceNextCheck(forced, sender->GetName()); + } else if (message->Get("method") == "cluster::SetEnableActiveChecks") { + String svc = params->Get("service"); + + Service::Ptr service = Service::GetByName(svc); + + if (!service) + return; + + bool enabled = params->Get("enabled"); + + service->SetEnableActiveChecks(enabled, sender->GetName()); + } else if (message->Get("method") == "cluster::SetEnablePassiveChecks") { String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); @@ -285,12 +411,9 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& endpoint, const Dicti if (!service) return; - service->ProcessCheckResult(cr); + bool enabled = params->Get("enabled"); - /* Reschedule the next check. The side effect of this is that for as long - * as we receive results for a service we won't execute any - * active checks. */ - service->SetNextCheck(Utility::GetTime() + service->GetCheckInterval()); + service->SetEnablePassiveChecks(enabled, sender->GetName()); } } diff --git a/components/cluster/clustercomponent.h b/components/cluster/clustercomponent.h index 06b752c97..58533ce75 100644 --- a/components/cluster/clustercomponent.h +++ b/components/cluster/clustercomponent.h @@ -79,8 +79,12 @@ private: void NewClientHandler(const Socket::Ptr& client, TlsRole role); void ListenerThreadProc(const Socket::Ptr& server); - void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr); - void MessageHandler(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message); + void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority); + void NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority); + void ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority); + void EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority); + void EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority); + void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message); }; diff --git a/icinga-app/Makefile.am b/icinga-app/Makefile.am index 13fccb178..28be48fbf 100644 --- a/icinga-app/Makefile.am +++ b/icinga-app/Makefile.am @@ -27,6 +27,7 @@ icinga2_LDADD = \ ${top_builddir}/lib/config/libconfig.la \ -dlopen ${top_builddir}/lib/icinga/libicinga.la \ -dlopen ${top_builddir}/components/checker/libchecker.la \ + -dlopen ${top_builddir}/components/cluster/libcluster.la \ -dlopen ${top_builddir}/components/compat/libcompat.la \ -dlopen ${top_builddir}/components/demo/libdemo.la \ -dlopen ${top_builddir}/components/livestatus/liblivestatus.la \ diff --git a/lib/icinga/host.cpp b/lib/icinga/host.cpp index e5bd63295..8e67483fa 100644 --- a/lib/icinga/host.cpp +++ b/lib/icinga/host.cpp @@ -243,6 +243,7 @@ void Host::UpdateSlaveServices(void) ConfigItem::Ptr serviceItem = builder->Compile(); DynamicObject::Ptr dobj = serviceItem->Commit(); + dobj->Start(); } } diff --git a/lib/icinga/service-check.cpp b/lib/icinga/service-check.cpp index 6c0112626..00f51268b 100644 --- a/lib/icinga/service-check.cpp +++ b/lib/icinga/service-check.cpp @@ -37,9 +37,12 @@ const int Service::DefaultMaxCheckAttempts = 3; const double Service::DefaultCheckInterval = 5 * 60; const double Service::CheckIntervalDivisor = 5.0; -boost::signals2::signal Service::OnNewCheckResult; +boost::signals2::signal Service::OnNewCheckResult; boost::signals2::signal Service::OnNotificationsRequested; -boost::signals2::signal Service::OnNextCheckChanged; +boost::signals2::signal Service::OnNextCheckChanged; +boost::signals2::signal Service::OnForceNextCheckChanged; +boost::signals2::signal Service::OnEnableActiveChecksChanged; +boost::signals2::signal Service::OnEnablePassiveChecksChanged; boost::signals2::signal Service::OnFlappingChanged; CheckCommand::Ptr Service::GetCheckCommand(void) const @@ -91,11 +94,11 @@ long Service::GetSchedulingOffset(void) return m_SchedulingOffset; } -void Service::SetNextCheck(double nextCheck) +void Service::SetNextCheck(double nextCheck, const String& authority) { m_NextCheck = nextCheck; - Utility::QueueAsyncCallback(bind(boost::ref(Service::OnNextCheckChanged), GetSelf())); + Utility::QueueAsyncCallback(bind(boost::ref(Service::OnNextCheckChanged), GetSelf(), nextCheck, authority)); } double Service::GetNextCheck(void) @@ -401,9 +404,11 @@ bool Service::GetEnableActiveChecks(void) const return m_EnableActiveChecks; } -void Service::SetEnableActiveChecks(bool enabled) +void Service::SetEnableActiveChecks(bool enabled, const String& authority) { m_EnableActiveChecks = enabled ? 1 : 0; + + Utility::QueueAsyncCallback(bind(boost::ref(OnEnableActiveChecksChanged), GetSelf(), enabled, authority)); } bool Service::GetEnablePassiveChecks(void) const @@ -414,9 +419,11 @@ bool Service::GetEnablePassiveChecks(void) const return m_EnablePassiveChecks; } -void Service::SetEnablePassiveChecks(bool enabled) +void Service::SetEnablePassiveChecks(bool enabled, const String& authority) { m_EnablePassiveChecks = enabled ? 1 : 0; + + Utility::QueueAsyncCallback(bind(boost::ref(OnEnablePassiveChecksChanged), GetSelf(), enabled, authority)); } bool Service::GetForceNextCheck(void) const @@ -427,12 +434,14 @@ bool Service::GetForceNextCheck(void) const return static_cast(m_ForceNextCheck); } -void Service::SetForceNextCheck(bool forced) +void Service::SetForceNextCheck(bool forced, const String& authority) { m_ForceNextCheck = forced ? 1 : 0; + + Utility::QueueAsyncCallback(bind(boost::ref(OnForceNextCheckChanged), GetSelf(), forced, authority)); } -void Service::ProcessCheckResult(const Dictionary::Ptr& cr) +void Service::ProcessCheckResult(const Dictionary::Ptr& cr, const String& authority) { double now = Utility::GetTime(); @@ -607,7 +616,7 @@ void Service::ProcessCheckResult(const Dictionary::Ptr& cr) " threshold: " + Convert::ToString(GetFlappingThreshold()) + "% current: " + Convert::ToString(GetFlappingCurrent()) + "%."); - OnNewCheckResult(GetSelf(), cr); + OnNewCheckResult(GetSelf(), cr, authority); OnStateChanged(GetSelf()); if (call_eventhandler) diff --git a/lib/icinga/service-notification.cpp b/lib/icinga/service-notification.cpp index d4f1e81a6..a2ada86b7 100644 --- a/lib/icinga/service-notification.cpp +++ b/lib/icinga/service-notification.cpp @@ -192,7 +192,8 @@ void Service::UpdateSlaveNotifications(void) builder->AddExpressionList(nfc_exprl); ConfigItem::Ptr notificationItem = builder->Compile(); - notificationItem->Commit(); + DynamicObject::Ptr dobj = notificationItem->Commit(); + dobj->Start(); } } } diff --git a/lib/icinga/service.h b/lib/icinga/service.h index b0347917d..be7eab30d 100644 --- a/lib/icinga/service.h +++ b/lib/icinga/service.h @@ -160,7 +160,7 @@ public: long GetSchedulingOffset(void); void SetSchedulingOffset(long offset); - void SetNextCheck(double nextCheck); + void SetNextCheck(double nextCheck, const String& authority = String()); double GetNextCheck(void); void UpdateNextCheck(void); @@ -218,13 +218,13 @@ public: bool GetLastReachable(void) const; bool GetEnableActiveChecks(void) const; - void SetEnableActiveChecks(bool enabled); + void SetEnableActiveChecks(bool enabled, const String& authority = String()); bool GetEnablePassiveChecks(void) const; - void SetEnablePassiveChecks(bool enabled); + void SetEnablePassiveChecks(bool enabled, const String& authority = String()); bool GetForceNextCheck(void) const; - void SetForceNextCheck(bool forced); + void SetForceNextCheck(bool forced, const String& authority = String()); double GetAcknowledgementExpiry(void) const; void SetAcknowledgementExpiry(double timestamp); @@ -235,7 +235,7 @@ public: void ClearAcknowledgement(void); void ExecuteCheck(void); - void ProcessCheckResult(const Dictionary::Ptr& cr); + void ProcessCheckResult(const Dictionary::Ptr& cr, const String& authority = String()); static double CalculateExecutionTime(const Dictionary::Ptr& cr); static double CalculateLatency(const Dictionary::Ptr& cr); @@ -246,8 +246,11 @@ public: static StateType StateTypeFromString(const String& state); static String StateTypeToString(StateType state); - static boost::signals2::signal OnNextCheckChanged; - static boost::signals2::signal OnNewCheckResult; + static boost::signals2::signal OnNextCheckChanged; + static boost::signals2::signal OnForceNextCheckChanged; + static boost::signals2::signal OnEnableActiveChecksChanged; + static boost::signals2::signal OnEnablePassiveChecksChanged; + static boost::signals2::signal OnNewCheckResult; static boost::signals2::signal OnNotificationsRequested; static boost::signals2::signal OnNotificationSentChanged; static boost::signals2::signal OnDowntimeChanged; -- 2.40.0