m_ReconnectTimer->SetInterval(5);
m_ReconnectTimer->Start();
- Service::OnNewCheckResult.connect(bind(&ClusterComponent::CheckResultHandler, this, _1, _2));
+ Service::OnNewCheckResult.connect(bind(&ClusterComponent::CheckResultHandler, this, _1, _2, _3));
+ Service::OnNextCheckChanged.connect(bind(&ClusterComponent::NextCheckChangedHandler, this, _1, _2, _3));
+ Service::OnForceNextCheckChanged.connect(bind(&ClusterComponent::ForceNextCheckChangedHandler, this, _1, _2, _3));
+ Service::OnEnableActiveChecksChanged.connect(bind(&ClusterComponent::EnableActiveChecksChangedHandler, this, _1, _2, _3));
+ Service::OnEnablePassiveChecksChanged.connect(bind(&ClusterComponent::EnablePassiveChecksChangedHandler, this, _1, _2, _3));
+
Endpoint::OnMessageReceived.connect(bind(&ClusterComponent::MessageHandler, this, _1, _2));
}
}
}
-void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr)
+void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority)
{
- if (cr->Contains("source") && cr->Get("source") != GetIdentity())
+ if (!authority.IsEmpty() && authority != GetIdentity())
return;
- cr->Set("source", GetIdentity());
-
Dictionary::Ptr params = boost::make_shared<Dictionary>();
params->Set("service", service->GetName());
params->Set("check_result", cr);
}
}
-void ClusterComponent::MessageHandler(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
+void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority)
+{
+ if (!authority.IsEmpty() && authority != GetIdentity())
+ return;
+
+ Dictionary::Ptr params = boost::make_shared<Dictionary>();
+ params->Set("service", service->GetName());
+ params->Set("next_check", nextCheck);
+
+ Dictionary::Ptr message = boost::make_shared<Dictionary>();
+ message->Set("jsonrpc", "2.0");
+ message->Set("method", "cluster::SetNextCheck");
+ message->Set("params", params);
+
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+ endpoint->SendMessage(message);
+ }
+}
+
+void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
+{
+ if (!authority.IsEmpty() && authority != GetIdentity())
+ return;
+
+ Dictionary::Ptr params = boost::make_shared<Dictionary>();
+ params->Set("service", service->GetName());
+ params->Set("forced", forced);
+
+ Dictionary::Ptr message = boost::make_shared<Dictionary>();
+ message->Set("jsonrpc", "2.0");
+ message->Set("method", "cluster::SetForceNextCheck");
+ message->Set("params", params);
+
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+ endpoint->SendMessage(message);
+ }
+}
+
+void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
+{
+ if (!authority.IsEmpty() && authority != GetIdentity())
+ return;
+
+ Dictionary::Ptr params = boost::make_shared<Dictionary>();
+ params->Set("service", service->GetName());
+ params->Set("enabled", enabled);
+
+ Dictionary::Ptr message = boost::make_shared<Dictionary>();
+ message->Set("jsonrpc", "2.0");
+ message->Set("method", "cluster::SetEnableActiveChecks");
+ message->Set("params", params);
+
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+ endpoint->SendMessage(message);
+ }
+}
+
+void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
+{
+ if (!authority.IsEmpty() && authority != GetIdentity())
+ return;
+
+ Dictionary::Ptr params = boost::make_shared<Dictionary>();
+ params->Set("service", service->GetName());
+ params->Set("enabled", enabled);
+
+ Dictionary::Ptr message = boost::make_shared<Dictionary>();
+ message->Set("jsonrpc", "2.0");
+ message->Set("method", "cluster::SetEnablePassiveChecks");
+ message->Set("params", params);
+
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+ endpoint->SendMessage(message);
+ }
+}
+
+void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
{
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+ if (sender != endpoint)
+ endpoint->SendMessage(message);
+ }
+
+ Dictionary::Ptr params = message->Get("params");
+
+ if (!params)
+ return;
+
if (message->Get("method") == "cluster::CheckResult") {
- Dictionary::Ptr params = message->Get("params");
+ String svc = params->Get("service");
- if (!params)
+ Service::Ptr service = Service::GetByName(svc);
+
+ if (!service)
return;
Dictionary::Ptr cr = params->Get("check_result");
if (!cr)
return;
+ service->ProcessCheckResult(cr, sender->GetName());
+ } else if (message->Get("method") == "cluster::SetNextCheck") {
+ String svc = params->Get("service");
+
+ Service::Ptr service = Service::GetByName(svc);
+
+ if (!service)
+ return;
+
+ double nextCheck = params->Get("next_check");
+
+ service->SetNextCheck(nextCheck, sender->GetName());
+ } else if (message->Get("method") == "cluster::SetForceNextCheck") {
+ String svc = params->Get("service");
+
+ Service::Ptr service = Service::GetByName(svc);
+
+ if (!service)
+ return;
+
+ bool forced = params->Get("forced");
+
+ service->SetForceNextCheck(forced, sender->GetName());
+ } else if (message->Get("method") == "cluster::SetEnableActiveChecks") {
+ String svc = params->Get("service");
+
+ Service::Ptr service = Service::GetByName(svc);
+
+ if (!service)
+ return;
+
+ bool enabled = params->Get("enabled");
+
+ service->SetEnableActiveChecks(enabled, sender->GetName());
+ } else if (message->Get("method") == "cluster::SetEnablePassiveChecks") {
String svc = params->Get("service");
Service::Ptr service = Service::GetByName(svc);
if (!service)
return;
- service->ProcessCheckResult(cr);
+ bool enabled = params->Get("enabled");
- /* Reschedule the next check. The side effect of this is that for as long
- * as we receive results for a service we won't execute any
- * active checks. */
- service->SetNextCheck(Utility::GetTime() + service->GetCheckInterval());
+ service->SetEnablePassiveChecks(enabled, sender->GetName());
}
}
void NewClientHandler(const Socket::Ptr& client, TlsRole role);
void ListenerThreadProc(const Socket::Ptr& server);
- void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr);
- void MessageHandler(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message);
+ void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority);
+ void NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority);
+ void ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority);
+ void EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority);
+ void EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority);
+ void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message);
};
${top_builddir}/lib/config/libconfig.la \
-dlopen ${top_builddir}/lib/icinga/libicinga.la \
-dlopen ${top_builddir}/components/checker/libchecker.la \
+ -dlopen ${top_builddir}/components/cluster/libcluster.la \
-dlopen ${top_builddir}/components/compat/libcompat.la \
-dlopen ${top_builddir}/components/demo/libdemo.la \
-dlopen ${top_builddir}/components/livestatus/liblivestatus.la \
ConfigItem::Ptr serviceItem = builder->Compile();
DynamicObject::Ptr dobj = serviceItem->Commit();
+ dobj->Start();
}
}
const double Service::DefaultCheckInterval = 5 * 60;
const double Service::CheckIntervalDivisor = 5.0;
-boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&)> Service::OnNewCheckResult;
+boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&, const String&)> Service::OnNewCheckResult;
boost::signals2::signal<void (const Service::Ptr&, NotificationType, const Dictionary::Ptr&, const String&, const String&)> Service::OnNotificationsRequested;
-boost::signals2::signal<void (const Service::Ptr&)> Service::OnNextCheckChanged;
+boost::signals2::signal<void (const Service::Ptr&, double, const String&)> Service::OnNextCheckChanged;
+boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> Service::OnForceNextCheckChanged;
+boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> Service::OnEnableActiveChecksChanged;
+boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> Service::OnEnablePassiveChecksChanged;
boost::signals2::signal<void (const Service::Ptr&, FlappingState)> Service::OnFlappingChanged;
CheckCommand::Ptr Service::GetCheckCommand(void) const
return m_SchedulingOffset;
}
-void Service::SetNextCheck(double nextCheck)
+void Service::SetNextCheck(double nextCheck, const String& authority)
{
m_NextCheck = nextCheck;
- Utility::QueueAsyncCallback(bind(boost::ref(Service::OnNextCheckChanged), GetSelf()));
+ Utility::QueueAsyncCallback(bind(boost::ref(Service::OnNextCheckChanged), GetSelf(), nextCheck, authority));
}
double Service::GetNextCheck(void)
return m_EnableActiveChecks;
}
-void Service::SetEnableActiveChecks(bool enabled)
+void Service::SetEnableActiveChecks(bool enabled, const String& authority)
{
m_EnableActiveChecks = enabled ? 1 : 0;
+
+ Utility::QueueAsyncCallback(bind(boost::ref(OnEnableActiveChecksChanged), GetSelf(), enabled, authority));
}
bool Service::GetEnablePassiveChecks(void) const
return m_EnablePassiveChecks;
}
-void Service::SetEnablePassiveChecks(bool enabled)
+void Service::SetEnablePassiveChecks(bool enabled, const String& authority)
{
m_EnablePassiveChecks = enabled ? 1 : 0;
+
+ Utility::QueueAsyncCallback(bind(boost::ref(OnEnablePassiveChecksChanged), GetSelf(), enabled, authority));
}
bool Service::GetForceNextCheck(void) const
return static_cast<bool>(m_ForceNextCheck);
}
-void Service::SetForceNextCheck(bool forced)
+void Service::SetForceNextCheck(bool forced, const String& authority)
{
m_ForceNextCheck = forced ? 1 : 0;
+
+ Utility::QueueAsyncCallback(bind(boost::ref(OnForceNextCheckChanged), GetSelf(), forced, authority));
}
-void Service::ProcessCheckResult(const Dictionary::Ptr& cr)
+void Service::ProcessCheckResult(const Dictionary::Ptr& cr, const String& authority)
{
double now = Utility::GetTime();
" threshold: " + Convert::ToString(GetFlappingThreshold()) +
"% current: " + Convert::ToString(GetFlappingCurrent()) + "%.");
- OnNewCheckResult(GetSelf(), cr);
+ OnNewCheckResult(GetSelf(), cr, authority);
OnStateChanged(GetSelf());
if (call_eventhandler)
builder->AddExpressionList(nfc_exprl);
ConfigItem::Ptr notificationItem = builder->Compile();
- notificationItem->Commit();
+ DynamicObject::Ptr dobj = notificationItem->Commit();
+ dobj->Start();
}
}
}
long GetSchedulingOffset(void);
void SetSchedulingOffset(long offset);
- void SetNextCheck(double nextCheck);
+ void SetNextCheck(double nextCheck, const String& authority = String());
double GetNextCheck(void);
void UpdateNextCheck(void);
bool GetLastReachable(void) const;
bool GetEnableActiveChecks(void) const;
- void SetEnableActiveChecks(bool enabled);
+ void SetEnableActiveChecks(bool enabled, const String& authority = String());
bool GetEnablePassiveChecks(void) const;
- void SetEnablePassiveChecks(bool enabled);
+ void SetEnablePassiveChecks(bool enabled, const String& authority = String());
bool GetForceNextCheck(void) const;
- void SetForceNextCheck(bool forced);
+ void SetForceNextCheck(bool forced, const String& authority = String());
double GetAcknowledgementExpiry(void) const;
void SetAcknowledgementExpiry(double timestamp);
void ClearAcknowledgement(void);
void ExecuteCheck(void);
- void ProcessCheckResult(const Dictionary::Ptr& cr);
+ void ProcessCheckResult(const Dictionary::Ptr& cr, const String& authority = String());
static double CalculateExecutionTime(const Dictionary::Ptr& cr);
static double CalculateLatency(const Dictionary::Ptr& cr);
static StateType StateTypeFromString(const String& state);
static String StateTypeToString(StateType state);
- static boost::signals2::signal<void (const Service::Ptr&)> OnNextCheckChanged;
- static boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&)> OnNewCheckResult;
+ static boost::signals2::signal<void (const Service::Ptr&, double, const String&)> OnNextCheckChanged;
+ static boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> OnForceNextCheckChanged;
+ static boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> OnEnableActiveChecksChanged;
+ static boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> OnEnablePassiveChecksChanged;
+ static boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&, const String&)> OnNewCheckResult;
static boost::signals2::signal<void (const Service::Ptr&, NotificationType, const Dictionary::Ptr&, const String&, const String&)> OnNotificationsRequested;
static boost::signals2::signal<void (const Service::Ptr&, const User::Ptr&, const NotificationType&, const Dictionary::Ptr&, const String&, const String&)> OnNotificationSentChanged;
static boost::signals2::signal<void (const Service::Ptr&, DowntimeState)> OnDowntimeChanged;