]> granicus.if.org Git - icinga2/commitdiff
Bugfixes.
authorGunnar Beutner <gunnar.beutner@netways.de>
Mon, 2 Jul 2012 12:38:37 +0000 (14:38 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Mon, 2 Jul 2012 12:43:51 +0000 (14:43 +0200)
base/configobject.cpp
base/configobject.h
components/configrpc/configrpccomponent.cpp
components/configrpc/configrpccomponent.h
components/delegation/delegationcomponent.cpp

index 49b7e529fc6bf07d1f129add63b4cb011bf80f79..57a062f9626a037a5dd0d22da72c70a4d5dce50e 100644 (file)
@@ -87,6 +87,18 @@ bool ConfigObject::IsAbstract(void) const
        return value;
 }
 
+void ConfigObject::SetSource(const string& value)
+{
+       GetProperties()->SetProperty("__source", value);
+}
+
+string ConfigObject::GetSource(void) const
+{
+       string value;
+       GetProperties()->GetProperty("__source", &value);
+       return value;
+}
+
 void ConfigObject::Commit(void)
 {
        ConfigObject::Ptr dobj = GetObject(GetType(), GetName());
index 8c376a720142f27601dd81cede82c83c280f14d2..84b7b6db4d6d23bf414ee3c955e071845b9659af 100644 (file)
@@ -79,6 +79,9 @@ public:
        void SetAbstract(bool value);
        bool IsAbstract(void) const;
 
+       void SetSource(const string& value);
+       string GetSource(void) const;
+
        void Commit(void);
        void Unregister(void);
 
index 7ac576676af05fdc1ee451a0a5d622f856825d01..931281cedb691e95e9f9ea060cc6ea2e25074a42 100644 (file)
@@ -28,28 +28,27 @@ string ConfigRpcComponent::GetName(void) const
 
 void ConfigRpcComponent::Start(void)
 {
+       m_Syncing = false;
+
        EndpointManager::Ptr endpointManager = EndpointManager::GetInstance();
 
        m_Endpoint = boost::make_shared<VirtualEndpoint>();
 
-       long configSource;
-       if (GetConfig()->GetProperty("configSource", &configSource) && configSource != 0) {
-               m_Endpoint->RegisterTopicHandler("config::FetchObjects",
-                   boost::bind(&ConfigRpcComponent::FetchObjectsHandler, this, _2));
+       m_Endpoint->RegisterTopicHandler("config::FetchObjects",
+           boost::bind(&ConfigRpcComponent::FetchObjectsHandler, this, _2));
 
-               ConfigObject::GetAllObjects()->OnObjectAdded.connect(boost::bind(&ConfigRpcComponent::LocalObjectCommittedHandler, this, _2));
-               ConfigObject::GetAllObjects()->OnObjectCommitted.connect(boost::bind(&ConfigRpcComponent::LocalObjectCommittedHandler, this, _2));
-               ConfigObject::GetAllObjects()->OnObjectRemoved.connect(boost::bind(&ConfigRpcComponent::LocalObjectRemovedHandler, this, _2));
+       ConfigObject::GetAllObjects()->OnObjectAdded.connect(boost::bind(&ConfigRpcComponent::LocalObjectCommittedHandler, this, _2));
+       ConfigObject::GetAllObjects()->OnObjectCommitted.connect(boost::bind(&ConfigRpcComponent::LocalObjectCommittedHandler, this, _2));
+       ConfigObject::GetAllObjects()->OnObjectRemoved.connect(boost::bind(&ConfigRpcComponent::LocalObjectRemovedHandler, this, _2));
 
-               m_Endpoint->RegisterPublication("config::ObjectCommitted");
-               m_Endpoint->RegisterPublication("config::ObjectRemoved");
-       }
+       m_Endpoint->RegisterPublication("config::ObjectCommitted");
+       m_Endpoint->RegisterPublication("config::ObjectRemoved");
 
        endpointManager->OnNewEndpoint.connect(boost::bind(&ConfigRpcComponent::NewEndpointHandler, this, _2));
 
        m_Endpoint->RegisterPublication("config::FetchObjects");
        m_Endpoint->RegisterTopicHandler("config::ObjectCommitted",
-           boost::bind(&ConfigRpcComponent::RemoteObjectCommittedHandler, this, _3));
+           boost::bind(&ConfigRpcComponent::RemoteObjectCommittedHandler, this, _2, _3));
        m_Endpoint->RegisterTopicHandler("config::ObjectRemoved",
            boost::bind(&ConfigRpcComponent::RemoteObjectRemovedHandler, this, _3));
 
@@ -121,6 +120,10 @@ void ConfigRpcComponent::FetchObjectsHandler(const Endpoint::Ptr& sender)
 
 void ConfigRpcComponent::LocalObjectCommittedHandler(const ConfigObject::Ptr& object)
 {
+       /* don't send messages when we're currently processing a remote update */
+       if (m_Syncing)
+               return;
+
        if (!ShouldReplicateObject(object))
                return;
 
@@ -130,6 +133,10 @@ void ConfigRpcComponent::LocalObjectCommittedHandler(const ConfigObject::Ptr& ob
 
 void ConfigRpcComponent::LocalObjectRemovedHandler(const ConfigObject::Ptr& object)
 {
+       /* don't send messages when we're currently processing a remote update */
+       if (m_Syncing)
+               return;
+
        if (!ShouldReplicateObject(object))
                return;
 
@@ -137,7 +144,7 @@ void ConfigRpcComponent::LocalObjectRemovedHandler(const ConfigObject::Ptr& obje
            MakeObjectMessage(object, "config::ObjectRemoved", false));
 }
 
-void ConfigRpcComponent::RemoteObjectCommittedHandler(const RequestMessage& request)
+void ConfigRpcComponent::RemoteObjectCommittedHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
 {
        MessagePart params;
        if (!request.GetParams(&params))
@@ -157,15 +164,41 @@ void ConfigRpcComponent::RemoteObjectCommittedHandler(const RequestMessage& requ
 
        ConfigObject::Ptr object = ConfigObject::GetObject(type, name);
 
-       if (!object)
+       if (!object) {
                object = boost::make_shared<ConfigObject>(properties.GetDictionary());
-       else
+
+               if (object->GetSource() == EndpointManager::GetInstance()->GetIdentity()) {
+                       /* the peer sent us an object that was originally created by us - 
+                        * however if was deleted locally so we have to tell the peer to destroy
+                        * its copy of the object. */
+                       EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint,
+                           MakeObjectMessage(object, "config::ObjectRemoved", false));
+
+                       return;
+               }
+       } else {
+               /* TODO: compare transaction timestamps and reject the update if our local object is newer */
+
                object->SetProperties(properties.GetDictionary());
+       }
 
        if (object->IsLocal())
                throw invalid_argument("Replicated remote object is marked as local.");
 
-       object->Commit();
+       if (object->GetSource().empty())
+               object->SetSource(sender->GetIdentity());
+
+       try {
+               /* TODO: only ignore updates for _this_ object rather than all objects
+                * this might be relevant if the commit handler for this object
+                * creates other objects. */
+               m_Syncing = true;
+               object->Commit();
+               m_Syncing = false;
+       } catch (const std::exception& ex) {
+               m_Syncing = false;
+               throw;
+       }
 }
 
 void ConfigRpcComponent::RemoteObjectRemovedHandler(const RequestMessage& request)
@@ -187,8 +220,16 @@ void ConfigRpcComponent::RemoteObjectRemovedHandler(const RequestMessage& reques
        if (!object)
                return;
 
-       if (!object->IsLocal())
-               object->Unregister();
+       if (!object->IsLocal()) {
+               try {
+                       m_Syncing = true;
+                       object->Unregister();
+                       m_Syncing = false;
+               } catch (const std::exception& ex) {
+                       m_Syncing = false;
+                       throw;
+               }
+       }
 }
 
 EXPORT_COMPONENT(configrpc, ConfigRpcComponent);
index c3f602de5207316c58d83d8dd19b4cb7c579b0b0..5e9bc073f0da9dec4223cac6556b110c6312ba87 100644 (file)
@@ -35,6 +35,7 @@ public:
 
 private:
        VirtualEndpoint::Ptr m_Endpoint;
+       bool m_Syncing;
 
        void NewEndpointHandler(const Endpoint::Ptr& endpoint);
        void SessionEstablishedHandler(const Endpoint::Ptr& endpoint);
@@ -43,7 +44,7 @@ private:
        void LocalObjectRemovedHandler(const ConfigObject::Ptr& object);
 
        void FetchObjectsHandler(const Endpoint::Ptr& sender);
-       void RemoteObjectCommittedHandler(const RequestMessage& request);
+       void RemoteObjectCommittedHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
        void RemoteObjectRemovedHandler(const RequestMessage& request);
 
        static RequestMessage MakeObjectMessage(const ConfigObject::Ptr& object,
index e85316633843dae4df057cdfec857694bd27cb66..b3139ecc8d22459ac611baba502ff595811e6bfe 100644 (file)
@@ -64,8 +64,18 @@ void DelegationComponent::ObjectCommittedHandler(const ConfigObject::Ptr& object
 {
        Service service(object);
 
-       /* object was updated, clear its checker to make sure it's re-delegated by the delegation timer */
-       service.SetChecker("");
+       string checker = service.GetChecker();
+
+       if (!checker.empty()) {
+               /* object was updated, clear its checker to make sure it's re-delegated by the delegation timer */
+               service.SetChecker("");
+
+               /* TODO: figure out a better way to clear individual services */
+               Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
+
+               if (endpoint)
+                       ClearServices(endpoint);
+       }
 }
 
 void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Service& service)
@@ -84,6 +94,10 @@ void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Serv
 
 void DelegationComponent::ClearServices(const Endpoint::Ptr& checker)
 {
+       stringstream msgbuf;
+       msgbuf << "Clearing assigned services for endpoint '" << checker->GetIdentity() << "'";
+       Application::Log(LogInformation, "delegation", msgbuf.str());
+
        RequestMessage request;
        request.SetMethod("checker::ClearServices");
 
@@ -134,10 +148,6 @@ void DelegationComponent::SessionEstablishedHandler(const Endpoint::Ptr& endpoin
        if (!IsEndpointChecker(endpoint))
                return;
 
-       stringstream msgbuf;
-       msgbuf << "Clearing assigned services for endpoint '" << endpoint->GetIdentity() << "'";
-       Application::Log(LogInformation, "delegation", msgbuf.str());
-
        /* locally clear checker for all services that previously belonged to this endpoint */
        ConfigObject::Set::Iterator it;
        for (it = m_AllServices->Begin(); it != m_AllServices->End(); it++) {