From: Gunnar Beutner Date: Fri, 3 Aug 2012 21:03:58 +0000 (+0200) Subject: Updated the checker and delegation components to use replication for the "checker... X-Git-Tag: v0.0.1~169 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=22dabfc60d825000d2ca6921fe3f806679db1aee;p=icinga2 Updated the checker and delegation components to use replication for the "checker" property. --- diff --git a/base/application.cpp b/base/application.cpp index 496a719a2..0d0cd1a9b 100644 --- a/base/application.cpp +++ b/base/application.cpp @@ -82,6 +82,8 @@ Application::Ptr Application::GetInstance(void) */ void Application::RunEventLoop(void) { + double nextProfile = 0; + while (!m_ShuttingDown) { Object::ClearHeldObjects(); @@ -96,11 +98,15 @@ void Application::RunEventLoop(void) DynamicObject::BeginTx(); #ifdef _DEBUG - stringstream msgbuf; - msgbuf << "Active objects: " << Object::GetAliveObjects(); - Logger::Write(LogInformation, "base", msgbuf.str()); + if (nextProfile < Utility::GetTime()) { + stringstream msgbuf; + msgbuf << "Active objects: " << Object::GetAliveObjects(); + Logger::Write(LogInformation, "base", msgbuf.str()); + + Object::PrintMemoryProfile(); - Object::PrintMemoryProfile(); + nextProfile = Utility::GetTime() + 15.0; + } #endif /* _DEBUG */ } } diff --git a/base/dynamicobject.cpp b/base/dynamicobject.cpp index 95f66f6fd..da9ec8468 100644 --- a/base/dynamicobject.cpp +++ b/base/dynamicobject.cpp @@ -25,7 +25,6 @@ map, Dictionary::Ptr> DynamicObject::m_PersistentUpdates; double DynamicObject::m_CurrentTx = 0; set DynamicObject::m_ModifiedObjects; -boost::signal DynamicObject::OnAttributeChanged; boost::signal DynamicObject::OnRegistered; boost::signal DynamicObject::OnUnregistered; boost::signal&)> DynamicObject::OnTransactionClosing; @@ -43,18 +42,10 @@ DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject) if (!serializedObject->Contains("configTx")) throw invalid_argument("Serialized object must contain a config snapshot."); - /* apply state from the config item/remote update */ - ApplyUpdate(serializedObject, true); - - /* restore the object's persistent state */ - map, Dictionary::Ptr>::iterator it; - it = m_PersistentUpdates.find(make_pair(GetType(), GetName())); - if (it != m_PersistentUpdates.end()) { - Logger::Write(LogDebug, "base", "Restoring persistent state " - "for object " + GetType() + ":" + GetName()); - ApplyUpdate(it->second, true); - m_PersistentUpdates.erase(it); - } + /* apply config state from the config item/remote update; + * The DynamicObject::Create function takes care of restoring + * non-config state after the object has been fully constructed */ + InternalApplyUpdate(serializedObject, Attribute_Config, true); } Dictionary::Ptr DynamicObject::BuildUpdate(double sinceTx, int attributeTypes) const @@ -95,10 +86,15 @@ Dictionary::Ptr DynamicObject::BuildUpdate(double sinceTx, int attributeTypes) c return update; } -void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate, bool suppressEvents) +void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes) +{ + InternalApplyUpdate(serializedUpdate, allowedTypes, false); +} + +void DynamicObject::InternalApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes, bool suppressEvents) { double configTx = 0; - if (serializedUpdate->Contains("configTx")) { + if ((allowedTypes & Attribute_Config) != 0 && serializedUpdate->Contains("configTx")) { configTx = serializedUpdate->Get("configTx"); if (configTx > m_ConfigTx) @@ -114,8 +110,12 @@ void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate, bool su Dictionary::Ptr attr = it->second; - Value data = attr->Get("data"); int type = attr->Get("type"); + + if ((type & ~allowedTypes) != 0) + continue; + + Value data = attr->Get("data"); double tx = attr->Get("tx"); if (type & Attribute_Config) @@ -128,33 +128,6 @@ void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate, bool su } } -void DynamicObject::SanitizeUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes) -{ - if ((allowedTypes & Attribute_Config) == 0) - serializedUpdate->Remove("configTx"); - - Dictionary::Ptr attrs = serializedUpdate->Get("attrs"); - - Dictionary::Iterator prev, it; - for (it = attrs->Begin(); it != attrs->End(); ) { - if (!it->second.IsObjectType()) - continue; - - Dictionary::Ptr attr = it->second; - - int type = attr->Get("type"); - - if (type == 0 || type & ~allowedTypes) { - prev = it; - it++; - attrs->Remove(prev); - continue; - } - - it++; - } -} - void DynamicObject::RegisterAttribute(const String& name, DynamicAttributeType type) { DynamicAttribute attr; @@ -188,7 +161,10 @@ void DynamicObject::InternalSetAttribute(const String& name, const Value& data, pair tt; tt = m_Attributes.insert(make_pair(name, attr)); + Value oldValue; + if (!tt.second && tx >= tt.first->second.Tx) { + oldValue = tt.first->second.Data; tt.first->second.Data = data; tt.first->second.Tx = tx; } @@ -198,7 +174,7 @@ void DynamicObject::InternalSetAttribute(const String& name, const Value& data, if (!suppressEvent) { m_ModifiedObjects.insert(GetSelf()); - DynamicObject::OnAttributeChanged(GetSelf(), name); + OnAttributeChanged(name, oldValue); } } @@ -494,13 +470,31 @@ void DynamicObject::RegisterClass(const String& type, DynamicObject::Factory fac DynamicObject::Ptr DynamicObject::Create(const String& type, const Dictionary::Ptr& properties) { - DynamicObject::ClassMap::iterator it; - it = GetClasses().find(type); + DynamicObject::ClassMap::iterator ct; + ct = GetClasses().find(type); + + DynamicObject::Ptr obj; + if (ct != GetClasses().end()) { + obj = ct->second(properties); + } else { + obj = boost::make_shared(properties); - if (it != GetClasses().end()) - return it->second(properties); - else - return boost::make_shared(properties); + Logger::Write(LogCritical, "base", "Creating generic DynamicObject for type '" + type + "'"); + } + + /* restore the object's persistent non-config attributes */ + map, Dictionary::Ptr>::iterator st; + st = m_PersistentUpdates.find(make_pair(obj->GetType(), obj->GetName())); + if (st != m_PersistentUpdates.end()) { + Logger::Write(LogDebug, "base", "Restoring persistent state " + "for object " + obj->GetType() + ":" + obj->GetName()); + obj->ApplyUpdate(st->second, Attribute_All & ~Attribute_Config); + + /* we're done with this update, remove it */ + m_PersistentUpdates.erase(st); + } + + return obj; } double DynamicObject::GetCurrentTx(void) @@ -522,3 +516,6 @@ void DynamicObject::FinishTx(void) m_CurrentTx = 0; } + +void DynamicObject::OnAttributeChanged(const String& name, const Value& oldValue) +{ } \ No newline at end of file diff --git a/base/dynamicobject.h b/base/dynamicobject.h index 5a663e1a3..ab3fb6f6a 100644 --- a/base/dynamicobject.h +++ b/base/dynamicobject.h @@ -38,6 +38,9 @@ enum DynamicAttributeType /* Attributes read from the config file are implicitly marked * as config attributes. */ Attribute_Config = 8, + + /* Combination of all attribute types */ + Attribute_All = Attribute_Transient | Attribute_Local | Attribute_Replicated | Attribute_Config }; struct DynamicAttribute @@ -71,8 +74,7 @@ public: DynamicObject(const Dictionary::Ptr& serializedObject); Dictionary::Ptr BuildUpdate(double sinceTx, int attributeTypes) const; - void ApplyUpdate(const Dictionary::Ptr& serializedUpdate, bool suppressEvents = false); - static void SanitizeUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes); + void ApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes); void RegisterAttribute(const String& name, DynamicAttributeType type); @@ -86,7 +88,6 @@ public: AttributeConstIterator AttributeBegin(void) const; AttributeConstIterator AttributeEnd(void) const; - static boost::signal OnAttributeChanged; static boost::signal OnRegistered; static boost::signal OnUnregistered; static boost::signal&)> OnTransactionClosing; @@ -123,6 +124,9 @@ public: static void BeginTx(void); static void FinishTx(void); +protected: + virtual void OnAttributeChanged(const String& name, const Value& oldValue); + private: void InternalSetAttribute(const String& name, const Value& data, double tx, bool suppressEvent = false); Value InternalGetAttribute(const String& name) const; @@ -137,6 +141,8 @@ private: static double m_CurrentTx; static set m_ModifiedObjects; + + void InternalApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes, bool suppressEvents); }; class RegisterClassHelper diff --git a/base/i2-base.h b/base/i2-base.h index 2d96f1062..e8731b1b1 100644 --- a/base/i2-base.h +++ b/base/i2-base.h @@ -95,6 +95,7 @@ using std::map; using std::list; using std::set; using std::multimap; +using std::multiset; using std::pair; using std::deque; using std::make_pair; diff --git a/cib/service.cpp b/cib/service.cpp index 16ace8eb3..b0beac7b5 100644 --- a/cib/service.cpp +++ b/cib/service.cpp @@ -24,6 +24,7 @@ using namespace icinga; REGISTER_CLASS(Service); boost::signal Service::OnCheckResultReceived; +boost::signal Service::OnCheckerChanged; Service::Service(const Dictionary::Ptr& serializedObject) : DynamicObject(serializedObject) @@ -474,3 +475,9 @@ Dictionary::Ptr Service::ResolveDependencies(const Host::Ptr& host, const Dictio return result; } + +void Service::OnAttributeChanged(const String& name, const Value& oldValue) +{ + if (name == "checker") + OnCheckerChanged(GetSelf(), oldValue); +} diff --git a/cib/service.h b/cib/service.h index 6283ed97a..7db4f6235 100644 --- a/cib/service.h +++ b/cib/service.h @@ -107,6 +107,10 @@ public: static Dictionary::Ptr ResolveDependencies(const Host::Ptr& host, const Dictionary::Ptr& dependencies); static boost::signal OnCheckResultReceived; + static boost::signal OnCheckerChanged; + +protected: + virtual void OnAttributeChanged(const String& name, const Value& oldValue); }; } diff --git a/components/checker/checkercomponent.cpp b/components/checker/checkercomponent.cpp index 992d84bf6..feb809194 100644 --- a/components/checker/checkercomponent.cpp +++ b/components/checker/checkercomponent.cpp @@ -24,13 +24,12 @@ using namespace icinga; void CheckerComponent::Start(void) { m_Endpoint = boost::make_shared(); - m_Endpoint->RegisterTopicHandler("checker::AssignService", - boost::bind(&CheckerComponent::AssignServiceRequestHandler, this, _2, _3)); - m_Endpoint->RegisterTopicHandler("checker::ClearServices", - boost::bind(&CheckerComponent::ClearServicesRequestHandler, this, _2, _3)); m_Endpoint->RegisterPublication("checker::ServiceStateChange"); EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint); + Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1)); + DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ServiceRemovedHandler, this, _1)); + m_CheckTimer = boost::make_shared(); m_CheckTimer->SetInterval(1); m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this)); @@ -61,12 +60,13 @@ void CheckerComponent::CheckTimerHandler(void) long tasks = 0; while (!m_Services.empty()) { - Service::Ptr service = m_Services.top(); + CheckerComponent::ServiceMultiSet::iterator it = m_Services.begin(); + Service::Ptr service = *it; if (service->GetNextCheck() > now) break; - m_Services.pop(); + m_Services.erase(it); Logger::Write(LogDebug, "checker", "Executing service check for '" + service->GetName() + "'"); @@ -130,11 +130,11 @@ void CheckerComponent::CheckCompletedHandler(const Service::Ptr& service, const /* remove the service from the list of pending services; if it's not in the * list this was a manual (i.e. forced) check and we must not re-add the * service to the services list because it's already there. */ - set::iterator it; + CheckerComponent::ServiceMultiSet::iterator it; it = m_PendingServices.find(service); if (it != m_PendingServices.end()) { m_PendingServices.erase(it); - m_Services.push(service); + m_Services.insert(service); } Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'"); @@ -149,35 +149,31 @@ void CheckerComponent::ResultTimerHandler(void) Logger::Write(LogInformation, "checker", msgbuf.str()); } -void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request) +void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service) { - MessagePart params; - if (!request.GetParams(¶ms)) - return; + String checker = service->GetChecker(); - String service; - if (!params.Get("service", &service)) - return; + if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetIdentity()) { + if (m_PendingServices.find(service) != m_PendingServices.end()) + return; - if (!Service::Exists(service)) { - Logger::Write(LogWarning, "checker", "Ignoring delegation request for unknown service '" + service + "'."); - return; + m_Services.insert(service); + } else { + m_Services.erase(service); + m_PendingServices.erase(service); } - - Service::Ptr object = Service::GetByName(service); - - m_Services.push(object); - - Logger::Write(LogDebug, "checker", "Accepted delegation for service '" + service + "'"); } -void CheckerComponent::ClearServicesRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request) +void CheckerComponent::ServiceRemovedHandler(const DynamicObject::Ptr& object) { - Logger::Write(LogInformation, "checker", "Clearing service delegations."); + Service::Ptr service = dynamic_pointer_cast(object); + + /* ignore it if the removed object is not a service */ + if (!service) + return; - /* clear the services lists */ - m_Services = ServiceQueue(); - m_PendingServices.clear(); + m_Services.erase(service); + m_PendingServices.erase(service); } EXPORT_COMPONENT(checker, CheckerComponent); diff --git a/components/checker/checkercomponent.h b/components/checker/checkercomponent.h index 7659753a5..d5b18c464 100644 --- a/components/checker/checkercomponent.h +++ b/components/checker/checkercomponent.h @@ -41,7 +41,7 @@ public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - typedef priority_queue, ServiceNextCheckLessComparer> ServiceQueue; + typedef multiset ServiceMultiSet; virtual void Start(void); virtual void Stop(void); @@ -49,8 +49,8 @@ public: private: VirtualEndpoint::Ptr m_Endpoint; - ServiceQueue m_Services; - set m_PendingServices; + ServiceMultiSet m_Services; + ServiceMultiSet m_PendingServices; Timer::Ptr m_CheckTimer; @@ -63,8 +63,11 @@ private: void AdjustCheckTimer(void); - void AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request); - void ClearServicesRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request); + void CheckerChangedHandler(const Service::Ptr& service); + void ServiceRemovedHandler(const DynamicObject::Ptr& object); + + //void AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request); + //void ClearServicesRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request); }; } diff --git a/components/cibsync/cibsynccomponent.cpp b/components/cibsync/cibsynccomponent.cpp index 1088ff56b..3093ae212 100644 --- a/components/cibsync/cibsynccomponent.cpp +++ b/components/cibsync/cibsynccomponent.cpp @@ -233,7 +233,9 @@ void CIBSyncComponent::RemoteObjectUpdateHandler(const Endpoint::Ptr& sender, co if (object->GetSource().IsEmpty()) object->SetSource(sender->GetIdentity()); - object->ApplyUpdate(update, true); + // TODO: disallow config updates depending on endpoint config + + object->ApplyUpdate(update, Attribute_All); } } diff --git a/components/delegation/delegationcomponent.cpp b/components/delegation/delegationcomponent.cpp index f7bc18a56..e30d0429c 100644 --- a/components/delegation/delegationcomponent.cpp +++ b/components/delegation/delegationcomponent.cpp @@ -24,103 +24,16 @@ using namespace icinga; void DelegationComponent::Start(void) { - DynamicObject::OnRegistered.connect(boost::bind(&DelegationComponent::ServiceCommittedHandler, this, _1)); - DynamicObject::OnUnregistered.connect(boost::bind(&DelegationComponent::ServiceRemovedHandler, this, _1)); - m_DelegationTimer = boost::make_shared(); m_DelegationTimer->SetInterval(30); m_DelegationTimer->OnTimerExpired.connect(boost::bind(&DelegationComponent::DelegationTimerHandler, this)); m_DelegationTimer->Start(); m_DelegationTimer->Reschedule(0); - - m_Endpoint = boost::make_shared(); - m_Endpoint->RegisterPublication("checker::AssignService"); - m_Endpoint->RegisterPublication("checker::ClearServices"); - m_Endpoint->RegisterPublication("delegation::ServiceStatus"); - EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint); - - EndpointManager::GetInstance()->OnNewEndpoint.connect(bind(&DelegationComponent::NewEndpointHandler, this, _2)); -} - -void DelegationComponent::Stop(void) -{ - EndpointManager::Ptr mgr = EndpointManager::GetInstance(); - - if (mgr) - mgr->UnregisterEndpoint(m_Endpoint); -} - -void DelegationComponent::ServiceCommittedHandler(const DynamicObject::Ptr& object) -{ - Service::Ptr service = dynamic_pointer_cast(object); - - if (!service) - return; - - String checker = service->GetChecker(); - - if (!checker.IsEmpty()) { - /* object was updated, clear its checker to make sure it's re-delegated by the delegation timer */ - service->SetChecker(""); - - /* TODO: figure out a better way to clear individual services */ - Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker); - - if (endpoint) - ClearServices(endpoint); - } -} - -void DelegationComponent::ServiceRemovedHandler(const DynamicObject::Ptr& object) -{ - Service::Ptr service = dynamic_pointer_cast(object); - - if (!service) - return; - - String checker = service->GetChecker(); - - if (!checker.IsEmpty()) { - /* TODO: figure out a better way to clear individual services */ - Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker); - - if (endpoint) - ClearServices(endpoint); - } -} - -void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Service::Ptr& service) -{ - RequestMessage request; - request.SetMethod("checker::AssignService"); - - MessagePart params; - params.Set("service", service->GetName()); - request.SetParams(params); - - Logger::Write(LogDebug, "delegation", "Trying to delegate service '" + service->GetName() + "'"); - - EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, checker, request); -} - -void DelegationComponent::ClearServices(const Endpoint::Ptr& checker) -{ - stringstream msgbuf; - msgbuf << "Clearing assigned services for endpoint '" << checker->GetIdentity() << "'"; - Logger::Write(LogInformation, "delegation", msgbuf.str()); - - RequestMessage request; - request.SetMethod("checker::ClearServices"); - - MessagePart params; - request.SetParams(params); - - EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, checker, request); } bool DelegationComponent::IsEndpointChecker(const Endpoint::Ptr& endpoint) { - return (endpoint->HasSubscription("checker::AssignService")); + return (endpoint->HasPublication("checker::ServiceStateChange")); } vector DelegationComponent::GetCheckerCandidates(const Service::Ptr& service) const @@ -149,32 +62,6 @@ vector DelegationComponent::GetCheckerCandidates(const Service::P return candidates; } -void DelegationComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint) -{ - endpoint->OnSessionEstablished.connect(bind(&DelegationComponent::SessionEstablishedHandler, this, _1)); -} -void DelegationComponent::SessionEstablishedHandler(const Endpoint::Ptr& endpoint) -{ - /* ignore this endpoint if it's not a checker */ - if (!IsEndpointChecker(endpoint)) - return; - - /* locally clear checker for all services that previously belonged to this endpoint */ - DynamicObject::Ptr object; - BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Service")) { - Service::Ptr service = dynamic_pointer_cast(object); - - if (!service) - continue; - - if (service->GetChecker() == endpoint->GetIdentity()) - service->SetChecker(""); - } - - /* remotely clear services for this endpoint */ - ClearServices(endpoint); -} - void DelegationComponent::DelegationTimerHandler(void) { map histogram; @@ -280,25 +167,6 @@ void DelegationComponent::DelegationTimerHandler(void) Logger::Write(LogInformation, "delegation", msgbuf.str()); } - if (delegated > 0) { - if (need_clear) { - Endpoint::Ptr endpoint; - BOOST_FOREACH(tie(endpoint, tuples::ignore), histogram) { - ClearServices(endpoint); - } - } - - BOOST_FOREACH(const Service::Ptr& service, services) { - String checker = service->GetChecker(); - Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker); - - if (!endpoint) - continue; - - AssignService(endpoint, service); - } - } - stringstream msgbuf; msgbuf << "Updated delegations for " << delegated << " services"; Logger::Write(LogInformation, "delegation", msgbuf.str()); diff --git a/components/delegation/delegationcomponent.h b/components/delegation/delegationcomponent.h index 4ace3fe35..4241a7d26 100644 --- a/components/delegation/delegationcomponent.h +++ b/components/delegation/delegationcomponent.h @@ -30,27 +30,15 @@ class DelegationComponent : public IComponent { public: virtual void Start(void); - virtual void Stop(void); private: - VirtualEndpoint::Ptr m_Endpoint; Timer::Ptr m_DelegationTimer; - void NewEndpointHandler(const Endpoint::Ptr& endpoint); - void SessionEstablishedHandler(const Endpoint::Ptr& endpoint); - - void ServiceCommittedHandler(const DynamicObject::Ptr& object); - void ServiceRemovedHandler(const DynamicObject::Ptr& object); void DelegationTimerHandler(void); vector GetCheckerCandidates(const Service::Ptr& service) const; - void AssignService(const Endpoint::Ptr& checker, const Service::Ptr& service); - void ClearServices(const Endpoint::Ptr& checker); - static bool IsEndpointChecker(const Endpoint::Ptr& endpoint); - - void CheckResultRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request); }; } diff --git a/dyn/configitem.cpp b/dyn/configitem.cpp index 17341cfd6..c2f00c7e5 100644 --- a/dyn/configitem.cpp +++ b/dyn/configitem.cpp @@ -106,7 +106,7 @@ DynamicObject::Ptr ConfigItem::Commit(void) if (!dobj) dobj = DynamicObject::Create(GetType(), update); else - dobj->ApplyUpdate(update); + dobj->ApplyUpdate(update, Attribute_Config); m_DynamicObject = dobj;