From dc6246642f730eaef188a406a37c69a33dbe6379 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Mon, 3 Sep 2012 10:28:14 +0200 Subject: [PATCH] Implemented replication for Endpoint objects. --- base/dictionary.cpp | 18 + base/dictionary.h | 2 + base/dynamicobject.cpp | 5 + base/dynamicobject.h | 6 +- base/process.cpp | 2 + components/Makefile.am | 3 +- components/checker/checkercomponent.cpp | 11 +- components/checker/checkercomponent.h | 2 +- components/cibsync/cibsynccomponent.cpp | 61 +-- components/cibsync/cibsynccomponent.h | 6 +- components/delegation/delegationcomponent.cpp | 31 +- components/demo/democomponent.cpp | 10 +- components/demo/democomponent.h | 2 +- components/discovery/Makefile.am | 35 -- components/discovery/discovery.vcxproj | 94 ---- .../discovery/discovery.vcxproj.filters | 30 -- components/discovery/discoverycomponent.cpp | 448 ------------------ components/discovery/discoverycomponent.h | 82 ---- components/discovery/discoverymessage.cpp | 71 --- components/discovery/discoverymessage.h | 50 -- components/discovery/i2-discovery.h | 38 -- configure.ac | 1 - doc/icinga2-config.odt | Bin 31305 -> 31407 bytes icinga-app/Makefile.am | 3 +- icinga/Makefile.am | 6 +- icinga/endpoint.cpp | 319 ++++++++++--- icinga/endpoint.h | 65 +-- icinga/endpointmanager.cpp | 257 +++++----- icinga/endpointmanager.h | 32 +- icinga/i2-icinga.h | 2 - icinga/icingaapplication.cpp | 8 +- icinga/icingaapplication.h | 2 + icinga/jsonrpcendpoint.cpp | 148 ------ icinga/jsonrpcendpoint.h | 71 --- icinga/virtualendpoint.cpp | 97 ---- icinga/virtualendpoint.h | 57 --- jsonrpc/jsonrpcclient.cpp | 6 +- 37 files changed, 524 insertions(+), 1557 deletions(-) delete mode 100644 components/discovery/Makefile.am delete mode 100644 components/discovery/discovery.vcxproj delete mode 100644 components/discovery/discovery.vcxproj.filters delete mode 100644 components/discovery/discoverycomponent.cpp delete mode 100644 components/discovery/discoverycomponent.h delete mode 100644 components/discovery/discoverymessage.cpp delete mode 100644 components/discovery/discoverymessage.h delete mode 100644 components/discovery/i2-discovery.h delete mode 100644 icinga/jsonrpcendpoint.cpp delete mode 100644 icinga/jsonrpcendpoint.h delete mode 100644 icinga/virtualendpoint.cpp delete mode 100644 icinga/virtualendpoint.h diff --git a/base/dictionary.cpp b/base/dictionary.cpp index 8f42647b1..6fcfeaf59 100644 --- a/base/dictionary.cpp +++ b/base/dictionary.cpp @@ -175,6 +175,24 @@ void Dictionary::Remove(Dictionary::Iterator it) m_Data.erase(it); } +/** + * Makes a shallow copy of a dictionary. + * + * @returns a copy of the dictionary. + */ +Dictionary::Ptr Dictionary::ShallowClone(void) const +{ + Dictionary::Ptr clone = boost::make_shared(); + + String key; + Value value; + BOOST_FOREACH(tie(key, value), m_Data) { + clone->Set(key, value); + } + + return clone; +} + /** * Converts a JSON object to a dictionary. * diff --git a/base/dictionary.h b/base/dictionary.h index bdfd2af61..43dbc61c9 100644 --- a/base/dictionary.h +++ b/base/dictionary.h @@ -50,6 +50,8 @@ public: void Remove(const String& key); void Remove(Iterator it); + Dictionary::Ptr ShallowClone(void) const; + static Dictionary::Ptr FromJson(cJSON *json); cJSON *ToJson(void) const; diff --git a/base/dynamicobject.cpp b/base/dynamicobject.cpp index b7faafccf..60471c288 100644 --- a/base/dynamicobject.cpp +++ b/base/dynamicobject.cpp @@ -145,6 +145,11 @@ void DynamicObject::Set(const String& name, const Value& data) InternalSetAttribute(name, data, GetCurrentTx()); } +void DynamicObject::Touch(const String& name) +{ + InternalSetAttribute(name, InternalGetAttribute(name), GetCurrentTx()); +} + Value DynamicObject::Get(const String& name) const { return InternalGetAttribute(name); diff --git a/base/dynamicobject.h b/base/dynamicobject.h index c85a006ca..6ff8b77ac 100644 --- a/base/dynamicobject.h +++ b/base/dynamicobject.h @@ -79,6 +79,7 @@ public: void RegisterAttribute(const String& name, DynamicAttributeType type); void Set(const String& name, const Value& data); + void Touch(const String& name); Value Get(const String& name) const; bool HasAttribute(const String& name) const; @@ -162,8 +163,11 @@ shared_ptr DynamicObjectFactory(const Dictionary::Ptr& serializedUpdate) return boost::make_shared(serializedUpdate); } +#define REGISTER_CLASS_ALIAS(klass, alias) \ + static RegisterClassHelper g_Register ## klass(alias, DynamicObjectFactory) + #define REGISTER_CLASS(klass) \ - static RegisterClassHelper g_Register ## klass(#klass, DynamicObjectFactory) + REGISTER_CLASS_ALIAS(klass, #klass) } diff --git a/base/process.cpp b/base/process.cpp index e8abd61f2..1a8e8aed8 100644 --- a/base/process.cpp +++ b/base/process.cpp @@ -33,6 +33,8 @@ condition_variable Process::m_TasksCV; Process::Process(const String& command) : AsyncTask(), m_Command(command), m_UsePopen(false) { + assert(Application::IsMainThread()); + if (!m_ThreadCreated) { thread t(&Process::WorkerThreadProc); t.detach(); diff --git a/components/Makefile.am b/components/Makefile.am index edf2f2952..7b1c745fb 100644 --- a/components/Makefile.am +++ b/components/Makefile.am @@ -7,5 +7,4 @@ SUBDIRS = \ compat \ convenience \ delegation \ - demo \ - discovery + demo diff --git a/components/checker/checkercomponent.cpp b/components/checker/checkercomponent.cpp index ed98ee86d..648373735 100644 --- a/components/checker/checkercomponent.cpp +++ b/components/checker/checkercomponent.cpp @@ -23,14 +23,12 @@ using namespace icinga; void CheckerComponent::Start(void) { - m_Endpoint = boost::make_shared(); + m_Endpoint = Endpoint::MakeEndpoint("checker", true); /* dummy registration so the delegation module knows this is a checker TODO: figure out a better way for this */ m_Endpoint->RegisterSubscription("checker"); - EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint); - Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1)); DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ServiceRemovedHandler, this, _1)); @@ -50,10 +48,7 @@ void CheckerComponent::Start(void) void CheckerComponent::Stop(void) { - EndpointManager::Ptr mgr = EndpointManager::GetInstance(); - - if (mgr) - mgr->UnregisterEndpoint(m_Endpoint); + m_Endpoint->Unregister(); } void CheckerComponent::CheckTimerHandler(void) @@ -158,7 +153,7 @@ void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service) { String checker = service->GetChecker(); - if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetIdentity()) { + if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetName()) { if (m_PendingServices.find(service) != m_PendingServices.end()) return; diff --git a/components/checker/checkercomponent.h b/components/checker/checkercomponent.h index 8e59f6131..b54d5d97f 100644 --- a/components/checker/checkercomponent.h +++ b/components/checker/checkercomponent.h @@ -54,7 +54,7 @@ public: virtual void Stop(void); private: - VirtualEndpoint::Ptr m_Endpoint; + Endpoint::Ptr m_Endpoint; ServiceSet m_IdleServices; ServiceSet m_PendingServices; diff --git a/components/cibsync/cibsynccomponent.cpp b/components/cibsync/cibsynccomponent.cpp index a44a168fb..ed66ef43e 100644 --- a/components/cibsync/cibsynccomponent.cpp +++ b/components/cibsync/cibsynccomponent.cpp @@ -26,18 +26,14 @@ using namespace icinga; */ void CIBSyncComponent::Start(void) { - m_Endpoint = boost::make_shared(); - - /* config objects */ - m_Endpoint->RegisterTopicHandler("config::FetchObjects", - boost::bind(&CIBSyncComponent::FetchObjectsHandler, this, _2)); + m_Endpoint = Endpoint::MakeEndpoint("cibsync", true); DynamicObject::OnRegistered.connect(boost::bind(&CIBSyncComponent::LocalObjectRegisteredHandler, this, _1)); DynamicObject::OnUnregistered.connect(boost::bind(&CIBSyncComponent::LocalObjectUnregisteredHandler, this, _1)); DynamicObject::OnTransactionClosing.connect(boost::bind(&CIBSyncComponent::TransactionClosingHandler, this, _1)); - EndpointManager::GetInstance()->OnNewEndpoint.connect(boost::bind(&CIBSyncComponent::NewEndpointHandler, this, _2)); - + Endpoint::OnConnected.connect(boost::bind(&CIBSyncComponent::EndpointConnectedHandler, this, _1)); + m_Endpoint->RegisterTopicHandler("config::ObjectUpdate", boost::bind(&CIBSyncComponent::RemoteObjectUpdateHandler, this, _2, _3)); m_Endpoint->RegisterTopicHandler("config::ObjectRemoved", @@ -46,8 +42,6 @@ void CIBSyncComponent::Start(void) /* service status */ m_Endpoint->RegisterTopicHandler("checker::ServiceStateChange", boost::bind(&CIBSyncComponent::ServiceStateChangeRequestHandler, _2, _3)); - - EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint); } /** @@ -55,10 +49,7 @@ void CIBSyncComponent::Start(void) */ void CIBSyncComponent::Stop(void) { - EndpointManager::Ptr endpointManager = EndpointManager::GetInstance(); - - if (endpointManager) - endpointManager->UnregisterEndpoint(m_Endpoint); + m_Endpoint->Unregister(); } void CIBSyncComponent::ServiceStateChangeRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request) @@ -84,21 +75,28 @@ void CIBSyncComponent::ServiceStateChangeRequestHandler(const Endpoint::Ptr& sen CIB::UpdateTaskStatistics(now, 1); } -void CIBSyncComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint) +void CIBSyncComponent::EndpointConnectedHandler(const Endpoint::Ptr& endpoint) { /* no need to sync the config with local endpoints */ - if (endpoint->IsLocal()) + if (endpoint->IsLocalEndpoint()) return; - endpoint->OnSessionEstablished.connect(boost::bind(&CIBSyncComponent::SessionEstablishedHandler, this, _1)); -} + /* we just assume the other endpoint wants object updates */ + endpoint->RegisterSubscription("config::ObjectUpdate"); + endpoint->RegisterSubscription("config::ObjectRemoved"); -void CIBSyncComponent::SessionEstablishedHandler(const Endpoint::Ptr& endpoint) -{ - RequestMessage request; - request.SetMethod("config::FetchObjects"); + pair trange = DynamicObject::GetTypes(); + DynamicObject::TypeMap::iterator tt; + for (tt = trange.first; tt != trange.second; tt++) { + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), tt->second) { + if (!ShouldReplicateObject(object)) + continue; - EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request); + RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", 0, true); + EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request); + } + } } RequestMessage CIBSyncComponent::MakeObjectMessage(const DynamicObject::Ptr& object, const String& method, double sinceTx, bool includeProperties) @@ -123,23 +121,6 @@ bool CIBSyncComponent::ShouldReplicateObject(const DynamicObject::Ptr& object) return (!object->IsLocal()); } -void CIBSyncComponent::FetchObjectsHandler(const Endpoint::Ptr& sender) -{ - pair trange = DynamicObject::GetTypes(); - DynamicObject::TypeMap::iterator tt; - for (tt = trange.first; tt != trange.second; tt++) { - DynamicObject::Ptr object; - BOOST_FOREACH(tie(tuples::ignore, object), tt->second) { - if (!ShouldReplicateObject(object)) - continue; - - RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", 0, true); - - EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, sender, request); - } - } -} - void CIBSyncComponent::LocalObjectRegisteredHandler(const DynamicObject::Ptr& object) { if (!ShouldReplicateObject(object)) @@ -212,7 +193,7 @@ void CIBSyncComponent::RemoteObjectUpdateHandler(const Endpoint::Ptr& sender, co } if (object->GetSource().IsEmpty()) - object->SetSource(sender->GetIdentity()); + object->SetSource(sender->GetName()); object->Register(); } else { diff --git a/components/cibsync/cibsynccomponent.h b/components/cibsync/cibsynccomponent.h index 56456682f..89fae5513 100644 --- a/components/cibsync/cibsynccomponent.h +++ b/components/cibsync/cibsynccomponent.h @@ -33,18 +33,16 @@ public: virtual void Stop(void); private: - VirtualEndpoint::Ptr m_Endpoint; + Endpoint::Ptr m_Endpoint; static void ServiceStateChangeRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request); - void NewEndpointHandler(const Endpoint::Ptr& endpoint); - void SessionEstablishedHandler(const Endpoint::Ptr& endpoint); + void EndpointConnectedHandler(const Endpoint::Ptr& endpoint); void LocalObjectRegisteredHandler(const DynamicObject::Ptr& object); void LocalObjectUnregisteredHandler(const DynamicObject::Ptr& object); void TransactionClosingHandler(const set& modifiedObjects); - void FetchObjectsHandler(const Endpoint::Ptr& sender); void RemoteObjectUpdateHandler(const Endpoint::Ptr& sender, const RequestMessage& request); void RemoteObjectRemovedHandler(const RequestMessage& request); diff --git a/components/delegation/delegationcomponent.cpp b/components/delegation/delegationcomponent.cpp index 560dac591..7acbccd82 100644 --- a/components/delegation/delegationcomponent.cpp +++ b/components/delegation/delegationcomponent.cpp @@ -40,9 +40,9 @@ vector DelegationComponent::GetCheckerCandidates(const Service::P { vector candidates; - EndpointManager::Iterator it; - for (it = EndpointManager::GetInstance()->Begin(); it != EndpointManager::GetInstance()->End(); it++) { - Endpoint::Ptr endpoint = it->second; + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { + Endpoint::Ptr endpoint = dynamic_pointer_cast(object); /* ignore disconnected endpoints */ if (!endpoint->IsConnected()) @@ -53,7 +53,7 @@ vector DelegationComponent::GetCheckerCandidates(const Service::P continue; /* ignore endpoints that aren't allowed to check this service */ - if (!service->IsAllowedChecker(it->first)) + if (!service->IsAllowedChecker(endpoint->GetName())) continue; candidates.push_back(endpoint); @@ -66,14 +66,16 @@ void DelegationComponent::DelegationTimerHandler(void) { map histogram; - EndpointManager::Iterator eit; - for (eit = EndpointManager::GetInstance()->Begin(); eit != EndpointManager::GetInstance()->End(); eit++) - histogram[eit->second] = 0; + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { + Endpoint::Ptr endpoint = dynamic_pointer_cast(object); + + histogram[endpoint] = 0; + } vector services; /* build "checker -> service count" histogram */ - DynamicObject::Ptr object; BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Service")) { Service::Ptr service = dynamic_pointer_cast(object); @@ -86,10 +88,11 @@ void DelegationComponent::DelegationTimerHandler(void) if (checker.IsEmpty()) continue; - Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker); - if (!endpoint) + if (!Endpoint::Exists(checker)) continue; + Endpoint::Ptr endpoint = Endpoint::GetByName(checker); + histogram[endpoint]++; } @@ -102,8 +105,8 @@ void DelegationComponent::DelegationTimerHandler(void) String checker = service->GetChecker(); Endpoint::Ptr oldEndpoint; - if (!checker.IsEmpty()) - oldEndpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker); + if (Endpoint::Exists(checker)) + oldEndpoint = Endpoint::GetByName(checker); vector candidates = GetCheckerCandidates(service); @@ -146,7 +149,7 @@ void DelegationComponent::DelegationTimerHandler(void) if (histogram[candidate] > avg_services) continue; - service->SetChecker(candidate->GetIdentity()); + service->SetChecker(candidate->GetName()); histogram[candidate]++; delegated++; @@ -161,7 +164,7 @@ void DelegationComponent::DelegationTimerHandler(void) int count; BOOST_FOREACH(tie(endpoint, count), histogram) { stringstream msgbuf; - msgbuf << "histogram: " << endpoint->GetIdentity() << " - " << count; + msgbuf << "histogram: " << endpoint->GetName() << " - " << count; Logger::Write(LogInformation, "delegation", msgbuf.str()); } diff --git a/components/demo/democomponent.cpp b/components/demo/democomponent.cpp index edeae95c2..2474ed9b1 100644 --- a/components/demo/democomponent.cpp +++ b/components/demo/democomponent.cpp @@ -26,10 +26,9 @@ using namespace icinga; */ void DemoComponent::Start(void) { - m_Endpoint = boost::make_shared(); + m_Endpoint = Endpoint::MakeEndpoint("demo", true); m_Endpoint->RegisterTopicHandler("demo::HelloWorld", boost::bind(&DemoComponent::HelloWorldRequestHandler, this, _2, _3)); - EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint); m_DemoTimer = boost::make_shared(); m_DemoTimer->SetInterval(5); @@ -42,10 +41,7 @@ void DemoComponent::Start(void) */ void DemoComponent::Stop(void) { - EndpointManager::Ptr endpointManager = EndpointManager::GetInstance(); - - if (endpointManager) - endpointManager->UnregisterEndpoint(m_Endpoint); + m_Endpoint->Unregister(); } /** @@ -68,7 +64,7 @@ void DemoComponent::DemoTimerHandler(void) */ void DemoComponent::HelloWorldRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request) { - Logger::Write(LogInformation, "demo", "Got 'hello world' from address=" + sender->GetAddress() + ", identity=" + sender->GetIdentity()); + Logger::Write(LogInformation, "demo", "Got 'hello world' from address=" + sender->GetAddress() + ", identity=" + sender->GetName()); } EXPORT_COMPONENT(demo, DemoComponent); diff --git a/components/demo/democomponent.h b/components/demo/democomponent.h index 6eb93af5c..181c8ae09 100644 --- a/components/demo/democomponent.h +++ b/components/demo/democomponent.h @@ -34,7 +34,7 @@ public: private: Timer::Ptr m_DemoTimer; - VirtualEndpoint::Ptr m_Endpoint; + Endpoint::Ptr m_Endpoint; void DemoTimerHandler(void); void HelloWorldRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request); diff --git a/components/discovery/Makefile.am b/components/discovery/Makefile.am deleted file mode 100644 index 7a41350a9..000000000 --- a/components/discovery/Makefile.am +++ /dev/null @@ -1,35 +0,0 @@ -## Process this file with automake to produce Makefile.in - -pkglib_LTLIBRARIES = \ - discovery.la - -discovery_la_SOURCES = \ - discoverycomponent.cpp \ - discoverycomponent.h \ - discoverymessage.cpp \ - discoverymessage.h \ - i2-discovery.h - -discovery_la_CPPFLAGS = \ - $(BOOST_CPPFLAGS) \ - -I${top_srcdir}/base \ - -I${top_srcdir}/dyn \ - -I${top_srcdir}/jsonrpc \ - -I${top_srcdir}/icinga \ - -I${top_srcdir}/cib - -discovery_la_LDFLAGS = \ - $(BOOST_LDFLAGS) \ - -module \ - -no-undefined \ - @RELEASE_INFO@ \ - @VERSION_INFO@ - -discovery_la_LIBADD = \ - $(BOOST_SIGNALS_LIB) \ - $(BOOST_THREAD_LIB) \ - ${top_builddir}/base/libbase.la \ - ${top_builddir}/dyn/libdyn.la \ - ${top_builddir}/jsonrpc/libjsonrpc.la \ - ${top_builddir}/icinga/libicinga.la \ - ${top_builddir}/cib/libcib.la diff --git a/components/discovery/discovery.vcxproj b/components/discovery/discovery.vcxproj deleted file mode 100644 index ba7b39340..000000000 --- a/components/discovery/discovery.vcxproj +++ /dev/null @@ -1,94 +0,0 @@ - - - - - Debug - Win32 - - - Release - Win32 - - - - {EAD41628-BB96-4F99-9070-8A9676801295} - Win32Proj - discovery - - - - DynamicLibrary - true - MultiByte - - - DynamicLibrary - false - true - MultiByte - - - - - - - - - - - - - true - $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cib;$(SolutionDir)\dyn;$(IncludePath) - $(OutDir);$(LibraryPath) - - - false - $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cib;$(SolutionDir)\dyn;$(IncludePath) - $(OutDir);$(LibraryPath) - - - - Disabled - WIN32;_DEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions) - Level3 - false - true - - - Windows - true - base.lib;jsonrpc.lib;icinga.lib;cib.lib;%(AdditionalDependencies) - - - - - MaxSpeed - true - true - WIN32;NDEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions) - Level3 - false - true - - - Windows - true - true - true - base.lib;jsonrpc.lib;icinga.lib;cib.lib;%(AdditionalDependencies) - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/components/discovery/discovery.vcxproj.filters b/components/discovery/discovery.vcxproj.filters deleted file mode 100644 index a356d8a92..000000000 --- a/components/discovery/discovery.vcxproj.filters +++ /dev/null @@ -1,30 +0,0 @@ - - - - - Quelldateien - - - Quelldateien - - - - - Headerdateien - - - Headerdateien - - - Headerdateien - - - - - {53341f7e-6bad-4cf1-92cf-be906efe1704} - - - {c7b2deba-743b-4449-ae46-0b7ba1b1350a} - - - \ No newline at end of file diff --git a/components/discovery/discoverycomponent.cpp b/components/discovery/discoverycomponent.cpp deleted file mode 100644 index cdbfddb88..000000000 --- a/components/discovery/discoverycomponent.cpp +++ /dev/null @@ -1,448 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "i2-discovery.h" - -using namespace icinga; - -/** - * Starts the discovery component. - */ -void DiscoveryComponent::Start(void) -{ - m_Endpoint = boost::make_shared(); - - m_Endpoint->RegisterTopicHandler("discovery::RegisterComponent", - boost::bind(&DiscoveryComponent::RegisterComponentMessageHandler, this, _2, _3)); - - m_Endpoint->RegisterTopicHandler("discovery::NewComponent", - boost::bind(&DiscoveryComponent::NewComponentMessageHandler, this, _3)); - - m_Endpoint->RegisterTopicHandler("discovery::Welcome", - boost::bind(&DiscoveryComponent::WelcomeMessageHandler, this, _2, _3)); - - EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::NewEndpointHandler, this, _2)); - EndpointManager::GetInstance()->OnNewEndpoint.connect(boost::bind(&DiscoveryComponent::NewEndpointHandler, this, _2)); - - EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint); - - /* create the reconnect timer */ - m_DiscoveryTimer = boost::make_shared(); - m_DiscoveryTimer->SetInterval(30); - m_DiscoveryTimer->OnTimerExpired.connect(boost::bind(&DiscoveryComponent::DiscoveryTimerHandler, this)); - m_DiscoveryTimer->Start(); - - /* call the timer as soon as possible */ - m_DiscoveryTimer->Reschedule(0); -} - -/** - * Stops the discovery component. - */ -void DiscoveryComponent::Stop(void) -{ - EndpointManager::Ptr mgr = EndpointManager::GetInstance(); - - if (mgr) - mgr->UnregisterEndpoint(m_Endpoint); -} - -/** - * Checks whether the specified endpoint is already connected - * and disconnects older endpoints. - * - * @param self The endpoint that is to be checked. - * @param other The other endpoint. - */ -void DiscoveryComponent::CheckExistingEndpoint(const Endpoint::Ptr& self, const Endpoint::Ptr& other) -{ - if (self == other) - return; - - if (!other->IsConnected()) - return; - - if (self->GetIdentity() == other->GetIdentity()) { - Logger::Write(LogWarning, "discovery", "Detected duplicate identity:" + other->GetIdentity() + " - Disconnecting old endpoint."); - - other->Stop(); - EndpointManager::GetInstance()->UnregisterEndpoint(other); - } -} - -/** - * Deals with a new endpoint. - * - * @param endpoint The endpoint. - */ -void DiscoveryComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint) -{ - /* immediately finish session setup for local endpoints */ - if (endpoint->IsLocal()) { - endpoint->OnSessionEstablished(endpoint); - return; - } - - String identity = endpoint->GetIdentity(); - - if (identity == EndpointManager::GetInstance()->GetIdentity()) { - Logger::Write(LogWarning, "discovery", "Detected loop-back connection - Disconnecting endpoint."); - - endpoint->Stop(); - EndpointManager::GetInstance()->UnregisterEndpoint(endpoint); - - return; - } - - EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _2)); - - // we assume the other component _always_ wants - // discovery::RegisterComponent messages from us - endpoint->RegisterSubscription("discovery::RegisterComponent"); - - // send a discovery::RegisterComponent message, if the - // other component is a broker this makes sure - // the broker knows about our message types - SendDiscoveryMessage("discovery::RegisterComponent", EndpointManager::GetInstance()->GetIdentity(), endpoint); - - map::iterator ic; - - // we assume the other component _always_ wants - // discovery::NewComponent messages from us - endpoint->RegisterSubscription("discovery::NewComponent"); - - // send discovery::NewComponent message for ourselves - SendDiscoveryMessage("discovery::NewComponent", EndpointManager::GetInstance()->GetIdentity(), endpoint); - - // send discovery::NewComponent messages for all components - // we know about - for (ic = m_Components.begin(); ic != m_Components.end(); ic++) { - SendDiscoveryMessage("discovery::NewComponent", ic->first, endpoint); - } - - // check if we already know the other component - ic = m_Components.find(endpoint->GetIdentity()); - - if (ic == m_Components.end()) { - // we don't know the other component yet, so - // wait until we get a discovery::NewComponent message - // from a broker - return; - } - - // register published/subscribed topics for this endpoint - ComponentDiscoveryInfo::Ptr info = ic->second; - BOOST_FOREACH(String subscription, info->Subscriptions) { - endpoint->RegisterSubscription(subscription); - } - - FinishDiscoverySetup(endpoint); -} - -/** - * Registers message Subscriptions/sources in the specified component information object. - * - * @param neea Event arguments for the endpoint. - * @param info Component information object. - * @return 0 - */ -void DiscoveryComponent::DiscoveryEndpointHandler(const Endpoint::Ptr& endpoint, const ComponentDiscoveryInfo::Ptr& info) const -{ - Endpoint::ConstTopicIterator i; - - for (i = endpoint->BeginSubscriptions(); i != endpoint->EndSubscriptions(); i++) - info->Subscriptions.insert(*i); -} - -/** - * Retrieves the component information object for the specified component. - * - * @param component The identity of the component. - * @param info Pointer to the information object. - * @returns true if the info object was successfully retrieved, false otherwise. - */ -bool DiscoveryComponent::GetComponentDiscoveryInfo(String component, ComponentDiscoveryInfo::Ptr *info) const -{ - if (component == EndpointManager::GetInstance()->GetIdentity()) { - /* Build fake discovery info for ourselves */ - *info = boost::make_shared(); - EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _2, *info)); - - (*info)->LastSeen = 0; - (*info)->Node = IcingaApplication::GetInstance()->GetNode(); - (*info)->Service = IcingaApplication::GetInstance()->GetService(); - - return true; - } - - map::const_iterator i; - - i = m_Components.find(component); - - if (i == m_Components.end()) - return false; - - *info = i->second; - return true; -} - -/** - * Processes discovery::Welcome messages. - * - * @param nrea Event arguments for the request. - * @returns 0 - */ -void DiscoveryComponent::WelcomeMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request) -{ - if (sender->HasReceivedWelcome()) - return; - - sender->SetReceivedWelcome(true); - - if (sender->HasSentWelcome()) - sender->OnSessionEstablished(sender); -} - -/** - * Finishes the welcome handshake for a new component - * by registering message Subscriptions/sources for the component - * and sending a welcome message if necessary. - * - * @param endpoint The endpoint to set up. - */ -void DiscoveryComponent::FinishDiscoverySetup(const Endpoint::Ptr& endpoint) -{ - if (endpoint->HasSentWelcome()) - return; - - // we assume the other component _always_ wants - // discovery::Welcome messages from us - endpoint->RegisterSubscription("discovery::Welcome"); - RequestMessage request; - request.SetMethod("discovery::Welcome"); - EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request); - - endpoint->SetSentWelcome(true); - - if (endpoint->HasReceivedWelcome()) - endpoint->OnSessionEstablished(endpoint); -} - -/** - * Sends a discovery message for the specified identity using the - * specified message type. - * - * @param method The method to use for the message ("discovery::NewComponent" or "discovery::RegisterComponent"). - * @param identity The identity of the component for which a message should be sent. - * @param recipient The recipient of the message. A multicast message is sent if this parameter is empty. - */ -void DiscoveryComponent::SendDiscoveryMessage(const String& method, const String& identity, const Endpoint::Ptr& recipient) -{ - RequestMessage request; - request.SetMethod(method); - - DiscoveryMessage params; - request.SetParams(params); - - params.SetIdentity(identity); - - ComponentDiscoveryInfo::Ptr info; - - if (!GetComponentDiscoveryInfo(identity, &info)) - return; - - if (!info->Node.IsEmpty() && !info->Service.IsEmpty()) { - params.SetNode(info->Node); - params.SetService(info->Service); - } - - set::iterator i; - Dictionary::Ptr subscriptions = boost::make_shared(); - BOOST_FOREACH(String subscription, info->Subscriptions) { - subscriptions->Add(subscription); - } - - params.SetSubscriptions(subscriptions); - - if (recipient) - EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, recipient, request); - else - EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, request); -} - -/** - * Processes a discovery message by registering the component in the - * discovery component registry. - * - * @param identity The authorative identity of the component. - * @param message The discovery message. - * @param trusted Whether the message comes from a trusted source (i.e. a broker). - */ -void DiscoveryComponent::ProcessDiscoveryMessage(const String& identity, const DiscoveryMessage& message, bool trusted) -{ - /* ignore discovery messages that are about ourselves */ - if (identity == EndpointManager::GetInstance()->GetIdentity()) - return; - - ComponentDiscoveryInfo::Ptr info = boost::make_shared(); - - info->LastSeen = Utility::GetTime(); - - String node; - if (message.GetNode(&node) && !node.IsEmpty()) - info->Node = node; - - String service; - if (message.GetService(&service) && !service.IsEmpty()) - info->Service = service; - - DynamicObject::Ptr endpointConfig = DynamicObject::GetObject("Endpoint", identity); - Dictionary::Ptr roles; - if (endpointConfig) - roles = endpointConfig->Get("roles"); - - Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(identity); - - Dictionary::Ptr subscriptions; - if (message.GetSubscriptions(&subscriptions)) { - Value subscription; - BOOST_FOREACH(tie(tuples::ignore, subscription), subscriptions) { - info->Subscriptions.insert(subscription); - if (endpoint) - endpoint->RegisterSubscription(subscription); - } - } - - map::iterator i; - - i = m_Components.find(identity); - - if (i != m_Components.end()) - m_Components.erase(i); - - m_Components[identity] = info; - - SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr()); - - /* don't send a welcome message for discovery::NewComponent messages */ - if (endpoint && !trusted) - FinishDiscoverySetup(endpoint); -} - -/** - * Processes "discovery::NewComponent" messages. - * - * @param nrea Event arguments for the request. - */ -void DiscoveryComponent::NewComponentMessageHandler(const RequestMessage& request) -{ - DiscoveryMessage message; - request.GetParams(&message); - - String identity; - if (!message.GetIdentity(&identity)) - return; - - ProcessDiscoveryMessage(identity, message, true); -} - -/** - * Processes "discovery::RegisterComponent" messages. - * - * @param nrea Event arguments for the request. - */ -void DiscoveryComponent::RegisterComponentMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request) -{ - DiscoveryMessage message; - request.GetParams(&message); - ProcessDiscoveryMessage(sender->GetIdentity(), message, false); -} - -/** - * Checks whether we have to reconnect to other components and removes stale - * components from the registry. - */ -void DiscoveryComponent::DiscoveryTimerHandler(void) -{ - EndpointManager::Ptr endpointManager = EndpointManager::GetInstance(); - - double now = Utility::GetTime(); - - /* check whether we have to reconnect to one of our upstream endpoints */ - DynamicObject::Ptr object; - BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { - /* Check if we're already connected to this endpoint. */ - if (endpointManager->GetEndpointByIdentity(object->GetName())) - continue; - - String node = object->Get("node"); - String service = object->Get("service"); - if (!node.IsEmpty() && !service.IsEmpty()) { - /* reconnect to this endpoint */ - endpointManager->AddConnection(node, service); - } - } - - map::iterator curr, i; - for (i = m_Components.begin(); i != m_Components.end(); ) { - const String& identity = i->first; - const ComponentDiscoveryInfo::Ptr& info = i->second; - - curr = i; - i++; - - /* there's no need to reconnect to ourself */ - if (identity == EndpointManager::GetInstance()->GetIdentity()) - continue; - - /* for explicitly-configured upstream endpoints - * we prefer to use the node/service from the - * config object - which is what the for loop above does */ - if (DynamicObject::GetObject("endpoint", identity)) - continue; - - if (info->LastSeen < now - DiscoveryComponent::RegistrationTTL) { - /* unregister this component if its registration has expired */ - m_Components.erase(curr); - continue; - } - - /* send discovery message to all connected components to - refresh their TTL for this component */ - SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr()); - - Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(identity); - if (endpoint && endpoint->IsConnected()) { - /* update LastSeen if we're still connected to this endpoint */ - info->LastSeen = now; - } else { - /* try and reconnect to this component */ - try { - if (!info->Node.IsEmpty() && !info->Service.IsEmpty()) - endpointManager->AddConnection(info->Node, info->Service); - } catch (const exception& ex) { - stringstream msgbuf; - msgbuf << "Exception while trying to reconnect to endpoint '" << endpoint->GetIdentity() << "': " << ex.what();; - Logger::Write(LogInformation, "discovery", msgbuf.str()); - } - } - } -} - -EXPORT_COMPONENT(discovery, DiscoveryComponent); diff --git a/components/discovery/discoverycomponent.h b/components/discovery/discoverycomponent.h deleted file mode 100644 index 1160a5884..000000000 --- a/components/discovery/discoverycomponent.h +++ /dev/null @@ -1,82 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef DISCOVERYCOMPONENT_H -#define DISCOVERYCOMPONENT_H - -namespace icinga -{ - -/** - * @ingroup discovery - */ -class ComponentDiscoveryInfo : public Object -{ -public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; - - String Node; - String Service; - - set Subscriptions; - set Publications; - - double LastSeen; -}; - -/** - * @ingroup discovery - */ -class DiscoveryComponent : public IComponent -{ -public: - virtual void Start(void); - virtual void Stop(void); - -private: - VirtualEndpoint::Ptr m_Endpoint; - map m_Components; - Timer::Ptr m_DiscoveryTimer; - - void NewEndpointHandler(const Endpoint::Ptr& endpoint); - - void NewComponentMessageHandler(const RequestMessage& request); - void RegisterComponentMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request); - - void WelcomeMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request); - - void SendDiscoveryMessage(const String& method, const String& identity, const Endpoint::Ptr& recipient); - void ProcessDiscoveryMessage(const String& identity, const DiscoveryMessage& message, bool trusted); - - bool GetComponentDiscoveryInfo(String component, ComponentDiscoveryInfo::Ptr *info) const; - - void CheckExistingEndpoint(const Endpoint::Ptr& self, const Endpoint::Ptr& other); - void DiscoveryEndpointHandler(const Endpoint::Ptr& endpoint, const ComponentDiscoveryInfo::Ptr& info) const; - - void DiscoveryTimerHandler(void); - - void FinishDiscoverySetup(const Endpoint::Ptr& endpoint); - - static const int RegistrationTTL = 300; -}; - -} - -#endif /* DISCOVERYCOMPONENT_H */ diff --git a/components/discovery/discoverymessage.cpp b/components/discovery/discoverymessage.cpp deleted file mode 100644 index fde9df491..000000000 --- a/components/discovery/discoverymessage.cpp +++ /dev/null @@ -1,71 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "i2-discovery.h" - -using namespace icinga; - -DiscoveryMessage::DiscoveryMessage(void) - : MessagePart() -{ } - -DiscoveryMessage::DiscoveryMessage(const MessagePart& message) - : MessagePart(message) -{ } - -bool DiscoveryMessage::GetIdentity(String *value) const -{ - return Get("identity", value); -} - -void DiscoveryMessage::SetIdentity(const String& value) -{ - Set("identity", value); -} - -bool DiscoveryMessage::GetNode(String *value) const -{ - return Get("node", value); -} - -void DiscoveryMessage::SetNode(const String& value) -{ - Set("node", value); -} - -bool DiscoveryMessage::GetService(String *value) const -{ - return Get("service", value); -} - -void DiscoveryMessage::SetService(const String& value) -{ - Set("service", value); -} - -bool DiscoveryMessage::GetSubscriptions(Dictionary::Ptr *value) const -{ - return Get("subscriptions", value); -} - -void DiscoveryMessage::SetSubscriptions(const Dictionary::Ptr& value) -{ - Set("subscriptions", value); -} - diff --git a/components/discovery/discoverymessage.h b/components/discovery/discoverymessage.h deleted file mode 100644 index 6f984b451..000000000 --- a/components/discovery/discoverymessage.h +++ /dev/null @@ -1,50 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef DISCOVERYMESSAGE_H -#define DISCOVERYMESSAGE_H - -namespace icinga -{ - -/** - * @ingroup discovery - */ -class DiscoveryMessage : public MessagePart -{ -public: - DiscoveryMessage(void); - DiscoveryMessage(const MessagePart& message); - - bool GetIdentity(String *value) const; - void SetIdentity(const String& value); - - bool GetNode(String *value) const; - void SetNode(const String& value); - - bool GetService(String *value) const; - void SetService(const String& value); - - bool GetSubscriptions(Dictionary::Ptr *value) const; - void SetSubscriptions(const Dictionary::Ptr& value); -}; - -} - -#endif /* SUBSCRIPTIONMESSAGE_H */ diff --git a/components/discovery/i2-discovery.h b/components/discovery/i2-discovery.h deleted file mode 100644 index 8d1139be0..000000000 --- a/components/discovery/i2-discovery.h +++ /dev/null @@ -1,38 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef I2DISCOVERY_H -#define I2DISCOVERY_H - -/** - * @defgroup discovery Discovery component - * - * The Discovery component takes care of connecting peers to each other - * and performs authorisation checks for the message subscriptions. - */ - -#include -#include -#include -#include - -#include "discoverymessage.h" -#include "discoverycomponent.h" - -#endif /* I2DISCOVERY_H */ diff --git a/configure.ac b/configure.ac index c9c39f6bd..859abae8f 100644 --- a/configure.ac +++ b/configure.ac @@ -76,7 +76,6 @@ components/compat/Makefile components/convenience/Makefile components/delegation/Makefile components/demo/Makefile -components/discovery/Makefile dyn/Makefile icinga/Makefile icinga-app/Makefile diff --git a/doc/icinga2-config.odt b/doc/icinga2-config.odt index d9d449c3fa16057341199b41f6032fe52e10ecb0..8b93cea751fdcc0e0aff36dfbf770ef0fe71fb55 100644 GIT binary patch delta 11342 zcmY*fWl*KNvPB0QWaIAc?rwv-ySqCKu#v%?4THPG;O>LFySuv%&O7Ja`|i81tG-IF zPI`Swf zFozP(P~Er9$J{^aQ)tu$%DHaD(?e|lnf)Qj;?I_FuW}s7CuivpdJfL?%`N2SOEr$} zo>$&(zb-WR_`ABQ%3oFyJt!+Y-D_!{%w6e|><%XQk?70qp66?{mluC(DriLcVx>oo zzVjlTU2JVt;THlGS_7vC56#?H7%!?s8SxzP4>L;DA3OhSdHPS^WkYcGmezFvO@9J5 z0MsSPuTgPJL}yFQ3;KLRYnQ$K=Ntxf7JouuESi?C4-Ou}GE4@$!Ez=anO@(--rq=b z9(KMSLBdkcVP?7D{DuhH zFuItk+b8G^(;>vVRj6rHvqx_R_B6p|u`Mq;SX^c;ok`sCA~2;i>RgaE>rV_pjE2dY zEF$)z%>~PqY7t}BZ-|>RkB2`TEVwJ22?6>5E31=jIJByz%8SP>&KM!YI8tD7&_l?9D!KLS)#{p zi7nA1lJM8c?cGO;Z+UL&1rs)(h$0u#bMOhrz|LE~G*zhLJVuy3fI*pG;knxp%vLeZ zOOs;4aDRTO*+BmZRH;+4A$&tf$Sy3%b9MG0Z2EK~_fx^IgH_Wt=7vIR566ypDr7j7 zr<~X17Lm?hz95uDk?f7|F>EBQU^M$7o!%(SU64f53b@7?VkDJp_trzXlA&z-9R&GM zxXmc6ow=6B8Ypma%@%6RCUdVk3r20CyL3q(Km0zOI8|kw3^v?HxNQU_<7ztcNE*_p%ikF7~3Pz zA#hsqv^LMb?^N-rM!>J?S)1auB~9|qnv=~Z^iLE=L_S%nhi8_tnPgJBW2=VTC{(Cu z2+0wIMquy*sX3`o7X++3+W>?Dk?Z2y>kDGi(N|pHghm`?(>5Nrgjl zo@ObwSnw~}jKc}YLLwx90Y~oqLMFz2=gfQMjHa5WKPil$IWMk?#X_`wjlkN69OH)_ zmLv2`n$sIr*xT^L`5!bXP1pxb*VMtmS6Er9pOc@!_|BfVdsk=0CMX-M17aE{`zC5} ztq6c_E0Vi^kwVv)=d1+gSJ+kRf!Ht9bdx#udA!8tx7A(t-=-L09nFm%D~$E2yDMo5-}(Bro}XE2(iE1{cNF3aWyTbc`El5x3$EjJ&*}$$ z5Fuy+_iElv*97jQN8{J)gaD$Dxs%Jg8*Gv*6A6(G_4-&DsQkD0p~eCys!I?(QTgB0 zb?HGG0*Wu@{z1Pk-?mHAA5=?U4V7eJU`K@_TRWh^z~m^v!2We!z{0}*eO`!w{g)y9 z<(oP>xS2b+F?!kCU4VYWBLn9bG_$-EoU2K(W0PmV=g7Prop;D&fb@)Jn17_VHuHDV-~@U1L^n) zzcL~Dcf2q1@I9SX0AF;zepxpcP;3FQAfOImh(LT&P4k6+KfM^}AqT?y1P$_ELc5Lc z8tgub4@1mVST2+9T%oOjk zJ7|?ZWW%R-Xu`WFwg&Q9z%fJ;H8m>3Q|<{y!Gl66-2y9aHfn0b*AX@22MK0MYwU&x zy&pX~ch4_;JhoWmCF#6e7T>^_4ups{E$dKXqGChrT?<{v2r;vZZS4PGi58Zcz%&X| zv}I|Wm6rMIR+iXiD3WE@4BK2**z2bTH53<{i6FHn;6DwH7D^rS6{>nUYW;4kPPfTJC-i1eNS&(+dqc8mPcJKkR85(!D1OQl4_{`Z-@4M0<&D-u&LXfP`2bZNPH{dN^NG z;YL0&X^=f^=e81_7Nv@3O6-$mRE64b%qmGEQ8u}U(gVTTdT9v|;v__>q=(V4&{qPs z*s7rmy$sproI#=va1e6J-X9hSw=CQMWU^1P+MC)pi!dOJD@u*t@?`?eZd-+ zrpFC~;(^{>KF408dtRVCwh;#6b!aQV)b5WMuAbq<)pYW9Y=sH+a$TVk5Y$tq(^7(y zN*3ei#2otg;RU@Jjw$=?D06!3Z3lqQ zyh|vKj=zqDAlyJo03)jjx6(C&g#SDWHYsecUWC*TKMXD+1li?XLiWS5Ir7N~Y61el z{OU~F8}|Ark(di=YtV<*(Q}Bxe8_uhtM5Kf2Ws(se>A;B=2#Nf7~?qr4qZ7rgl zU(3@NJTAgKR`V>Hmg|XJf#MTyywaqbz#Q_poXKJ4l}HCcgNk$B5-M|F3xp;6(nA<$ zhCS9bxA#6Ke3F=B=KP)EjsO^~1Tr9Z`BJm! zHc|(8hDRD1%B3%_j&?hJe8yw>&=!?*FAN*imlyDP8o85QBjNYdWMRKRZwV!!qUXEH zBWa>{hxr%aD7x!Q@025;91AIi2T`MY&A3zR-F^$>g+t0 zb^_@z0Ca}n*yLh@87tdV9cAjJ2i+&TQ(I~3QD-`?q8;%J>}&U+l5@!qh>OXj2d2qH z`0D!>m#AC#5U|isSLC|91icew~qbz*0}Upuf3B4w^$;-pbR-w z<9l7yTr3Xm37Y+Q0Tkq93BtpOUH73<9(v3K>Qbr4Rp>clWu>tWe`)c0<2bHzh$sD( zf&R_AXB9)#dui;}qsOjNqQ0;o8lTdB}kNG%c_Je^0UPnRugh#s9qu>}_Yx;B%$9~YLs zt(E)Qn#e#_6Q!d1XJD3XzVTQX?ziLgeZL9@XOu@8 z&D+kS^-KQF=XGe9{CECjc3LvaS%bcdeHtU5>C!V>JMe3}0`5lQ`mBJ*AiW%ifa{pk z+1oGncC{4wXYR+)$pPWo#(ked zt+b4IF1z25G~2FDeG)Ct3`OW8-~}V(K}E8Vq~vNPil&DEuCPuOFhm#VN++hDpT0t`DMZlO1CTv-#|D34F(ycOGlE+rfb0p_-g8Mq z$$FDY)BEvF+31vm`o|%i5(PUY7L8oMow^9~1ZrOk*`fajx#IMuGST2!QzVjUaD*1> zgdOw69)!_P2e{-Of4L1Q;^cCrWy+?bKs9TZ<|)3I#}Vy<1>t;6us7KmL1^6KwH@1G z6QF%sg(nGp+yDi1#e0EN>I~`hv3lHkFRxU}Ql0t_gdikH6p_na-t<_n-a_Yo_2Yy9 zc>=vF=Yq?Tj-pGcYRy(!NF<-B96)SwJTf=^tKeJj06zlGGpbft1&Y_BWeCo1kfPs? z3b4?SWB88_bc--W$Ka3K0Hu6s7&_7DA>e!j&ZTCc9x8i?_}B2JJhmX46k6N?L}_93 zoxpG?*vpw0W^*(blc(%nXjH&e_z0?5lRv87e z9i5(d(@a zjat#kg@6JolOxGhkHuA#mb%M_Do`yV_aLzxM%~v9{Y!wC+kEAA&Smm3UP^Qk{!jo? zp)aoqIKTNN)PO$}I64KP=Zt#6O!V;9wCOIDdC|JRN^egkK5GUjxJbEm3~1T+HF%@7%zGn?g@7#nH6n;Uq*}1 zS_tiIZsYI5vj1#|P38cr9DtIHHv2o57dXpVq;Ppf1d5&(Rkuftwyc`yAgq%1>+;jx z>oF=O+s|qLIoK@k2dd|y!nX-~{r$U;eRIxXbW>=yVbv7%ER-l|q7LvhUrE&b*8m!h zCk9J|Zk9XfpbG_3`p?i~5vc1a_*s#XDQsJ?;c-jQrN~7p1$8G``2yLo8b!rC<(YYaw==Go~M*}x1hu`gtT?f`p1ZvZr* zI%wwbN=#DGgCu`|70B00KUTbwWm^wOH4szboB(aPMw zmA4bxsQ7AATpwjMnr_+NZzaycE+I)WIGKxoj}0WALsW{B*5Xv+1Q+PGDpry167R-d6uZU0hHPV%8Cu`B;DxGtck zUi-{a+^5eC33Sy|QD1Uxh*Mv^95h&1=TPPtAkZvsQA&nAyA^A7&SZvVccOCR-Myqf zviW0aai`Y!XI8~hpQJQr%m!f`QsmpiP415|QZc}=s3Z%b2f%n~P*z*JH7DA)&(s$i z6doezIhUxR{+Rz%o5)qCCQ0>}*dV;j3k&qk!(Br0GhpeI$D=e7i_ zRoXV;@GLT;X}k_*&XaGhzfa7yAV*pP!#f}j+O!&m(;9RPTqUa?yohj`m8h0@>>D^M z5_Nz00~3R;9#+4TV7GIlLnwz-I#m*Et3jA4r!~PFsSdHfL=O}@pVI6o)3!hr$r|J#{Pp9a4f1pi1JKBJ$=YL`_*>VMkrb(CY*T3 zdLc_tE0hiV8^kXQol+Pt-B5ITQ2E+i1$l+xJP@382uVZTw2d#q+(Um3F)c)CR*XuR zuk0w^Ned@}sR$I}sC2YbeAs73_uYbyYzM!j)&=Ef%Jr36l}_8*X#*j%G6q1>V>|;y zQINbiGV28)6Z5kD?8TNu=nkdia|TPXWBQY5tZ-s(F61Yfp+tT;uVab{eRrUixu(&( zF_2Ne90xRZcrcdoBFd(Tthp?0+eNJ{Gv`au?0ij~hEoY9a3tJeg?VvXqg*lOOg>Qz zPwiymPft6=r0pD8SH4h6>!TTXrq}_G!*p^nHG==AejP(I0PAP1Z)C;t9)LM!i$!l} zM2cxkkQT0nLT?4^yooz}w5-TSsqdo~0$8OkZVboe*Z9(wYrX4*C~jOvRrBL~ z>15zbh@#<{l5H%Ex^^V=lRzbwL6|kOo%H@3Ve+Tj^rnvbRwQLXKm3-}Up z!~^sFg1(*Fr*EKk$C#J%EsLm~8XBnJ>eUa&{Uj^7`|d`*QuxHLNJ!C*`T`>Hx8RKW z%m882e4*2tf}#oui+m(~)I9@Ma{)FEDDyKg3h!Z0<8Ob8@Ce_n5oL1hJM0G&IBoQE774kp%18lK=F zL7C?Yn!ongZnxr$#i(uc#0N-7F|$Mur6!;>L%sl_aOO&6tQC;*40AAgSkt2 zlHqGOQpu#tF&kQ4uCf6xDH#!9;xzpvav~=hY*o56?yoDK38APvdF6b!Tm9d{teGaN zdpJ>nk@x{h%Xre{Q{lw@^jEl z6%bjz6}7|MSzgFY;m`oI8!E*nlVxmBxEO=xNa7H&SW_RxTJj`VK$|llnvf{B;nWsB zsr@iJE(8|?XyO^`WLNhuaxb=gi424E7N#&w-wiMg{ZLI@=pK>Aa(@75wa)7%JS3Z? z;3l&!C9diKz@>5bRSq`IPKCll)M^cPro1r4AX@{`v`;+|ikpBz9-ZpPbVZ6)p@HGd zOgWQu@?1L#Gn@%rb;{-tri9lwW|_UiHj0PbWCs)1AuzNps@@#WRx)J2kjA|ZAh@f3 zzUJExKRAxlaEE>u9hhGuvs!leo)+4PzNZ9bv^>g6;%ImAoly@`MTJ2?kd&_G9ub7Z zU5Z$boSq-RujK(m3?+zp4jPSVPF>l{g<~_W?Yu})3=wU17OT{-?O*p;9Fze{2eD@5 z%mnazKucZN1+SY4q2}Nms;v&V5`~1g8>kS9X|<32c(C6N2}(&(pJQs~vFTNvxJaUt zfkzzsT;uD2vuV6jJ~?#o0Z;*-rnn z*p#;Pn?TNYsi6*Mb+dZ+`r`nNP`ssu@K~^mklUab#-Thx%5yBzSnKN`;W|_n0ZXCg zqZxT|`6fjauG#OxhR{6Q8EAtBmGN!v=ITp1(O)IZn89j1;}AxIJMC@@Qx~g_rg)=z z_557K&ok-3NsUo4ZB|qRf``1}TQ`Wnb(*UO!C4Zl8;Y)qF5BA9*Wv?$cH-VI7kzG6 zhc#t7{XN{|lYB0v3+p7#j3XqE8Bp8ueTpJT(0`C7C@^&yb}C6Hp&wJdx^bi=In<+C zChG_pVUBy<~ z@{BGT+fokJG0woIdBs^7?x6OgP_Uj^U8j(hw^T1tF{g|Mu3L7&hGG*;r5q^T2?WtI zbI^PTrVrySc{YWSFnB)T-`h5d1gGAiFKJTXO>O-&NpLOL$M{A zi4~wo%`~Qj6!4bI*aFo8UM@erzVO>i=mWJxWHZU|)1k%wt)foBp3>x0Q5}^wmv)A} zdn;G2yeofPNe`nkOw9j9gyOdmGWdCa?YE+6E5OZm`=+U zsDgNl_D3K^(%{3NfIZs%Li}*{%NB|8*yL^FZWu$LdAG3{G80$C@3r!sI|(i#A-xiA z`x7=vBP6XiYt-kLC~0Zb$MlI4HydDNnYPLGx}KRrqsH+=33~G-63z3fWK4>MzWe@x z{^b4;(WdQ2Ip{EDOMjtQTC=4f&B{&~=#yM$7}y1+N{!?5_B1j*XFj6DA9}~^j;VM# z?$>Iq%JHMPV#*0Yet-{q@!I}`48JeZ_GXT%^)a{cFl+n*BHHf;tLgy$?%$==-rxUU|#PK2&K^dcq`jDco;d+;s_6dB56_ntFz9MZ2f-E z!tXX4aG}tKQ!=>65x8=pD@m{XL?hubl>6!~L&)us7#Fz77Qo57S?UTq4Tra@w$)aY z5$(kFSS1hrf>JB8g|S2?xMM-{u&K8m4cAwbKrWTiIS$-NGGtdV%d=ht;B%guHkv0@ zwylcQ(kBA_teCX;^RlX==I|dW zmbZGFte)22FwFYGqZ+K-V!>}?<5PWUro3XFoPMyx(DP^Z8X{l`jfVILJ$_$THOz5r zUag$_>sEe^)eySb*?UW{2FT0%BoMq1ZkSV4fmAyhYiH%6B|d8rR)O);T;fO&j_^=W z*fGWc1rLYY;zv1LImb7@D_BaWgpU!Ik&iA7}mdx~*^Wgxqh+5skWOH@(?raIF<$KdSg%W`PzD7SDb@)5YIDC@Je$ zl{T+&9EKWN{j)-ZG@>rwp%i=hs$QZ8D?VZVej_8-=861`3kF6w`@e5w ze`_lK)?QFsggBZ{atclakjY{3uyPr%TUR-ON71uv8 ze=D|swxRq#c)m0t^Nx2g*vj%Va&77%5}b5?PDyoI~FQ-RrjP zCnU~J=>EbU#4FNlq$|>VR3Rg!?HwVH0Gg`Xs3mk|;^p+Aa) zO-tJHeeV7lPam{-hX#3CcJ6OpV*&%+?4rIMiqcT`zTK7-jCt*GlKI~=oiIV3LXEdm zE$@y;iDe!qb$Zub99-8P`R`E20iveLZkVDIm!E@f zXIo2n2yd(bnGId<2Lk&1`t`R<{BeE|iC(k(u-_rL+YtPX=A7xc@-uCWR&Lo%7&o z8@!5U3&@u|E1fRxm6LF_4RMR8m#?{zX`i(imo6s`b9B6{&|#w!4Z^jgENhMcao$7o zYGr8LT!a|>ZkF7|iP~kP(ifAcd$>j#&&b^xA(aUkT7Gf!JoMLRt2mJGUGh&i^{=mO zwSL$E%sczV!iMvh?s8)W4777$)2$q3ah{Pvh0BZ(QwqkBO;l%C=;q(cRN}P0o7s?+ z)Wi|&=qh&pB&TLz$e~3Mo84pp+R7$w|4|w!(}Kj0=})EANW#=L!;H82g(EqA3n~8B zu}J#Dn3o|X?&&bf-{ruA(L#nf5$okw&t;m#&MVpu?k^Ul;ydE2JJ?Rxu%%;u$IZi*e6^R^+XG2q!Z-U2(VukL5N}_gm+t;Ni3$&}WNyL1rOY z*fvG97>0omRUl5NozdnM`zG|o&z9PUK+Nm&Su5YGTk2-hYAr!awYiY4p3?8_ob!wQ zz)@0eCBW&MK3sH@0J?4_86emo(u7(jdtTy;nGCB+h0x4iByF5m5maPAsD5u(&r6B^ zY)c{*8tk|pJ%rz}`*go-{fT`VII!w-o%vr>#8q5zlalX@ns5Ein zu7_5WONUe}U zSrdP3#oaY&1R0tdFYvKu0{A@A+9U5ti+0%f)#il72T5q+Bj-TSec6x4EMQZ3u#6`Q zN!M@JeOKzFfLSEgJicry?&T+oUhKrIjtM#Q_yGtR*x#sj2K(pLe@n1$!rFj~Tty4I zT2f9wsI;lFkR}2~%Yz(~=x*5P;%Iu6{b(jQtnKv$y4tnew!+Jo9Jt0-T|AQZ&6t{C z-K^d;{3DOrAxybgV4_O2&_6jL&OGoGnCFcO_hx^z0PL_S4 zavVPizegm!TDjDl0u~=@P!GU47PAjOIN+9w!OHhDzKJZCsmn!=4anBM*bcC#KKEo8 z$l1#vsETp(6-Pyxw0RArhH^p1FaAKvcX>MxFwjSzuF(8??C!-8U?p}UTn=kFbpMi{ zYS3*W=jiYgGL?BGW43o51vnXdG9kgz>HQbw*vFTV zVmD^>AN8hf9n{5Rn$aijK%Pce0`Y2~6ia6(b#8Mt_H@P?i8!7NjEclxlL-VWD?xrP zk0>$+LOfTN3Q%#y?!kEsnl6>-aq~c009q z+9QR${wfGz*Fage!5tx^1P;;o0MD=>Rdnvez@4MvSf%%IdZ$?~-yIcSf<%e#bK2;V z9O(A)1lgBZKXsEc$zc?IQ{hxRlLi*U?)^$OZdIUxqImrEox!9C>x9Lzw2AqO-hD=g zuYpL!(#xk6Kd(JJ*urBCAbuYannOJXH)N;`;*hSxOQ9?vDR~?6UOTG;ZPy9Sbb9ip z%K5sXm1_6RtqiZ^*GDqPbMGr^qDaNF%|I96`eJO@e|^HC89Rb0=%8<%GZ5#T*+%7P6HmBC z-rIM14Iy=F?+Yay`%7~&&!!sM$xvP;+)H^K!S&+1Ih|G8l6}*=Pp?c_NxhM?-joC; z%lzS`>fH|dH@$K&>0A>z%&DR(Hq3S!0f`}6hKGX2v_Zlw`>EGYv|!Iq$iFM5N9zYM zPk1mesJ~Q1|4&s^PwD3Pulgv}o$Wt*RM4C|Bi!Gbqkq&$Ab1ZJqQB99*Ij=rkp9m= zARQ0ff5zl6Z%0WAgMt11VgKKNzr1J=$b;&ilBv@J4+7Q#wCaH?@HcJ4AFEkta4@i7 zNHDOk|1ZVc0u0RF*umPu+|`ZI#mquU790W{>^~A(5Q`@scne70lja{<@4wJBDA|+o zAM{rr1cMc5&y)H;(A)|{=SBU`H2?cI`2P*SvOq3gbpNE}df|aDfjYeYHR-<)1fn$v z(wp*sJ0ZlZLDJro|KWr9HxJeSUzY#? delta 11092 zcmY*fV{n~a*FLe6#%5zDjqS#bZQIsqY@XP*ZJUjA;xuO4*!iC4c`@&|XYN_LuQhA0 zKYPvISKS@N#4QAhk{lE?761Sb02~^jCZfo~|0P7Zym1i?Sb5SvTC zE+ONT`tJgpL*ka=QKKhaP@z8GR`uT+#M0ANc&o=Xj=Bk)UPWPw0cS z@A12fPSwul=F-L1T@gf-42R%~`u(l1W<sXX1n*{{bd%-XFIK5D&6ZG$nTT!gE-FQ zY%9;u&_MsyTX&}gA71LhEEfJe zLr(xPSq`h2^z@+hYgVa){t(YPbHoupHwo3BdIOkY36}nY?R@L6*3(Ig(joQVow&7L znx=cj9}}kb#kLXJQY)Kdfb#VUVz|hy4Ov;fchS!Z2whKJ9lbnV^fsE3rl`w9Bbn~O zPPRS?NZ3%r=PIpP<(#80ngOmJ0oqQ7!sz?x;Pu%~d|CZUR(CLl>XumUs0raF!}(p~ zN>)T&N`WsovptSgGOw*L3tmzuk2Q2B=!A!Al>7-O@f z(+>I7xTKbd@)2{fiCWc&cI|jxjp$m{?{kkDA)3Ay)Vn6MfZEQY7jo)iky&; zk%Y>ch;kSTC$+yCFcCuJ&Vb}M|5;td!n_%{WVa$sdM-Cz8s9OA!q*+U9i7oFuL)o= zF22U>r{32Khuz4Pkk0p!|Fo#)f@cszYH+UCXgV1p|XL-NH1;5@|m@l1N#L zpPfX3Z)ueaF$XCU1Bqlb<_1Sjjnqtpf#yJ8zXKk5S&zaDn1Gce5yWn9ayU5>RwbF3 zG)}yPX8SIO6-I`Gcbc;=^q zvz4h;OzbLK=c%$LkB8@;C4Bb#=rn8tT)Y^eSoaCmY8BPjXp+>Sc9VM?mm1uqyE>I< zG#F8x-cgKzU3AsOW(7*hk1P^CL2+TTBVh0J_r=8938!(41pCN8pWMzUOelZa>n>%< zm)AK3VN|WnkVt)^*^KltY@w)PHVfgJ(do}oo=n(udWe;#OJuBYsl#QFsvwh}IsYo^UevvvsR#0S0h@;~z!EVd@NE@T;LZ}d-o zpV3ylVsh{6IGGTsTg)EQ?IBb zEkzXvmSK=kW?@EF=CT@W!N-$GIt)5r6{b=n7-IO8oc_u_!QYt`ZbbgfYn9JNb;_nn zJRh5k1oOs-b3pq&k1R3bCoYllJTC4vPpkh@DHG$zTglWg$JR_G=V?f&x{=N6v6jCV zBM*#al212LkU$|Nr!fJ7Y7C;T7uMdu!_;gA5O@e2-`@Rt-_cH^LzEKjVuYlZ#UzWK z%4tHsS}+JJY9LX&<%L zr-jxt$pw}NOf&mqIN_G=Jh810C?z>Kcuojh6Z&%b+SoF{BqoG>p&Up^HovS;J0!h#w-shKY}{&3$akQolTF1u zHa1BVDVpCpAtcyc1@@r%xs3e`iT-U;x5&5lnb9qDY-n#}yKKWTr>Uo}_qnWY#He3? z|SDw0N!dPy2$m96+wVP?j0=`ST%ZliwG3B^5W6r6*c+1*-y%cEQucCt^ zY-d}96&EZNEe8rKb&IQ-*s5=q*hEQ77$u%>Z?+pBmAohMIRyKjfc>E|<5GVgRKGzO zOag_v=gi4bC*wlxk&0bRNm6o4ZR{U##mvf0;95keI=6y@i&~%UTXL)Igd)8sO{zM& z8aw3txr^H2anMoL-PeB%8UsCPOEY#qlP6ITciP|a*=BpmiGGZQKX6|~z;HNDlR8}d zW|gr{SEZJsX3RNt=C??USd3u}zDC4sU!HLyED7xM>)6*_bi?!A9xVMnYfP9b#3*na z*EE3(RSWRYTKf9JO5KWq~Br9cQV2$NET;}a_ST{Uo9G6xUt;jq6z z1m2Mq$=SlWUB)Gyxp3eT&ED@gUj|T8_O(&o*z;FPgRjjD<>kcy;3Dr!KC`R9(ep%A zUCR3;3FaN)pWZcJ7AXO$UtTAFW=SCI&DUWz^9(NDBBdyvqL-xw*KF&;eCS5mEZLsM zsiz8bVmZ`QzDL^UkRt&DYwKm9_8NE?IU>Ezf+z@o`yQ$Hms5)^VLH<)%#UoNR5Qho zZd0=bU?N>55)x1!1jCQ0R9mjw=O(9exG_Xb!?HamhJJ9B?(OhA^>1kH1D^pro(fl_7LZxN=S$nf z7}O8Knc$Gou1A$hUE-nyA3E8vsBS&;5|{NKCwIk#fKJ_>91}~8xE#ySr|^psHN)oJ zlKZmZ%#YIg^^P$;Xhm>%(uz;4Zg=l?|OjC-USP5A+FG`-AB`9AeT_|WqSS`lZ< zA@3O*4(AIH<)NrN{p69Aq3}yMXF)vxdQ_#Oef`0RMczbOM&Z-$Xjn|gSAn##oIi3{ zYBf(3RoB%uRSOP}ZL;J(dRJ;fS@`Ljj;0{_dNRo@ZYB<#ygDE^_f!%EAC3py@MJzU z<20tl2VZD2ndQwyjI`@>e#YtY5KnQKkI}jS$94!8;&t;5(bIMjfsba!wa=?vdojR6H#avr<3hF;(av3!8B89RVIJ#+7OiVdgswn| znO8mq(`_JWS$y8?I2$;^LCB!uqO*)f-OmDPRiXR{&bj6%`-a=wkgY}5U+_%(u*}n z`^vGMF_}p?RT_wAh$RI1Qxiv#w-9v^|2S`RCE%RL8~Z&)l4Q3zEN~FZ^?8uR0lg9z zRe=X3t#8x^J#XtA_K?P1%TlF{woup|Eud4)Ty>&$_Nsj4!U&b8qxW~I9F4_ZyU|Oh zK-ic&Yd03j29E`e?kFQDCFFy<``oRlnGEr9ZG7TqT1N8t>Y=5Nj%Dd8+t31XX6dxJ zHBhv|+vSU}uCM=zh0wRC*lo{i|3w(6yjA>1;vy@6^Tq5!peE`Dua@s-c#M*;#ck8# z*F7Wn0Ih-H%~^h@s^T-}l=I8(FRC!$Pq^`YkXwf+>>=);YuY*OE)`lwU+wMLjG7cH z4YkKcGaf^((8Qf0zhe|X54eF$1H?w?RA719BauZ*S-`sOVUb;_=F|)aAY<~U6JU>W zf8aZmq|Uu~i8OkCMiFN>=c4vBMdI(|=pPWIQ!*6Qf&GP9(SmvRdo6A-4bL{{YZ=r8 zt$sY8dReHaU7tmKE+CnoayfZkp$j8u2@N$?@#FOx!^24cJ!`J0IyB*9&#|&(RzO4j zhjDO|)8uatY<_yys+Z~NZ0%#XG4+nWhR9g`@VN{2i?i0a_)2HFhc$wxxZDi$boaHh zYz;l#(y5g-_t)v#Y<-*HI>a`!_!_qJgADC3?XNXOMNzg>ZeW5g#XPY2n)@anPt_>X zmZ|MDpRa|+Z1tkzNQ?R3>5u1tf92=1Ce}Dey=vgJFs?k!&<)c)woCz+bXHHXoVUyZ zo6~aRf3UFDWxVxY*|_Y7@J8YlqaNK7WgN*d-!FJg%g<683&(4xlMbf?%gMY2BK`Q> zhk5S2$BCHOq>Pfw^`72&WK5&5B?&Lw9ybP3RYmQrE&<~SbN{m#-aiaEC@<(4bXV#Ii)=R;BED2?mT?VZYFmdLy^crJwm)7x4LKvc|W=T%_4Sh*`XWTs$+&HvmWRg0nN+^Ha4xJvVE2dtL zR!)ditU`>qF?=tOK(BB$>)p1@%WF&AoN&Sk_)uy678wTictiv?YUv)G=|fMM2FMK9 ze9)>`G{bZFEPrmRM{PW8vBKU9${+e9kUa!N!N(v4Vf$ zT)lQOA$a?yETo9;019=g7p>2U?b~OmUg(uQdg$CYZDZKq2dakcOQffoOxHac5X?#l zb!-I!l*v{L15uxVq9QpVdjZ(vRHCZV!{4n4`KZc0DhibslAzUaNFtdP?NIPJ zrp!8`W=wG=420%ZOO*5^Idk%WYr-#s7Vr*lxyP3?%ju}L1Yu}`c(xW-jDx%F<8=Do zo`t`q%yT!+)v`W+6ONaBPV;1qAQO&SdwACt1k2R+QCDh4`(cGpF~cB5Jm4zh>am*p zB{_$z&U{AzLNSm80)w6Dq>&zbO$4AV1E4kB2O%u;fKo$pDcy13 zdTYt{L-3U3MH1d8KOZPOzav2 z16rZFhdWwO^XB9<-y4fvZS%++HLq?IVujM&^$&j19d|n92R2^uIlyLx2^{<^IM#ni zh(68(o@XUS!+N*-`@(vZ$%|PTk1f_@RQ>e644cLtIT$(R>aHy1dA**PS{H8K;SjSl zc#{dJpap)>#U1e@_s&3LAJ7^=zFsm{ry0grfO8a~&PxnA5i*iR377&8I!^lq8g=dC zP4$ucd?J5s3UT{!`el>=!2SR(_Dv8d3Eg`IlHiVNMh~~=)_Mf|iXPNKBm4M+rP{ko zHO83G>n80r35gx$6+}geMF%h7GobhOotiODGRBx+K*AHgEt*^_qj8dL!jw4__ges6 z_R{)E;Q>rL`#1f3Q35nH)MmuAC|zr-&60kFZt`ICyv~T$yzU6d__GPCmk079nvGBM zK$nahu~hS|i`f=1{B$jOMdlIR6(2Qh%;w_77wrLnTBd8OteIZ5Z|0G9P9I|iLIaB~ zeU@OuFM-MoLAq@THEv0x{OT@|C{Kvx+1yuwqaO&M$9AW-{D^2=XnW1KnT`rwRm+j8 zz*c@+-W|sCoyUv3UM=aQ*RsM1OD=UF@KsZajp6JOYGP`tWNiTv#S8e-1Z?cneA(CVlZ8KUi?QvS_JuZriQ7<|M22W6;qgN0bCPvfvv4h4!<)i_Y zaO$;G>g36=K_>Qi=A5AVthR6v@KPr!q7PA2wj+Clk1Ljeb-4HCGY&)q+mJ~IKmpW4 zHo4S^(*o6SewM-w866e(wlpAXkfU9@e_yTq=N+%))CfJWaFjP1?4jRuH{KAGQNO-= zR80Nn`N}bI)L%)UOZ@Pr2c(V(sg zGpHd}ocxpe8sh3Y#4M!J`&IfbCp-O1MJxdJQzlt zO=5!m8|=kK8ERX(22|c}z(UkdydL$H1Wj8qj7|I;TM63mRQzs`%^)9Cgz%KaM-@Kp zM_6|f2OA;z)=JA(7mQga0tPr^Z-fuU!^)Db8pU%6Om%c)yRmKnf&%5=bL;|21W^4P;fJ1tnYG26HE~nC1 z?m{hUNot!xD*?(4$)#_y4*R>x>|b;cd}hCU%`=UeZ4F-aH~(A0mwSOt&3mV7fjSF#O* zbJvc7c@(U{HTbH{s`s@6pnkk7Egl2M$^=!Ge4mW$KMyC$?1GG@0V_|UI^d-Tk=%-+ zR&UJL1xLtIHHla7WCZs9k-#cI(y6t9{nH0Q`RS7ltO~9)5I9Onaz-hq85K`7NXGh= zjY&;U$nYyUn|HprD10gEyMrWS`LEYbj^N{kRKl`l5%tJ;ajzd2PpK))>+{)D)c2r7FJVF%E;`{k(ASANiFdDvyo{Osk9!``r~$;4F9R}r z1%){gZyL`#RSLs!f#=Gc&$FpNY0ypiy$WuIoPi|5^=gBJU%ggjLKMeUswpAW zj>+bp?NAaxD^#U1TciH<_y^bSfgTIshH-zmz#m4L^QdS;6=oNUG=}N8ood&ryAaXF zLc1cHe?s=-j+Qyuz=eM6e>v)Nqz`6QM+>QIRf6P0;?@d`AY*j-ee zvl(?xp5pmXV9nomxWp8d6>Z8{zqm=e_}@vvQZRu;h)6KMWo4cw&E#bR)@7{X{<^)H z5sn8r6A_CG{0b&^sz#|s-XG!+fzL)Nj3tvv#5t7~oRB2sh)@31|2fgp%zd;Ht?K)L zN}wc12wCL!xpYPl9q$5H34K}Cb*Dbq+d3*Y1zT z`ADxGmbmt+X{%yMjFZ2i@)zgRyLtKM3JVZiD9L$zxZ=@mnsXX@>0ytoU94-2i{5q# z3uGl;S~6TV>uumdcbI}=XLVOxe`U$sWOX7(UwV1cM9R|2B((GbMH)Sjdhx-SqBs-y ziujibO)##IzNS(@$ePB>Nax_fi7Xk@ac5_sJqhpW%N5z37|VwV?=>FmoG>tc z{?&r#^Wn_z&~SPY)eg2ND>2jR-&5X)kUX`iFWmtww~iXC`cY&U2eg^YFDvk? z-;*CL!`aLPLA`bJ5P#jrdtiUTN+1A6GpE#>5*jLX#HO(N5^=y&k2iDAM@34W;509F zHb^nrq1My{;1<6$KH=*RV`_eXm0umdy)*rjVH{T2bomCL;&K#c!RYQ>N)HHC5XSVd z?zFsBzBY>B27Ud$OyuOhjMOL`?-AKzK)DkwsjDHN8J`CxVqcwd?&5T~({l&r`Wds| zw!6Ej2orYlY=G-G?byS{w)p&3TPZy4anqc=9!!p<&;zVewwJ^;Jg4MXwIGiWkh9_@ zP|%kK2_qvb8^_=DPD`se?JcxCDpG2`vGdGc9P#bLd;W>FpmL8>0;K2t)^B~cxR2s) zcrs>w$iYwWYbj1-u37C_5j!(CG!495DzR7A z=uj%)Oxc%Xv6za&8Xk_`O&VhIIdKHP9Gx8voyX5^0D&)mR-T;h6zYI*&MRdnI4g%K zE9*W#_y&6r0yC=QDm2$JaGt!%ksMtB<8 z@_T>b$!T`u!?X-iv#H#FX-)K2exgX7ENtm@P0pC@yvm6xi7i{V?1c}o; zuP>WlJrS#GqaV67b~GwJ2{Gt)kqFNuH*XnO@NzY!Yu!bA;j+n;L8Q;pp4PCa!RZX> zfNT;;JAQR=+(!Z1I`fS;rPp`)DeacEdo^C`sp`+tTrc(xTVcI)QT#ZLkM_gQk;_p0 z1>mM358j#2VI5&{NH@z8+#yg(Pzqbb32dT0&`|FA3} z`s;o`u%qmkT|AAD$m12V?%}fQ;RUHd>5y6m>f7|hZP#k|oMXLwRDiM6?l)fi&6&_+ zlml8<$m$FdNRfdh(KlU9WZaJo$9b!}UUFl-S4&lxrRa}tP}Rj?Hg3)v<#r~KcG7P( z(smw(9vR>>*)S)X_3CMFBN`Q^9_$V3@)6z{ zM@V0X{tE%=oM^&IfunC26NRvJ#%f!*ENuBl2=MjK7TMNuMnn{eU&E^A;be#(~OlJRsG;%{>AUB z@fGe@lVRNBop$Tlr>br%@ngEk5&MX783F-5rPVJK4uoVCMt-QpXGmWm?uc)7o1G zSF+U3TV!pX9dMI3RVK71%Q8)-m@ltvj{PEinl+=G6J$eLI{e*7v0k_b{&#$i{thsZ zgrj&dY0ADtQ~le249@k9lUc^P+OM`ltUJY0xLMG* z^2B%AQhXwoICtQJs%S*TSv}Qsu+trJL;+u2JY`XiA>X6K;<@20fm=tji?gGtQp=KA z$zG9SN~7g7^B46?%tqlz)y|LQcS_*(!$&9&GuU$(3YXNbxA*PvIVki?KiU*Y@~A6* z{&*T`xjtr0)$dcH>evJZ^fMxXloq9*gCl&Z4G%%btII51{;xj+q0n2jx)Sl{E>SfS zx~L^iI;-mD)E#kl>|pJE4jfGws}2tovM>os@6nB27t%^cf{XpWbI+9@Wq7I}zfq@; zf(~=`iw~7k>Q^jxb(r1nN{BOXgD8> z*E~#30sz2a?murz|E3`{Wl(vG{6izaws81cHicOTK>Xw2vmyr_Sz`U8zghDA+sFCe z7|7~hu}UlMe|$>Tkh)P(op4Rr&1`kJFurTu~Lfl4rzxDqrNie>~6~IfHxUBk_i$AOBK_m z@9p9q{LahaP4fqeLGp2cwxW8IFsaH_(Vg@bWbB*6^sc}-f!N!)O(j=O3jv6sJ6iA109m+C}D?Ov}vQN3D z*EQt=b3_zqcFk(Mfmb_9Kg+MP-(pm7!aAvt_q=(RahYdK$*0ffnd=FouZ10?T zhmZ0Y#OvhsDB51JXJN7!oRsDohBWC_Bh4--8t6XA*WD^07d|AXgmNA>;>^NO-jpt0 z9$4gy2K_6*<}lCsWoMR3^>V?yW7pr-@6DgtDp%V1(=gwZPKKb{P6kJ=lJp)zR6XuK zEz?eq`kp2l{~1sl?{xM}qAi4ysxse7==Ea zJ?F*ExNNeZl9MVwaBlAuhYvwBWfh~nZ9)ehPlk%wIQHi=hfkpxZ7E|~#_jCepfbs4 zoIKTiLp-X68<@iq*Ke+T((}?8y)DY+nqvGrxW_r3RAi z8-~Xe5iyK_!&k*G$!PSeds+E-i`Q|tJ|Z#WODMfX+W7hO6lQ7xd)TI;w@xhBR`kGAA~0g#tQG+RiR#k(Pob zn_?cD-ud>C9c_KCZ(xc^pU803L~+9-QNrk+MiF$5`R!EpYhvU;l>vh77$$e_bXMWQ|KIK2lBsOKlr?) zi@O3({e#sQ)HNY%^bs>in!TiN1>Wtnm+skqqTIc#DBbdf{eHh`koXgj__Jre^>ah1 zgN&wy!t*289(g@#j!snf^KM)x2A;hyZ7a`L3fLa35v62-ifH8VWV2!oVBr|HI>{ed z1S|-Yt`Ps=yKh~^&QvVapVB+Dpr3!3{}`5Q`sR=U0ZautWCw&#SRl37DJ*)e9Uo#v zms{Ywd~A-U!C9;#yS%H4AdZP5vLLHGA}7>Ju`lk|pSQx5T`%v~U#4x(sIr8=`x5M@ zjhlXLe;=)He--;pf`jA-Gwc6&T7aDLGgP5l-3DSG{3C%&Vd0D5^YBrFe>f|k)#ozH zZl*z@4QM)fGYDr z`rZpCb7s(lrZ#cp<{co9_IPr?JHKqqWo}TW;|L!}BaY=3Mgl zE_#XBMI6cYm$DVZYlQyrITzn5r<0Abk1rj6^`F9H9<#7!V#B~Yx}FFZ{;5;x9`fSf z=a{#5#8F3xL0285`MH!?RBc0JA$b^zksl*E-rKDH-qp%J^X6nge9QkWXq9)lMO{EB z33!>Ixp**L#F~ui&6;w?%Fj4PgcyVf7(#$}Bl4}UPTCh; zZjMk#{`vDnl3@|WGOIXvBKVcev8le}Rm_%5T(c`N&-6C(vfc{p!dhT0>&bJ!0_P47 z9eAP>z1l{g!1~g3&VA%2@eG^zVCPwD2~>4sNd6hZtx9m z__IXot;K|3`gK=dT!WSIfh4c`ET zY+Z>jfqoForf-0?c*rn%`}}MHSPWYjP(z=MJI#tVc|KY;A((xY1O}U_>ZB740>NxO zy|ksPb$E-vOZ!9{>4@dA_b$@6^{^JRG-FO1`?7UHPz$F>RpcF65Kk@{U~iv2;OD~@ z{-Vsy##!$gVlDLpTgsU&-qnH8e`PEu7<5icXoPCB1Ta(o*vHMxEfDx>y#9_A`Y zjy`5fSc2O`qphju0jURDm{?~3MFJRX<|bnIy~J{#6w=myxuH0dpwUS`V{`lrl_WrU zPQ@1p)5z_G)&Hn-SR(6G0vN7pZjyBUW{h>icGckZkY7w+VY~FB3?!RN*)~yx@`~f>>_bY!K zvMs!#D605Gb(=N)DIgz88l&~OlBzpLAFZ$`4e-0;D}LY`*=1?6a~OOZB!)x_l-hP0 zs-NES4nZ4ShKAEg)7h*egc_qa{`#sfW?SRTF9Qzo$eEhZ)>z@2;4BOD8|Ah(oYmPR z!{5*WKZb%aoV8cR*u9YBfhKzhlSL_cRnD*o6Zy-2EhangkN4(g!dDh;x-rAJ1J}k5 zS^dZ!c$^ikzfj25N(F>2E@7sBIYWV<3Enm)i5x2MqMQN7OJ8?iaI&bc-M z1dU6U7PaenYL6+R@oZclDxRF8J~OL-r&MC2j<(!A7c>2nwXW(m&7>l%(`Ajk0DkEH z0J3>K2>(4W1tgabdLjY_RY!haGldJ#gXfSNq%{;_uciba85JemI!gB-I2$#~KJ*Zi;qCH~v&{tL+s zTJvJ~Cn)bt2;mH(@TU2X1zBbVvi7F_ueoIfs`IA)XJda?2Kn#uX#QWBn4nv4F3rEU z^1m(otCRn;Y^(tQS2u4vb5~|BdpjjL2uLgd7U16v7EKrw`^^La02Dw20NDS70s!zq Nfj+dbk?wyx{|_qiJ%IoK diff --git a/icinga-app/Makefile.am b/icinga-app/Makefile.am index 8fb735efc..dbf9c06a4 100644 --- a/icinga-app/Makefile.am +++ b/icinga-app/Makefile.am @@ -33,8 +33,7 @@ icinga_LDADD = \ -dlopen ${top_builddir}/components/compat/compat.la \ -dlopen ${top_builddir}/components/convenience/convenience.la \ -dlopen ${top_builddir}/components/delegation/delegation.la \ - -dlopen ${top_builddir}/components/demo/demo.la \ - -dlopen ${top_builddir}/components/discovery/discovery.la + -dlopen ${top_builddir}/components/demo/demo.la icinga_DEPENDENCIES = \ ${top_builddir}/components/cibsync/cibsync.la \ diff --git a/icinga/Makefile.am b/icinga/Makefile.am index 744f80825..91952ff2f 100644 --- a/icinga/Makefile.am +++ b/icinga/Makefile.am @@ -11,11 +11,7 @@ libicinga_la_SOURCES = \ endpointmanager.h \ icingaapplication.cpp \ icingaapplication.h \ - i2-icinga.h \ - jsonrpcendpoint.cpp \ - jsonrpcendpoint.h \ - virtualendpoint.cpp \ - virtualendpoint.h + i2-icinga.h libicinga_la_CPPFLAGS = \ -DI2_ICINGA_BUILD \ diff --git a/icinga/endpoint.cpp b/icinga/endpoint.cpp index 1209963df..701a240ca 100644 --- a/icinga/endpoint.cpp +++ b/icinga/endpoint.cpp @@ -21,24 +21,109 @@ using namespace icinga; +REGISTER_CLASS(Endpoint); + +boost::signal Endpoint::OnConnected; +boost::signal Endpoint::OnDisconnected; +boost::signal Endpoint::OnSubscriptionRegistered; +boost::signal Endpoint::OnSubscriptionUnregistered; + +Endpoint::Endpoint(const Dictionary::Ptr& serializedUpdate) + : DynamicObject(serializedUpdate) +{ + RegisterAttribute("node", Attribute_Replicated); + RegisterAttribute("service", Attribute_Replicated); + RegisterAttribute("local", Attribute_Config); + RegisterAttribute("subscriptions", Attribute_Replicated); + RegisterAttribute("client", Attribute_Transient); +} + +bool Endpoint::Exists(const String& name) +{ + return (DynamicObject::GetObject("Endpoint", name)); +} + +Endpoint::Ptr Endpoint::GetByName(const String& name) +{ + DynamicObject::Ptr configObject = DynamicObject::GetObject("Endpoint", name); + + if (!configObject) + throw_exception(invalid_argument("Endpoint '" + name + "' does not exist.")); + + return dynamic_pointer_cast(configObject); +} + +Endpoint::Ptr Endpoint::MakeEndpoint(const String& name, bool local) +{ + ConfigItemBuilder::Ptr endpointConfig = boost::make_shared(); + endpointConfig->SetType("Endpoint"); + endpointConfig->SetName(local ? "local:" + name : name); + endpointConfig->SetLocal(local ? 1 : 0); + endpointConfig->AddExpression("local", OperatorSet, local); + + DynamicObject::Ptr object = endpointConfig->Compile()->Commit(); + return dynamic_pointer_cast(object); +} + +/** + * Checks whether this is a local endpoint. + * + * @returns true if this is a local endpoint, false otherwise. + */ +bool Endpoint::IsLocalEndpoint(void) const +{ + Value value = Get("local"); + + return (!value.IsEmpty() && value); +} + /** - * Retrieves the endpoint manager this endpoint is registered with. + * Checks whether this endpoint is connected. * - * @returns The EndpointManager object. + * @returns true if the endpoint is connected, false otherwise. */ -EndpointManager::Ptr Endpoint::GetEndpointManager(void) const +bool Endpoint::IsConnected(void) const { - return m_EndpointManager.lock(); + if (IsLocalEndpoint()) { + return true; + } else { + JsonRpcClient::Ptr client = GetClient(); + + return (client && client->IsConnected()); + } } /** - * Sets the endpoint manager this endpoint is registered with. + * Retrieves the address for the endpoint. * - * @param manager The EndpointManager object. + * @returns The endpoint's address. */ -void Endpoint::SetEndpointManager(EndpointManager::WeakPtr manager) +String Endpoint::GetAddress(void) const +{ + if (IsLocalEndpoint()) { + return "local:" + GetName(); + } else { + JsonRpcClient::Ptr client = GetClient(); + + if (!client) + return ""; + + return client->GetPeerAddress(); + } +} + +JsonRpcClient::Ptr Endpoint::GetClient(void) const { - m_EndpointManager = manager; + return Get("client"); +} + +void Endpoint::SetClient(const JsonRpcClient::Ptr& client) +{ + Set("client", client); + client->OnNewMessage.connect(boost::bind(&Endpoint::NewMessageHandler, this, _2)); + client->OnClosed.connect(boost::bind(&Endpoint::ClientClosedHandler, this)); + + OnConnected(GetSelf()); } /** @@ -46,9 +131,18 @@ void Endpoint::SetEndpointManager(EndpointManager::WeakPtr manager) * * @param topic The name of the topic. */ -void Endpoint::RegisterSubscription(String topic) +void Endpoint::RegisterSubscription(const String& topic) { - m_Subscriptions.insert(topic); + Dictionary::Ptr subscriptions = GetSubscriptions(); + + if (!subscriptions) + subscriptions = boost::make_shared(); + + if (!subscriptions->Contains(topic)) { + Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone(); + newSubscriptions->Set(topic, topic); + SetSubscriptions(newSubscriptions); + } } /** @@ -56,9 +150,15 @@ void Endpoint::RegisterSubscription(String topic) * * @param topic The name of the topic. */ -void Endpoint::UnregisterSubscription(String topic) +void Endpoint::UnregisterSubscription(const String& topic) { - m_Subscriptions.erase(topic); + Dictionary::Ptr subscriptions = GetSubscriptions(); + + if (subscriptions && subscriptions->Contains(topic)) { + Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone(); + newSubscriptions->Remove(topic); + SetSubscriptions(newSubscriptions); + } } /** @@ -67,9 +167,11 @@ void Endpoint::UnregisterSubscription(String topic) * @param topic The name of the topic. * @returns true if the endpoint is subscribed to the topic, false otherwise. */ -bool Endpoint::HasSubscription(String topic) const +bool Endpoint::HasSubscription(const String& topic) const { - return (m_Subscriptions.find(topic) != m_Subscriptions.end()); + Dictionary::Ptr subscriptions = GetSubscriptions(); + + return (subscriptions && subscriptions->Contains(topic)); } /** @@ -77,65 +179,168 @@ bool Endpoint::HasSubscription(String topic) const */ void Endpoint::ClearSubscriptions(void) { - m_Subscriptions.clear(); + Set("subscriptions", Empty); } -/** - * Returns the beginning of the subscriptions list. - * - * @returns An iterator that points to the first subscription. - */ -Endpoint::ConstTopicIterator Endpoint::BeginSubscriptions(void) const +Dictionary::Ptr Endpoint::GetSubscriptions(void) const { - return m_Subscriptions.begin(); + return Get("subscriptions"); } -/** - * Returns the end of the subscriptions list. - * - * @returns An iterator that points past the last subscription. - */ -Endpoint::ConstTopicIterator Endpoint::EndSubscriptions(void) const +void Endpoint::SetSubscriptions(const Dictionary::Ptr& subscriptions) { - return m_Subscriptions.end(); + Set("subscriptions", subscriptions); } -/** - * Sets whether a welcome message has been received from this endpoint. - * - * @param value Whether we've received a welcome message. - */ -void Endpoint::SetReceivedWelcome(bool value) +void Endpoint::RegisterTopicHandler(const String& topic, const function& callback) { - m_ReceivedWelcome = value; + map > >::iterator it; + it = m_TopicHandlers.find(topic); + + shared_ptr > sig; + + if (it == m_TopicHandlers.end()) { + sig = boost::make_shared >(); + m_TopicHandlers.insert(make_pair(topic, sig)); + } else { + sig = it->second; + } + + sig->connect(callback); + + RegisterSubscription(topic); } -/** - * Retrieves whether a welcome message has been received from this endpoint. - * - * @returns Whether we've received a welcome message. - */ -bool Endpoint::HasReceivedWelcome(void) const +void Endpoint::UnregisterTopicHandler(const String& topic, const function& callback) { - return m_ReceivedWelcome; + // TODO: implement + //m_TopicHandlers[method] -= callback; + //UnregisterSubscription(method); + + throw_exception(NotImplementedException()); } -/** - * Sets whether a welcome message has been sent to this endpoint. - * - * @param value Whether we've sent a welcome message. - */ -void Endpoint::SetSentWelcome(bool value) +void Endpoint::OnAttributeChanged(const String& name, const Value& oldValue) { - m_SentWelcome = value; + if (name == "subscriptions") { + Dictionary::Ptr oldSubscriptions, newSubscriptions; + + if (oldValue.IsObjectType()) + oldSubscriptions = oldValue; + + newSubscriptions = GetSubscriptions(); + + if (oldSubscriptions) { + String subscription; + BOOST_FOREACH(tie(tuples::ignore, subscription), oldSubscriptions) { + if (!newSubscriptions || !newSubscriptions->Contains(subscription)) { + Logger::Write(LogInformation, "icinga", "Removed subscription for '" + GetName() + "': " + subscription); + OnSubscriptionUnregistered(GetSelf(), subscription); + } + } + } + + if (newSubscriptions) { + String subscription; + BOOST_FOREACH(tie(tuples::ignore, subscription), newSubscriptions) { + if (!oldSubscriptions || !oldSubscriptions->Contains(subscription)) { + Logger::Write(LogInformation, "icinga", "New subscription for '" + GetName() + "': " + subscription); + OnSubscriptionRegistered(GetSelf(), subscription); + } + } + } + } } -/** - * Retrieves whether a welcome message has been sent to this endpoint. - * - * @returns Whether we've sent a welcome message. - */ -bool Endpoint::HasSentWelcome(void) const +void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& request) { - return m_SentWelcome; + if (!IsConnected()) { + // TODO: persist the message + return; + } + + if (IsLocalEndpoint()) { + String method; + if (!request.GetMethod(&method)) + return; + + map > >::iterator it; + it = m_TopicHandlers.find(method); + + if (it == m_TopicHandlers.end()) + return; + + (*it->second)(GetSelf(), sender, request); + } else { + GetClient()->SendMessage(request); + } } + +void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& response) +{ + if (!IsConnected()) + return; + + if (IsLocalEndpoint()) + EndpointManager::GetInstance()->ProcessResponseMessage(sender, response); + else + GetClient()->SendMessage(response); +} + +void Endpoint::NewMessageHandler(const MessagePart& message) +{ + Endpoint::Ptr sender = GetSelf(); + + if (ResponseMessage::IsResponseMessage(message)) { + /* rather than routing the message to the right virtual + * endpoint we just process it here right away. */ + EndpointManager::GetInstance()->ProcessResponseMessage(sender, message); + return; + } + + RequestMessage request = message; + + String method; + if (!request.GetMethod(&method)) + return; + + String id; + if (request.GetID(&id)) + EndpointManager::GetInstance()->SendAnycastMessage(sender, request); + else + EndpointManager::GetInstance()->SendMulticastMessage(sender, request); +} + +void Endpoint::ClientClosedHandler(void) +{ + try { + GetClient()->CheckException(); + } catch (const exception& ex) { + stringstream message; + message << "Error occured for JSON-RPC socket: Message=" << ex.what(); + + Logger::Write(LogWarning, "jsonrpc", message.str()); + } + + Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetName()); + + // TODO: _only_ clear non-persistent subscriptions + // unregister ourselves if no persistent subscriptions are left (use a + // timer for that, once we have a TTL property for the topics) + ClearSubscriptions(); + + Set("client", Empty); + + OnDisconnected(GetSelf()); +} + +String Endpoint::GetNode(void) const +{ + return Get("node"); +} + +String Endpoint::GetService(void) const +{ + return Get("service"); +} + diff --git a/icinga/endpoint.h b/icinga/endpoint.h index f6586bbe3..d2799e823 100644 --- a/icinga/endpoint.h +++ b/icinga/endpoint.h @@ -30,60 +30,65 @@ class EndpointManager; * * @ingroup icinga */ -class I2_ICINGA_API Endpoint : public Object +class I2_ICINGA_API Endpoint : public DynamicObject { public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - typedef set::const_iterator ConstTopicIterator; + typedef void (Callback)(const Endpoint::Ptr&, const Endpoint::Ptr&, const RequestMessage&); - Endpoint(void) - : m_ReceivedWelcome(false), m_SentWelcome(false) - { } + Endpoint(const Dictionary::Ptr& serializedUpdate); - virtual String GetIdentity(void) const = 0; - virtual String GetAddress(void) const = 0; + static bool Exists(const String& name); + static Endpoint::Ptr GetByName(const String& name); - void SetReceivedWelcome(bool value); - bool HasReceivedWelcome(void) const; + String GetAddress(void) const; - void SetSentWelcome(bool value); - bool HasSentWelcome(void) const; + JsonRpcClient::Ptr GetClient(void) const; + void SetClient(const JsonRpcClient::Ptr& client); - shared_ptr GetEndpointManager(void) const; - void SetEndpointManager(weak_ptr manager); + void RegisterSubscription(const String& topic); + void UnregisterSubscription(const String& topic); + bool HasSubscription(const String& topic) const; - void RegisterSubscription(String topic); - void UnregisterSubscription(String topic); - bool HasSubscription(String topic) const; + Dictionary::Ptr GetSubscriptions(void) const; + void SetSubscriptions(const Dictionary::Ptr& subscriptions); - virtual bool IsLocal(void) const = 0; - virtual bool IsConnected(void) const = 0; + bool IsLocalEndpoint(void) const; + bool IsConnected(void) const; - virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message) = 0; - virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message) = 0; - - virtual void Stop(void) = 0; + void ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& message); + void ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& message); void ClearSubscriptions(void); - ConstTopicIterator BeginSubscriptions(void) const; - ConstTopicIterator EndSubscriptions(void) const; + void RegisterTopicHandler(const String& topic, const function& callback); + void UnregisterTopicHandler(const String& topic, const function& callback); + + virtual void OnAttributeChanged(const String& name, const Value& oldValue); + + String GetNode(void) const; + String GetService(void) const; - boost::signal OnSessionEstablished; + static Endpoint::Ptr MakeEndpoint(const String& name, bool local); + + static boost::signal OnConnected; + static boost::signal OnDisconnected; + + static boost::signal OnSubscriptionRegistered; + static boost::signal OnSubscriptionUnregistered; private: - set m_Subscriptions; /**< The topics this endpoint is - subscribed to. */ bool m_ReceivedWelcome; /**< Have we received a welcome message from this endpoint? */ bool m_SentWelcome; /**< Have we sent a welcome message to this endpoint? */ - weak_ptr m_EndpointManager; /**< The endpoint manager - this endpoint is - registered with. */ + map > > m_TopicHandlers; + + void NewMessageHandler(const MessagePart& message); + void ClientClosedHandler(void); }; } diff --git a/icinga/endpointmanager.cpp b/icinga/endpointmanager.cpp index 1e7dc3a4a..4bbef5110 100644 --- a/icinga/endpointmanager.cpp +++ b/icinga/endpointmanager.cpp @@ -31,6 +31,16 @@ EndpointManager::EndpointManager(void) m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this)); m_RequestTimer->SetInterval(5); m_RequestTimer->Start(); + + m_SubscriptionTimer = boost::make_shared(); + m_SubscriptionTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::SubscriptionTimerHandler, this)); + m_SubscriptionTimer->SetInterval(10); + m_SubscriptionTimer->Start(); + + m_ReconnectTimer = boost::make_shared(); + m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::ReconnectTimerHandler, this)); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->Start(); } /** @@ -42,6 +52,16 @@ EndpointManager::EndpointManager(void) void EndpointManager::SetIdentity(const String& identity) { m_Identity = identity; + + if (m_Endpoint) + m_Endpoint->Unregister(); + + DynamicObject::Ptr object = DynamicObject::GetObject("Endpoint", identity); + + if (object) + m_Endpoint = dynamic_pointer_cast(object); + else + m_Endpoint = Endpoint::MakeEndpoint(identity, false); } /** @@ -54,26 +74,6 @@ String EndpointManager::GetIdentity(void) const return m_Identity; } -/** - * Sets the SSL context that is used for remote connections. - * - * @param sslContext The new SSL context. - */ -void EndpointManager::SetSSLContext(const shared_ptr& sslContext) -{ - m_SSLContext = sslContext; -} - -/** - * Retrieves the SSL context that is used for remote connections. - * - * @returns The SSL context. - */ -shared_ptr EndpointManager::GetSSLContext(void) const -{ - return m_SSLContext; -} - /** * Creates a new JSON-RPC listener on the specified port. * @@ -81,15 +81,20 @@ shared_ptr EndpointManager::GetSSLContext(void) const */ void EndpointManager::AddListener(const String& service) { - if (!GetSSLContext()) + shared_ptr sslContext = IcingaApplication::GetInstance()->GetSSLContext(); + + if (!sslContext) throw_exception(logic_error("SSL context is required for AddListener()")); stringstream s; s << "Adding new listener: port " << service; Logger::Write(LogInformation, "icinga", s.str()); - JsonRpcServer::Ptr server = boost::make_shared(m_SSLContext); - RegisterServer(server); + JsonRpcServer::Ptr server = boost::make_shared(sslContext); + + m_Servers.insert(server); + server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler, + this, _2)); server->Bind(service, AF_INET6); server->Listen(); @@ -102,107 +107,47 @@ void EndpointManager::AddListener(const String& service) * @param node The remote host. * @param service The remote port. */ -void EndpointManager::AddConnection(const String& node, const String& service) -{ - stringstream s; - s << "Adding new endpoint: [" << node << "]:" << service; - Logger::Write(LogInformation, "icinga", s.str()); - - JsonRpcEndpoint::Ptr endpoint = boost::make_shared(); - RegisterEndpoint(endpoint); - endpoint->Connect(node, service, m_SSLContext); -} - -/** - * Registers a new JSON-RPC server with this endpoint manager. - * - * @param server The JSON-RPC server. - */ -void EndpointManager::RegisterServer(const JsonRpcServer::Ptr& server) -{ - m_Servers.push_back(server); - server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler, - this, _2)); +void EndpointManager::AddConnection(const String& node, const String& service) { + JsonRpcClient::Ptr client = boost::make_shared(RoleOutbound, + IcingaApplication::GetInstance()->GetSSLContext()); + client->Connect(node, service); + NewClientHandler(client); } /** * Processes a new client connection. * - * @param ncea Event arguments. + * @param client The new client. */ void EndpointManager::NewClientHandler(const TcpClient::Ptr& client) { - Logger::Write(LogInformation, "icinga", "Accepted new client from " + client->GetPeerAddress()); + JsonRpcClient::Ptr jclient = static_pointer_cast(client); - JsonRpcEndpoint::Ptr endpoint = boost::make_shared(); - endpoint->SetClient(static_pointer_cast(client)); - client->Start(); - RegisterEndpoint(endpoint); -} + Logger::Write(LogInformation, "icinga", "New client connection from " + jclient->GetPeerAddress()); -/** - * Unregisters a JSON-RPC server. - * - * @param server The JSON-RPC server. - */ -void EndpointManager::UnregisterServer(const JsonRpcServer::Ptr& server) -{ - m_Servers.erase( - remove(m_Servers.begin(), m_Servers.end(), server), - m_Servers.end()); - // TODO: unbind event + m_PendingClients.insert(jclient); + jclient->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1)); + jclient->Start(); } -/** - * Registers a new endpoint with this endpoint manager. - * - * @param endpoint The new endpoint. - */ -void EndpointManager::RegisterEndpoint(const Endpoint::Ptr& endpoint) +void EndpointManager::ClientConnectedHandler(const TcpClient::Ptr& client) { - endpoint->SetEndpointManager(GetSelf()); + JsonRpcClient::Ptr jclient = static_pointer_cast(client); - UnregisterEndpoint(endpoint); + m_PendingClients.erase(jclient); - String identity = endpoint->GetIdentity(); + shared_ptr cert = jclient->GetPeerCertificate(); - if (!identity.IsEmpty()) { - m_Endpoints[identity] = endpoint; - OnNewEndpoint(GetSelf(), endpoint); - } else { - m_PendingEndpoints.push_back(endpoint); - } + String identity = Utility::GetCertificateCN(cert); - if (endpoint->IsLocal()) { - /* this endpoint might have introduced new subscriptions - * or publications which affect remote endpoints, we need - * to close all fully-connected remote endpoints to make sure - * these subscriptions/publications are kept up-to-date. */ - Iterator prev, it; - for (it = m_Endpoints.begin(); it != m_Endpoints.end(); ) { - prev = it; - it++; - - if (!prev->second->IsLocal()) - m_Endpoints.erase(prev); - } - } -} + Endpoint::Ptr endpoint; -/** - * Unregisters an endpoint. - * - * @param endpoint The endpoint. - */ -void EndpointManager::UnregisterEndpoint(const Endpoint::Ptr& endpoint) -{ - m_PendingEndpoints.erase( - remove(m_PendingEndpoints.begin(), m_PendingEndpoints.end(), endpoint), - m_PendingEndpoints.end()); + if (Endpoint::Exists(identity)) + endpoint = Endpoint::GetByName(identity); + else + endpoint = Endpoint::MakeEndpoint(identity, false); - String identity = endpoint->GetIdentity(); - if (!identity.IsEmpty()) - m_Endpoints.erase(identity); + endpoint->SetClient(jclient); } /** @@ -240,8 +185,9 @@ void EndpointManager::SendAnycastMessage(const Endpoint::Ptr& sender, throw_exception(invalid_argument("Message is missing the 'method' property.")); vector candidates; - Endpoint::Ptr endpoint; - BOOST_FOREACH(tie(tuples::ignore, endpoint), m_Endpoints) { + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { + Endpoint::Ptr endpoint = dynamic_pointer_cast(object); /* don't forward messages between non-local endpoints */ if (!sender->IsLocal() && !endpoint->IsLocal()) continue; @@ -275,8 +221,10 @@ void EndpointManager::SendMulticastMessage(const Endpoint::Ptr& sender, if (!message.GetMethod(&method)) throw_exception(invalid_argument("Message is missing the 'method' property.")); - Endpoint::Ptr recipient; - BOOST_FOREACH(tie(tuples::ignore, recipient), m_Endpoints) { + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { + Endpoint::Ptr recipient = dynamic_pointer_cast(object); + /* don't forward messages back to the sender */ if (sender == recipient) continue; @@ -291,31 +239,16 @@ void EndpointManager::SendMulticastMessage(const Endpoint::Ptr& sender, * * @param callback The callback function. */ -void EndpointManager::ForEachEndpoint(function callback) -{ - map::iterator prev, i; - for (i = m_Endpoints.begin(); i != m_Endpoints.end(); ) { - prev = i; - i++; - - callback(GetSelf(), prev->second); - } -} - -/** - * Retrieves an endpoint that has the specified identity. - * - * @param identity The identity of the endpoint. - */ -Endpoint::Ptr EndpointManager::GetEndpointByIdentity(const String& identity) const -{ - map::const_iterator i; - i = m_Endpoints.find(identity); - if (i != m_Endpoints.end()) - return i->second; - else - return Endpoint::Ptr(); -} +//void EndpointManager::ForEachEndpoint(function callback) +//{ +// map::iterator prev, i; +// for (i = m_Endpoints.begin(); i != m_Endpoints.end(); ) { +// prev = i; +// i++; +// +// callback(GetSelf(), prev->second); +// } +//} void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, RequestMessage& message, @@ -348,6 +281,46 @@ bool EndpointManager::RequestTimeoutLessComparer(const pair(); + + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { + Endpoint::Ptr endpoint = dynamic_pointer_cast(object); + + if (!endpoint->IsLocalEndpoint()) + continue; + + String topic; + BOOST_FOREACH(tie(tuples::ignore, topic), endpoint->GetSubscriptions()) { + subscriptions->Set(topic, topic); + } + } + + m_Endpoint->SetSubscriptions(subscriptions); +} + +void EndpointManager::ReconnectTimerHandler(void) +{ + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { + Endpoint::Ptr endpoint = dynamic_pointer_cast(object); + + if (endpoint->IsConnected()) + continue; + + String node, service; + node = endpoint->GetNode(); + service = endpoint->GetService(); + + if (node.IsEmpty() || service.IsEmpty()) + continue; + + AddConnection(node, service); + } +} + void EndpointManager::RequestTimerHandler(void) { map::iterator it; @@ -379,15 +352,15 @@ void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const m_Requests.erase(it); } -EndpointManager::Iterator EndpointManager::Begin(void) -{ - return m_Endpoints.begin(); -} +//EndpointManager::Iterator EndpointManager::Begin(void) +//{ +// return m_Endpoints.begin(); +//} -EndpointManager::Iterator EndpointManager::End(void) -{ - return m_Endpoints.end(); -} +//EndpointManager::Iterator EndpointManager::End(void) +//{ +// return m_Endpoints.end(); +//} EndpointManager::Ptr EndpointManager::GetInstance(void) { diff --git a/icinga/endpointmanager.h b/icinga/endpointmanager.h index d8dd41f74..fe57d74d5 100644 --- a/icinga/endpointmanager.h +++ b/icinga/endpointmanager.h @@ -34,7 +34,7 @@ public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - typedef map::iterator Iterator; +// typedef map::iterator Iterator; EndpointManager(void); @@ -49,9 +49,6 @@ public: void AddListener(const String& service); void AddConnection(const String& node, const String& service); - void RegisterEndpoint(const Endpoint::Ptr& endpoint); - void UnregisterEndpoint(const Endpoint::Ptr& endpoint); - void SendUnicastMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, const MessagePart& message); void SendAnycastMessage(const Endpoint::Ptr& sender, const RequestMessage& message); void SendMulticastMessage(const Endpoint::Ptr& sender, const RequestMessage& message); @@ -61,21 +58,22 @@ public: void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message); - void ForEachEndpoint(function callback); - Iterator Begin(void); - Iterator End(void); - - Endpoint::Ptr GetEndpointByIdentity(const String& identity) const; +// void ForEachEndpoint(function callback); +// Iterator Begin(void); +// Iterator End(void); boost::signal OnNewEndpoint; private: String m_Identity; - shared_ptr m_SSLContext; + Endpoint::Ptr m_Endpoint; + + Timer::Ptr m_SubscriptionTimer; + + Timer::Ptr m_ReconnectTimer; - vector m_Servers; - vector m_PendingEndpoints; - map m_Endpoints; + set m_Servers; + set m_PendingClients; /** * Information about a pending API request. @@ -98,13 +96,15 @@ private: map m_Requests; Timer::Ptr m_RequestTimer; - void RegisterServer(const JsonRpcServer::Ptr& server); - void UnregisterServer(const JsonRpcServer::Ptr& server); - static bool RequestTimeoutLessComparer(const pair& a, const pair& b); void RequestTimerHandler(void); + void SubscriptionTimerHandler(void); + + void ReconnectTimerHandler(void); + void NewClientHandler(const TcpClient::Ptr& client); + void ClientConnectedHandler(const TcpClient::Ptr& client); }; } diff --git a/icinga/i2-icinga.h b/icinga/i2-icinga.h index 67dc8c27a..85637333a 100644 --- a/icinga/i2-icinga.h +++ b/icinga/i2-icinga.h @@ -42,8 +42,6 @@ using boost::algorithm::is_any_of; #endif /* I2_ICINGA_BUILD */ #include "endpoint.h" -#include "jsonrpcendpoint.h" -#include "virtualendpoint.h" #include "endpointmanager.h" #include "icingaapplication.h" diff --git a/icinga/icingaapplication.cpp b/icinga/icingaapplication.cpp index 330e28d36..9f65bd9c1 100644 --- a/icinga/icingaapplication.cpp +++ b/icinga/icingaapplication.cpp @@ -133,8 +133,7 @@ int IcingaApplication::Main(const vector& args) Logger::Write(LogInformation, "icinga", "My identity: " + identity); EndpointManager::GetInstance()->SetIdentity(identity); - shared_ptr sslContext = Utility::MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile()); - EndpointManager::GetInstance()->SetSSLContext(sslContext); + m_SSLContext = Utility::MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile()); } /* create the primary RPC listener */ @@ -215,3 +214,8 @@ double IcingaApplication::GetStartTime(void) const { return m_StartTime; } + +shared_ptr IcingaApplication::GetSSLContext(void) const +{ + return m_SSLContext; +} diff --git a/icinga/icingaapplication.h b/icinga/icingaapplication.h index 2045e1a4c..d1175758b 100644 --- a/icinga/icingaapplication.h +++ b/icinga/icingaapplication.h @@ -47,6 +47,7 @@ public: String GetPidPath(void) const; String GetStatePath(void) const; Dictionary::Ptr GetMacros(void) const; + shared_ptr GetSSLContext(void) const; double GetStartTime(void) const; @@ -61,6 +62,7 @@ private: String m_PidPath; String m_StatePath; Dictionary::Ptr m_Macros; + shared_ptr m_SSLContext; double m_StartTime; diff --git a/icinga/jsonrpcendpoint.cpp b/icinga/jsonrpcendpoint.cpp deleted file mode 100644 index e21215cc5..000000000 --- a/icinga/jsonrpcendpoint.cpp +++ /dev/null @@ -1,148 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "i2-icinga.h" - -using namespace icinga; - -String JsonRpcEndpoint::GetIdentity(void) const -{ - return m_Identity; -} - -String JsonRpcEndpoint::GetAddress(void) const -{ - if (!m_Client) - return ""; - - return m_Client->GetPeerAddress(); -} - -JsonRpcClient::Ptr JsonRpcEndpoint::GetClient(void) -{ - return m_Client; -} - -void JsonRpcEndpoint::Connect(String node, String service, shared_ptr sslContext) -{ - JsonRpcClient::Ptr client = boost::make_shared(RoleOutbound, sslContext); - SetClient(client); - client->Connect(node, service); - client->Start(); -} - -void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client) -{ - m_Client = client; - client->OnNewMessage.connect(boost::bind(&JsonRpcEndpoint::NewMessageHandler, this, _2)); - client->OnClosed.connect(boost::bind(&JsonRpcEndpoint::ClientClosedHandler, this)); - client->OnConnected.connect(boost::bind(&JsonRpcEndpoint::ClientConnectedHandler, this)); -} - -bool JsonRpcEndpoint::IsLocal(void) const -{ - return false; -} - -bool JsonRpcEndpoint::IsConnected(void) const -{ - return (m_Client && m_Client->IsConnected()); -} - -void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message) -{ - if (IsConnected()) { - m_Client->SendMessage(message); - } else { - // TODO: persist the event - } -} - -void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message) -{ - m_Client->SendMessage(message); -} - -void JsonRpcEndpoint::NewMessageHandler(const MessagePart& message) -{ - Endpoint::Ptr sender = GetSelf(); - - if (ResponseMessage::IsResponseMessage(message)) { - /* rather than routing the message to the right virtual - * endpoint we just process it here right away. */ - GetEndpointManager()->ProcessResponseMessage(sender, message); - return; - } - - RequestMessage request = message; - - String method; - if (!request.GetMethod(&method)) - return; - - String id; - if (request.GetID(&id)) - GetEndpointManager()->SendAnycastMessage(sender, request); - else - GetEndpointManager()->SendMulticastMessage(sender, request); -} - -void JsonRpcEndpoint::ClientClosedHandler(void) -{ - try { - m_Client->CheckException(); - } catch (const exception& ex) { - stringstream message; - message << "Error occured for JSON-RPC socket: Message=" << ex.what(); - - Logger::Write(LogWarning, "jsonrpc", message.str()); - } - - Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetIdentity()); - - // TODO: _only_ clear non-persistent subscriptions - // unregister ourselves if no persistent subscriptions are left (use a timer for that, once we have a TTL property for the topics) - ClearSubscriptions(); - - // remove the endpoint if there are no more subscriptions */ - if (BeginSubscriptions() == EndSubscriptions()) { - Hold(); - GetEndpointManager()->UnregisterEndpoint(GetSelf()); - } - - m_Client.reset(); - - // TODO: persist events, etc., for now we just disable the endpoint -} - -void JsonRpcEndpoint::ClientConnectedHandler(void) -{ - String identity = Utility::GetCertificateCN(m_Client->GetPeerCertificate()); - - if (GetIdentity().IsEmpty() && !identity.IsEmpty()) { - m_Identity = identity; - GetEndpointManager()->RegisterEndpoint(GetSelf()); - } -} - -void JsonRpcEndpoint::Stop(void) -{ - if (m_Client) - m_Client->Close(); -} diff --git a/icinga/jsonrpcendpoint.h b/icinga/jsonrpcendpoint.h deleted file mode 100644 index e685aac30..000000000 --- a/icinga/jsonrpcendpoint.h +++ /dev/null @@ -1,71 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef JSONRPCENDPOINT_H -#define JSONRPCENDPOINT_H - -namespace icinga -{ - -/** - * A JSON-RPC endpoint that can be used to communicate with a remote - * Icinga instance. - * - * @ingroup icinga - */ -class I2_ICINGA_API JsonRpcEndpoint : public Endpoint -{ -public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; - - void Connect(String node, String service, - shared_ptr sslContext); - - JsonRpcClient::Ptr GetClient(void); - void SetClient(JsonRpcClient::Ptr client); - - virtual String GetIdentity(void) const; - virtual String GetAddress(void) const; - - virtual bool IsLocal(void) const; - virtual bool IsConnected(void) const; - - virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message); - virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message); - - virtual void Stop(void); - -private: - String m_Identity; /**< The identity of this endpoint. */ - - shared_ptr m_SSLContext; - String m_Address; - JsonRpcClient::Ptr m_Client; - - void SetAddress(String address); - - void NewMessageHandler(const MessagePart& message); - void ClientClosedHandler(void); - void ClientConnectedHandler(void); -}; - -} - -#endif /* JSONRPCENDPOINT_H */ diff --git a/icinga/virtualendpoint.cpp b/icinga/virtualendpoint.cpp deleted file mode 100644 index c82bf3f24..000000000 --- a/icinga/virtualendpoint.cpp +++ /dev/null @@ -1,97 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "i2-icinga.h" - -using namespace icinga; - -String VirtualEndpoint::GetIdentity(void) const -{ - return "__" + GetAddress(); -} - -String VirtualEndpoint::GetAddress(void) const -{ - char address[50]; - sprintf(address, "virtual:%p", (void *)this); - return address; -} - -bool VirtualEndpoint::IsLocal(void) const -{ - return true; -} - -bool VirtualEndpoint::IsConnected(void) const -{ - return true; -} - -void VirtualEndpoint::RegisterTopicHandler(String topic, function callback) -{ - map > >::iterator it; - it = m_TopicHandlers.find(topic); - - shared_ptr > sig; - - if (it == m_TopicHandlers.end()) { - sig = boost::make_shared >(); - m_TopicHandlers.insert(make_pair(topic, sig)); - } else { - sig = it->second; - } - - sig->connect(callback); - - RegisterSubscription(topic); -} - -void VirtualEndpoint::UnregisterTopicHandler(String topic, function callback) -{ - // TODO: implement - //m_TopicHandlers[method] -= callback; - //UnregisterMethodSubscription(method); - - throw_exception(NotImplementedException()); -} - -void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage& request) -{ - String method; - if (!request.GetMethod(&method)) - return; - - map > >::iterator it; - it = m_TopicHandlers.find(method); - - if (it == m_TopicHandlers.end()) - return; - - (*it->second)(GetSelf(), sender, request); -} - -void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& response) -{ - GetEndpointManager()->ProcessResponseMessage(sender, response); -} - -void VirtualEndpoint::Stop(void) -{ - /* Nothing to do here. */ -} diff --git a/icinga/virtualendpoint.h b/icinga/virtualendpoint.h deleted file mode 100644 index f81b6f833..000000000 --- a/icinga/virtualendpoint.h +++ /dev/null @@ -1,57 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef VIRTUALENDPOINT_H -#define VIRTUALENDPOINT_H - -namespace icinga -{ - -/** - * A local endpoint. - * - * @ingroup icinga - */ -class I2_ICINGA_API VirtualEndpoint : public Endpoint -{ -public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; - - void RegisterTopicHandler(String topic, function callback); - void UnregisterTopicHandler(String topic, function callback); - - virtual String GetIdentity(void) const; - virtual String GetAddress(void) const; - - virtual bool IsLocal(void) const; - virtual bool IsConnected(void) const; - - virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message); - virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message); - - virtual void Stop(void); - -private: - map< String, shared_ptr > > m_TopicHandlers; -}; - -} - -#endif /* VIRTUALENDPOINT_H */ diff --git a/jsonrpc/jsonrpcclient.cpp b/jsonrpc/jsonrpcclient.cpp index 29cca0888..496c0f273 100644 --- a/jsonrpc/jsonrpcclient.cpp +++ b/jsonrpc/jsonrpcclient.cpp @@ -41,7 +41,9 @@ JsonRpcClient::JsonRpcClient(TcpClientRole role, shared_ptr sslContext) void JsonRpcClient::SendMessage(const MessagePart& message) { Value value = message.GetDictionary(); - NetString::WriteStringToIOQueue(this, value.Serialize()); + String json = value.Serialize(); + //std::cerr << ">> " << json << std::endl; + NetString::WriteStringToIOQueue(this, json); } /** @@ -52,6 +54,8 @@ void JsonRpcClient::DataAvailableHandler(void) String jsonString; while (NetString::ReadStringFromIOQueue(this, &jsonString)) { + //std::cerr << "<< " << jsonString << std::endl; + try { Value value = Value::Deserialize(jsonString); -- 2.50.1