]> granicus.if.org Git - icinga2/commitdiff
Implemented replication for Endpoint objects.
authorGunnar Beutner <gunnar.beutner@netways.de>
Mon, 3 Sep 2012 08:28:14 +0000 (10:28 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Mon, 3 Sep 2012 08:29:02 +0000 (10:29 +0200)
37 files changed:
base/dictionary.cpp
base/dictionary.h
base/dynamicobject.cpp
base/dynamicobject.h
base/process.cpp
components/Makefile.am
components/checker/checkercomponent.cpp
components/checker/checkercomponent.h
components/cibsync/cibsynccomponent.cpp
components/cibsync/cibsynccomponent.h
components/delegation/delegationcomponent.cpp
components/demo/democomponent.cpp
components/demo/democomponent.h
components/discovery/Makefile.am [deleted file]
components/discovery/discovery.vcxproj [deleted file]
components/discovery/discovery.vcxproj.filters [deleted file]
components/discovery/discoverycomponent.cpp [deleted file]
components/discovery/discoverycomponent.h [deleted file]
components/discovery/discoverymessage.cpp [deleted file]
components/discovery/discoverymessage.h [deleted file]
components/discovery/i2-discovery.h [deleted file]
configure.ac
doc/icinga2-config.odt
icinga-app/Makefile.am
icinga/Makefile.am
icinga/endpoint.cpp
icinga/endpoint.h
icinga/endpointmanager.cpp
icinga/endpointmanager.h
icinga/i2-icinga.h
icinga/icingaapplication.cpp
icinga/icingaapplication.h
icinga/jsonrpcendpoint.cpp [deleted file]
icinga/jsonrpcendpoint.h [deleted file]
icinga/virtualendpoint.cpp [deleted file]
icinga/virtualendpoint.h [deleted file]
jsonrpc/jsonrpcclient.cpp

index 8f42647b1f5d12da5f362cb935c4343a188f47ce..6fcfeaf594fd503b38374050548edb1c36b421a0 100644 (file)
@@ -175,6 +175,24 @@ void Dictionary::Remove(Dictionary::Iterator it)
        m_Data.erase(it);
 }
 
+/**
+ * Makes a shallow copy of a dictionary.
+ *
+ * @returns a copy of the dictionary.
+ */
+Dictionary::Ptr Dictionary::ShallowClone(void) const
+{
+       Dictionary::Ptr clone = boost::make_shared<Dictionary>();
+
+       String key;
+       Value value;
+       BOOST_FOREACH(tie(key, value), m_Data) {
+               clone->Set(key, value);
+       }
+
+       return clone;
+}
+
 /**
  * Converts a JSON object to a dictionary.
  *
index bdfd2af61ad5fcb8ce264fa0b911b1ccd9e33aea..43dbc61c9f1d4703d932c01e5e52a6476172b06e 100644 (file)
@@ -50,6 +50,8 @@ public:
        void Remove(const String& key);
        void Remove(Iterator it);
 
+       Dictionary::Ptr ShallowClone(void) const;
+
        static Dictionary::Ptr FromJson(cJSON *json);
        cJSON *ToJson(void) const;
 
index b7faafccf4381b9c2705e3554f7eeca7bf3c0cf4..60471c2888a60d453ced62621dd6d0c7a283cfaa 100644 (file)
@@ -145,6 +145,11 @@ void DynamicObject::Set(const String& name, const Value& data)
        InternalSetAttribute(name, data, GetCurrentTx());
 }
 
+void DynamicObject::Touch(const String& name)
+{
+       InternalSetAttribute(name, InternalGetAttribute(name), GetCurrentTx());
+}
+
 Value DynamicObject::Get(const String& name) const
 {
        return InternalGetAttribute(name);
index c85a006ca8076d8c10920571805819cd65a06e18..6ff8b77ac7d8b58ba22368dbbe56fb79e9576768 100644 (file)
@@ -79,6 +79,7 @@ public:
        void RegisterAttribute(const String& name, DynamicAttributeType type);
 
        void Set(const String& name, const Value& data);
+       void Touch(const String& name);
        Value Get(const String& name) const;
 
        bool HasAttribute(const String& name) const;
@@ -162,8 +163,11 @@ shared_ptr<T> DynamicObjectFactory(const Dictionary::Ptr& serializedUpdate)
        return boost::make_shared<T>(serializedUpdate);
 }
 
+#define REGISTER_CLASS_ALIAS(klass, alias) \
+       static RegisterClassHelper g_Register ## klass(alias, DynamicObjectFactory<klass>)
+
 #define REGISTER_CLASS(klass) \
-       static RegisterClassHelper g_Register ## klass(#klass, DynamicObjectFactory<klass>)
+       REGISTER_CLASS_ALIAS(klass, #klass)
 
 }
 
index e8abd61f23ace0d7e07022f92672d82e51d06418..1a8e8aed89f13b50fe832003fad17017649db513 100644 (file)
@@ -33,6 +33,8 @@ condition_variable Process::m_TasksCV;
 Process::Process(const String& command)
        : AsyncTask<Process, ProcessResult>(), m_Command(command), m_UsePopen(false)
 {
+       assert(Application::IsMainThread());
+
        if (!m_ThreadCreated) {
                thread t(&Process::WorkerThreadProc);
                t.detach();
index edf2f295266d5083b807873cca3b9c9b78737d9c..7b1c745fb21ab986bea2b35a50a44e3b47ba77d6 100644 (file)
@@ -7,5 +7,4 @@ SUBDIRS = \
        compat \
        convenience \
        delegation \
-       demo \
-       discovery
+       demo
index ed98ee86d5953bca77a6704cf13b8afc1d0ee760..648373735e3708d0145f2f4faf1251a960d90e42 100644 (file)
@@ -23,14 +23,12 @@ using namespace icinga;
 
 void CheckerComponent::Start(void)
 {
-       m_Endpoint = boost::make_shared<VirtualEndpoint>();
+       m_Endpoint = Endpoint::MakeEndpoint("checker", true);
 
        /* dummy registration so the delegation module knows this is a checker
           TODO: figure out a better way for this */
        m_Endpoint->RegisterSubscription("checker");
 
-       EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
-
        Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1));
        DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ServiceRemovedHandler, this, _1));
 
@@ -50,10 +48,7 @@ void CheckerComponent::Start(void)
 
 void CheckerComponent::Stop(void)
 {
-       EndpointManager::Ptr mgr = EndpointManager::GetInstance();
-
-       if (mgr)
-               mgr->UnregisterEndpoint(m_Endpoint);
+       m_Endpoint->Unregister();
 }
 
 void CheckerComponent::CheckTimerHandler(void)
@@ -158,7 +153,7 @@ void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
 {
        String checker = service->GetChecker();
 
-       if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetIdentity()) {
+       if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetName()) {
                if (m_PendingServices.find(service) != m_PendingServices.end())
                        return;
 
index 8e59f61317c6f4a603c7e9f1f576997c871368bc..b54d5d97f9ff2e55a93d9077e2603b385f0c2f0b 100644 (file)
@@ -54,7 +54,7 @@ public:
        virtual void Stop(void);
 
 private:
-       VirtualEndpoint::Ptr m_Endpoint;
+       Endpoint::Ptr m_Endpoint;
 
        ServiceSet m_IdleServices;
        ServiceSet m_PendingServices;
index a44a168fb27b63fe3f14bd59f96b4be8219eee18..ed66ef43e5f333a4afe68370351ab76dbedb6ea4 100644 (file)
@@ -26,18 +26,14 @@ using namespace icinga;
  */
 void CIBSyncComponent::Start(void)
 {
-       m_Endpoint = boost::make_shared<VirtualEndpoint>();
-
-       /* config objects */
-       m_Endpoint->RegisterTopicHandler("config::FetchObjects",
-           boost::bind(&CIBSyncComponent::FetchObjectsHandler, this, _2));
+       m_Endpoint = Endpoint::MakeEndpoint("cibsync", true);
 
        DynamicObject::OnRegistered.connect(boost::bind(&CIBSyncComponent::LocalObjectRegisteredHandler, this, _1));
        DynamicObject::OnUnregistered.connect(boost::bind(&CIBSyncComponent::LocalObjectUnregisteredHandler, this, _1));
        DynamicObject::OnTransactionClosing.connect(boost::bind(&CIBSyncComponent::TransactionClosingHandler, this, _1));
 
-       EndpointManager::GetInstance()->OnNewEndpoint.connect(boost::bind(&CIBSyncComponent::NewEndpointHandler, this, _2));
-
+       Endpoint::OnConnected.connect(boost::bind(&CIBSyncComponent::EndpointConnectedHandler, this, _1));
+       
        m_Endpoint->RegisterTopicHandler("config::ObjectUpdate",
            boost::bind(&CIBSyncComponent::RemoteObjectUpdateHandler, this, _2, _3));
        m_Endpoint->RegisterTopicHandler("config::ObjectRemoved",
@@ -46,8 +42,6 @@ void CIBSyncComponent::Start(void)
        /* service status */
        m_Endpoint->RegisterTopicHandler("checker::ServiceStateChange",
            boost::bind(&CIBSyncComponent::ServiceStateChangeRequestHandler, _2, _3));
-
-       EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
 }
 
 /**
@@ -55,10 +49,7 @@ void CIBSyncComponent::Start(void)
  */
 void CIBSyncComponent::Stop(void)
 {
-       EndpointManager::Ptr endpointManager = EndpointManager::GetInstance();
-
-       if (endpointManager)
-               endpointManager->UnregisterEndpoint(m_Endpoint);
+       m_Endpoint->Unregister();
 }
 
 void CIBSyncComponent::ServiceStateChangeRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
@@ -84,21 +75,28 @@ void CIBSyncComponent::ServiceStateChangeRequestHandler(const Endpoint::Ptr& sen
        CIB::UpdateTaskStatistics(now, 1);
 }
 
-void CIBSyncComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
+void CIBSyncComponent::EndpointConnectedHandler(const Endpoint::Ptr& endpoint)
 {
        /* no need to sync the config with local endpoints */
-       if (endpoint->IsLocal())
+       if (endpoint->IsLocalEndpoint())
                return;
 
-       endpoint->OnSessionEstablished.connect(boost::bind(&CIBSyncComponent::SessionEstablishedHandler, this, _1));
-}
+       /* we just assume the other endpoint wants object updates */
+       endpoint->RegisterSubscription("config::ObjectUpdate");
+       endpoint->RegisterSubscription("config::ObjectRemoved");
 
-void CIBSyncComponent::SessionEstablishedHandler(const Endpoint::Ptr& endpoint)
-{
-       RequestMessage request;
-       request.SetMethod("config::FetchObjects");
+       pair<DynamicObject::TypeMap::iterator, DynamicObject::TypeMap::iterator> trange = DynamicObject::GetTypes();
+       DynamicObject::TypeMap::iterator tt;
+       for (tt = trange.first; tt != trange.second; tt++) {
+               DynamicObject::Ptr object;
+               BOOST_FOREACH(tie(tuples::ignore, object), tt->second) {
+                       if (!ShouldReplicateObject(object))
+                               continue;
 
-       EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request);
+                       RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", 0, true);
+                       EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request);
+               }
+       }
 }
 
 RequestMessage CIBSyncComponent::MakeObjectMessage(const DynamicObject::Ptr& object, const String& method, double sinceTx, bool includeProperties)
@@ -123,23 +121,6 @@ bool CIBSyncComponent::ShouldReplicateObject(const DynamicObject::Ptr& object)
        return (!object->IsLocal());
 }
 
-void CIBSyncComponent::FetchObjectsHandler(const Endpoint::Ptr& sender)
-{
-       pair<DynamicObject::TypeMap::iterator, DynamicObject::TypeMap::iterator> trange = DynamicObject::GetTypes();
-       DynamicObject::TypeMap::iterator tt;
-       for (tt = trange.first; tt != trange.second; tt++) {
-               DynamicObject::Ptr object;
-               BOOST_FOREACH(tie(tuples::ignore, object), tt->second) {
-                       if (!ShouldReplicateObject(object))
-                               continue;
-
-                       RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", 0, true);
-
-                       EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, sender, request);
-               }
-       }
-}
-
 void CIBSyncComponent::LocalObjectRegisteredHandler(const DynamicObject::Ptr& object)
 {
        if (!ShouldReplicateObject(object))
@@ -212,7 +193,7 @@ void CIBSyncComponent::RemoteObjectUpdateHandler(const Endpoint::Ptr& sender, co
                }
 
                if (object->GetSource().IsEmpty())
-                       object->SetSource(sender->GetIdentity());
+                       object->SetSource(sender->GetName());
 
                object->Register();
        } else {
index 56456682fe7d273cfd6684ebce47ada4a0e48331..89fae5513ec4c4f87a2f3e82d1e5f92f1a0f3fc2 100644 (file)
@@ -33,18 +33,16 @@ public:
        virtual void Stop(void);
 
 private:
-       VirtualEndpoint::Ptr m_Endpoint;
+       Endpoint::Ptr m_Endpoint;
 
        static void ServiceStateChangeRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
 
-       void NewEndpointHandler(const Endpoint::Ptr& endpoint);
-       void SessionEstablishedHandler(const Endpoint::Ptr& endpoint);
+       void EndpointConnectedHandler(const Endpoint::Ptr& endpoint);
 
        void LocalObjectRegisteredHandler(const DynamicObject::Ptr& object);
        void LocalObjectUnregisteredHandler(const DynamicObject::Ptr& object);
        void TransactionClosingHandler(const set<DynamicObject::Ptr>& modifiedObjects);
 
-       void FetchObjectsHandler(const Endpoint::Ptr& sender);
        void RemoteObjectUpdateHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
        void RemoteObjectRemovedHandler(const RequestMessage& request);
 
index 560dac5919d86dfd77630bbe7d4806485db9df6b..7acbccd82e6862441918b19db681a683e58618f3 100644 (file)
@@ -40,9 +40,9 @@ vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::P
 {
        vector<Endpoint::Ptr> candidates;
 
-       EndpointManager::Iterator it;
-       for (it = EndpointManager::GetInstance()->Begin(); it != EndpointManager::GetInstance()->End(); it++) {
-               Endpoint::Ptr endpoint = it->second;
+       DynamicObject::Ptr object;
+       BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
+               Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
 
                /* ignore disconnected endpoints */
                if (!endpoint->IsConnected())
@@ -53,7 +53,7 @@ vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::P
                        continue;
 
                /* ignore endpoints that aren't allowed to check this service */
-               if (!service->IsAllowedChecker(it->first))
+               if (!service->IsAllowedChecker(endpoint->GetName()))
                        continue;
 
                candidates.push_back(endpoint);
@@ -66,14 +66,16 @@ void DelegationComponent::DelegationTimerHandler(void)
 {
        map<Endpoint::Ptr, int> histogram;
 
-       EndpointManager::Iterator eit;
-       for (eit = EndpointManager::GetInstance()->Begin(); eit != EndpointManager::GetInstance()->End(); eit++)
-               histogram[eit->second] = 0;
+       DynamicObject::Ptr object;
+       BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
+               Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
+
+               histogram[endpoint] = 0;
+       }
 
        vector<Service::Ptr> services;
 
        /* build "checker -> service count" histogram */
-       DynamicObject::Ptr object;
        BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Service")) {
                Service::Ptr service = dynamic_pointer_cast<Service>(object);
 
@@ -86,10 +88,11 @@ void DelegationComponent::DelegationTimerHandler(void)
                if (checker.IsEmpty())
                        continue;
 
-               Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
-               if (!endpoint)
+               if (!Endpoint::Exists(checker))
                        continue;
 
+               Endpoint::Ptr endpoint = Endpoint::GetByName(checker);
+
                histogram[endpoint]++;
        }
 
@@ -102,8 +105,8 @@ void DelegationComponent::DelegationTimerHandler(void)
                String checker = service->GetChecker();
 
                Endpoint::Ptr oldEndpoint;
-               if (!checker.IsEmpty())
-                       oldEndpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
+               if (Endpoint::Exists(checker))
+                       oldEndpoint = Endpoint::GetByName(checker);
 
                vector<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
 
@@ -146,7 +149,7 @@ void DelegationComponent::DelegationTimerHandler(void)
                        if (histogram[candidate] > avg_services)
                                continue;
 
-                       service->SetChecker(candidate->GetIdentity());
+                       service->SetChecker(candidate->GetName());
                        histogram[candidate]++;
 
                        delegated++;
@@ -161,7 +164,7 @@ void DelegationComponent::DelegationTimerHandler(void)
        int count;
        BOOST_FOREACH(tie(endpoint, count), histogram) {
                stringstream msgbuf;
-               msgbuf << "histogram: " << endpoint->GetIdentity() << " - " << count;
+               msgbuf << "histogram: " << endpoint->GetName() << " - " << count;
                Logger::Write(LogInformation, "delegation", msgbuf.str());
        }
 
index edeae95c2fad247138b95049f390878b635bf90e..2474ed9b11ae705e26d9e1cbef086fb0c6da2c83 100644 (file)
@@ -26,10 +26,9 @@ using namespace icinga;
  */
 void DemoComponent::Start(void)
 {
-       m_Endpoint = boost::make_shared<VirtualEndpoint>();
+       m_Endpoint = Endpoint::MakeEndpoint("demo", true);
        m_Endpoint->RegisterTopicHandler("demo::HelloWorld",
            boost::bind(&DemoComponent::HelloWorldRequestHandler, this, _2, _3));
-       EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
 
        m_DemoTimer = boost::make_shared<Timer>();
        m_DemoTimer->SetInterval(5);
@@ -42,10 +41,7 @@ void DemoComponent::Start(void)
  */
 void DemoComponent::Stop(void)
 {
-       EndpointManager::Ptr endpointManager = EndpointManager::GetInstance();
-
-       if (endpointManager)
-               endpointManager->UnregisterEndpoint(m_Endpoint);
+       m_Endpoint->Unregister();
 }
 
 /**
@@ -68,7 +64,7 @@ void DemoComponent::DemoTimerHandler(void)
  */
 void DemoComponent::HelloWorldRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
 {
-       Logger::Write(LogInformation, "demo", "Got 'hello world' from address=" + sender->GetAddress() + ", identity=" + sender->GetIdentity());
+       Logger::Write(LogInformation, "demo", "Got 'hello world' from address=" + sender->GetAddress() + ", identity=" + sender->GetName());
 }
 
 EXPORT_COMPONENT(demo, DemoComponent);
index 6eb93af5cd57f20858e737801306ef45b71d7a2f..181c8ae09bcf4afec873dde5ee145b34c71a2fca 100644 (file)
@@ -34,7 +34,7 @@ public:
 
 private:
        Timer::Ptr m_DemoTimer;
-       VirtualEndpoint::Ptr m_Endpoint;
+       Endpoint::Ptr m_Endpoint;
 
        void DemoTimerHandler(void);
        void HelloWorldRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
diff --git a/components/discovery/Makefile.am b/components/discovery/Makefile.am
deleted file mode 100644 (file)
index 7a41350..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-## Process this file with automake to produce Makefile.in
-
-pkglib_LTLIBRARIES =  \
-       discovery.la
-
-discovery_la_SOURCES =  \
-       discoverycomponent.cpp \
-       discoverycomponent.h \
-       discoverymessage.cpp \
-       discoverymessage.h \
-       i2-discovery.h
-
-discovery_la_CPPFLAGS = \
-       $(BOOST_CPPFLAGS) \
-       -I${top_srcdir}/base \
-       -I${top_srcdir}/dyn \
-       -I${top_srcdir}/jsonrpc \
-       -I${top_srcdir}/icinga \
-       -I${top_srcdir}/cib
-
-discovery_la_LDFLAGS = \
-       $(BOOST_LDFLAGS) \
-       -module \
-       -no-undefined \
-       @RELEASE_INFO@ \
-       @VERSION_INFO@
-
-discovery_la_LIBADD = \
-       $(BOOST_SIGNALS_LIB) \
-       $(BOOST_THREAD_LIB) \
-       ${top_builddir}/base/libbase.la \
-       ${top_builddir}/dyn/libdyn.la \
-       ${top_builddir}/jsonrpc/libjsonrpc.la \
-       ${top_builddir}/icinga/libicinga.la \
-       ${top_builddir}/cib/libcib.la
diff --git a/components/discovery/discovery.vcxproj b/components/discovery/discovery.vcxproj
deleted file mode 100644 (file)
index ba7b393..0000000
+++ /dev/null
@@ -1,94 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
-  <ItemGroup Label="ProjectConfigurations">
-    <ProjectConfiguration Include="Debug|Win32">
-      <Configuration>Debug</Configuration>
-      <Platform>Win32</Platform>
-    </ProjectConfiguration>
-    <ProjectConfiguration Include="Release|Win32">
-      <Configuration>Release</Configuration>
-      <Platform>Win32</Platform>
-    </ProjectConfiguration>
-  </ItemGroup>
-  <PropertyGroup Label="Globals">
-    <ProjectGuid>{EAD41628-BB96-4F99-9070-8A9676801295}</ProjectGuid>
-    <Keyword>Win32Proj</Keyword>
-    <RootNamespace>discovery</RootNamespace>
-  </PropertyGroup>
-  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
-  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
-    <ConfigurationType>DynamicLibrary</ConfigurationType>
-    <UseDebugLibraries>true</UseDebugLibraries>
-    <CharacterSet>MultiByte</CharacterSet>
-  </PropertyGroup>
-  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
-    <ConfigurationType>DynamicLibrary</ConfigurationType>
-    <UseDebugLibraries>false</UseDebugLibraries>
-    <WholeProgramOptimization>true</WholeProgramOptimization>
-    <CharacterSet>MultiByte</CharacterSet>
-  </PropertyGroup>
-  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
-  <ImportGroup Label="ExtensionSettings">
-  </ImportGroup>
-  <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
-    <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
-  </ImportGroup>
-  <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
-    <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
-  </ImportGroup>
-  <PropertyGroup Label="UserMacros" />
-  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
-    <LinkIncremental>true</LinkIncremental>
-    <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cib;$(SolutionDir)\dyn;$(IncludePath)</IncludePath>
-    <LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
-  </PropertyGroup>
-  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
-    <LinkIncremental>false</LinkIncremental>
-    <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cib;$(SolutionDir)\dyn;$(IncludePath)</IncludePath>
-    <LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
-  </PropertyGroup>
-  <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
-    <ClCompile>
-      <Optimization>Disabled</Optimization>
-      <PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
-      <WarningLevel>Level3</WarningLevel>
-      <MinimalRebuild>false</MinimalRebuild>
-      <MultiProcessorCompilation>true</MultiProcessorCompilation>
-    </ClCompile>
-    <Link>
-      <SubSystem>Windows</SubSystem>
-      <GenerateDebugInformation>true</GenerateDebugInformation>
-      <AdditionalDependencies>base.lib;jsonrpc.lib;icinga.lib;cib.lib;%(AdditionalDependencies)</AdditionalDependencies>
-    </Link>
-  </ItemDefinitionGroup>
-  <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
-    <ClCompile>
-      <Optimization>MaxSpeed</Optimization>
-      <FunctionLevelLinking>true</FunctionLevelLinking>
-      <IntrinsicFunctions>true</IntrinsicFunctions>
-      <PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
-      <WarningLevel>Level3</WarningLevel>
-      <MinimalRebuild>false</MinimalRebuild>
-      <MultiProcessorCompilation>true</MultiProcessorCompilation>
-    </ClCompile>
-    <Link>
-      <SubSystem>Windows</SubSystem>
-      <GenerateDebugInformation>true</GenerateDebugInformation>
-      <EnableCOMDATFolding>true</EnableCOMDATFolding>
-      <OptimizeReferences>true</OptimizeReferences>
-      <AdditionalDependencies>base.lib;jsonrpc.lib;icinga.lib;cib.lib;%(AdditionalDependencies)</AdditionalDependencies>
-    </Link>
-  </ItemDefinitionGroup>
-  <ItemGroup>
-    <ClCompile Include="discoverycomponent.cpp" />
-    <ClCompile Include="discoverymessage.cpp" />
-  </ItemGroup>
-  <ItemGroup>
-    <ClInclude Include="discoverycomponent.h" />
-    <ClInclude Include="discoverymessage.h" />
-    <ClInclude Include="i2-discovery.h" />
-  </ItemGroup>
-  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
-  <ImportGroup Label="ExtensionTargets">
-  </ImportGroup>
-</Project>
\ No newline at end of file
diff --git a/components/discovery/discovery.vcxproj.filters b/components/discovery/discovery.vcxproj.filters
deleted file mode 100644 (file)
index a356d8a..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
-  <ItemGroup>
-    <ClCompile Include="discoverycomponent.cpp">
-      <Filter>Quelldateien</Filter>
-    </ClCompile>
-    <ClCompile Include="discoverymessage.cpp">
-      <Filter>Quelldateien</Filter>
-    </ClCompile>
-  </ItemGroup>
-  <ItemGroup>
-    <ClInclude Include="discoverycomponent.h">
-      <Filter>Headerdateien</Filter>
-    </ClInclude>
-    <ClInclude Include="i2-discovery.h">
-      <Filter>Headerdateien</Filter>
-    </ClInclude>
-    <ClInclude Include="discoverymessage.h">
-      <Filter>Headerdateien</Filter>
-    </ClInclude>
-  </ItemGroup>
-  <ItemGroup>
-    <Filter Include="Headerdateien">
-      <UniqueIdentifier>{53341f7e-6bad-4cf1-92cf-be906efe1704}</UniqueIdentifier>
-    </Filter>
-    <Filter Include="Quelldateien">
-      <UniqueIdentifier>{c7b2deba-743b-4449-ae46-0b7ba1b1350a}</UniqueIdentifier>
-    </Filter>
-  </ItemGroup>
-</Project>
\ No newline at end of file
diff --git a/components/discovery/discoverycomponent.cpp b/components/discovery/discoverycomponent.cpp
deleted file mode 100644 (file)
index cdbfddb..0000000
+++ /dev/null
@@ -1,448 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#include "i2-discovery.h"
-
-using namespace icinga;
-
-/**
- * Starts the discovery component.
- */
-void DiscoveryComponent::Start(void)
-{
-       m_Endpoint = boost::make_shared<VirtualEndpoint>();
-
-       m_Endpoint->RegisterTopicHandler("discovery::RegisterComponent",
-               boost::bind(&DiscoveryComponent::RegisterComponentMessageHandler, this, _2, _3));
-
-       m_Endpoint->RegisterTopicHandler("discovery::NewComponent",
-               boost::bind(&DiscoveryComponent::NewComponentMessageHandler, this, _3));
-
-       m_Endpoint->RegisterTopicHandler("discovery::Welcome",
-               boost::bind(&DiscoveryComponent::WelcomeMessageHandler, this, _2, _3));
-
-       EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::NewEndpointHandler, this, _2));
-       EndpointManager::GetInstance()->OnNewEndpoint.connect(boost::bind(&DiscoveryComponent::NewEndpointHandler, this, _2));
-
-       EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
-
-       /* create the reconnect timer */
-       m_DiscoveryTimer = boost::make_shared<Timer>();
-       m_DiscoveryTimer->SetInterval(30);
-       m_DiscoveryTimer->OnTimerExpired.connect(boost::bind(&DiscoveryComponent::DiscoveryTimerHandler, this));
-       m_DiscoveryTimer->Start();
-
-       /* call the timer as soon as possible */
-       m_DiscoveryTimer->Reschedule(0);
-}
-
-/**
- * Stops the discovery component.
- */
-void DiscoveryComponent::Stop(void)
-{
-       EndpointManager::Ptr mgr = EndpointManager::GetInstance();
-
-       if (mgr)
-               mgr->UnregisterEndpoint(m_Endpoint);
-}
-
-/**
- * Checks whether the specified endpoint is already connected
- * and disconnects older endpoints.
- *
- * @param self The endpoint that is to be checked.
- * @param other The other endpoint.
- */
-void DiscoveryComponent::CheckExistingEndpoint(const Endpoint::Ptr& self, const Endpoint::Ptr& other)
-{
-       if (self == other)
-               return;
-
-       if (!other->IsConnected())
-               return;
-
-       if (self->GetIdentity() == other->GetIdentity()) {
-               Logger::Write(LogWarning, "discovery", "Detected duplicate identity:" + other->GetIdentity() + " - Disconnecting old endpoint.");
-
-               other->Stop();
-               EndpointManager::GetInstance()->UnregisterEndpoint(other);
-       }
-}
-
-/**
- * Deals with a new endpoint.
- *
- * @param endpoint The endpoint.
- */
-void DiscoveryComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
-{
-       /* immediately finish session setup for local endpoints */
-       if (endpoint->IsLocal()) {
-               endpoint->OnSessionEstablished(endpoint);
-               return;
-       }
-
-       String identity = endpoint->GetIdentity();
-
-       if (identity == EndpointManager::GetInstance()->GetIdentity()) {
-               Logger::Write(LogWarning, "discovery", "Detected loop-back connection - Disconnecting endpoint.");
-
-               endpoint->Stop();
-               EndpointManager::GetInstance()->UnregisterEndpoint(endpoint);
-
-               return;
-       }
-
-       EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _2));
-
-       // we assume the other component _always_ wants
-       // discovery::RegisterComponent messages from us
-       endpoint->RegisterSubscription("discovery::RegisterComponent");
-
-       // send a discovery::RegisterComponent message, if the
-       // other component is a broker this makes sure
-       // the broker knows about our message types
-       SendDiscoveryMessage("discovery::RegisterComponent", EndpointManager::GetInstance()->GetIdentity(), endpoint);
-
-       map<String, ComponentDiscoveryInfo::Ptr>::iterator ic;
-
-       // we assume the other component _always_ wants
-       // discovery::NewComponent messages from us
-       endpoint->RegisterSubscription("discovery::NewComponent");
-
-       // send discovery::NewComponent message for ourselves
-       SendDiscoveryMessage("discovery::NewComponent", EndpointManager::GetInstance()->GetIdentity(), endpoint);
-
-       // send discovery::NewComponent messages for all components
-       // we know about
-       for (ic = m_Components.begin(); ic != m_Components.end(); ic++) {
-               SendDiscoveryMessage("discovery::NewComponent", ic->first, endpoint);
-       }
-
-       // check if we already know the other component
-       ic = m_Components.find(endpoint->GetIdentity());
-
-       if (ic == m_Components.end()) {
-               // we don't know the other component yet, so
-               // wait until we get a discovery::NewComponent message
-               // from a broker
-               return;
-       }
-
-       // register published/subscribed topics for this endpoint
-       ComponentDiscoveryInfo::Ptr info = ic->second;
-       BOOST_FOREACH(String subscription, info->Subscriptions) {
-               endpoint->RegisterSubscription(subscription);
-       }
-
-       FinishDiscoverySetup(endpoint);
-}
-
-/**
- * Registers message Subscriptions/sources in the specified component information object.
- *
- * @param neea Event arguments for the endpoint.
- * @param info Component information object.
- * @return 0
- */
-void DiscoveryComponent::DiscoveryEndpointHandler(const Endpoint::Ptr& endpoint, const ComponentDiscoveryInfo::Ptr& info) const
-{
-       Endpoint::ConstTopicIterator i;
-
-       for (i = endpoint->BeginSubscriptions(); i != endpoint->EndSubscriptions(); i++)
-               info->Subscriptions.insert(*i);
-}
-
-/**
- * Retrieves the component information object for the specified component.
- *
- * @param component The identity of the component.
- * @param info Pointer to the information object.
- * @returns true if the info object was successfully retrieved, false otherwise.
- */
-bool DiscoveryComponent::GetComponentDiscoveryInfo(String component, ComponentDiscoveryInfo::Ptr *info) const
-{
-       if (component == EndpointManager::GetInstance()->GetIdentity()) {
-               /* Build fake discovery info for ourselves */
-               *info = boost::make_shared<ComponentDiscoveryInfo>();
-               EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _2, *info));
-               
-               (*info)->LastSeen = 0;
-               (*info)->Node = IcingaApplication::GetInstance()->GetNode();
-               (*info)->Service = IcingaApplication::GetInstance()->GetService();
-
-               return true;
-       }
-
-       map<String, ComponentDiscoveryInfo::Ptr>::const_iterator i;
-
-       i = m_Components.find(component);
-
-       if (i == m_Components.end())
-               return false;
-
-       *info = i->second;
-       return true;
-}
-
-/**
- * Processes discovery::Welcome messages.
- *
- * @param nrea Event arguments for the request.
- * @returns 0
- */
-void DiscoveryComponent::WelcomeMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
-{
-       if (sender->HasReceivedWelcome())
-               return;
-
-       sender->SetReceivedWelcome(true);
-
-       if (sender->HasSentWelcome())
-               sender->OnSessionEstablished(sender);
-}
-
-/**
- * Finishes the welcome handshake for a new component
- * by registering message Subscriptions/sources for the component
- * and sending a welcome message if necessary.
- *
- * @param endpoint The endpoint to set up.
- */
-void DiscoveryComponent::FinishDiscoverySetup(const Endpoint::Ptr& endpoint)
-{
-       if (endpoint->HasSentWelcome())
-               return;
-
-       // we assume the other component _always_ wants
-       // discovery::Welcome messages from us
-       endpoint->RegisterSubscription("discovery::Welcome");
-       RequestMessage request;
-       request.SetMethod("discovery::Welcome");
-       EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request);
-
-       endpoint->SetSentWelcome(true);
-
-       if (endpoint->HasReceivedWelcome())
-               endpoint->OnSessionEstablished(endpoint);
-}
-
-/**
- * Sends a discovery message for the specified identity using the
- * specified message type.
- *
- * @param method The method to use for the message ("discovery::NewComponent" or "discovery::RegisterComponent").
- * @param identity The identity of the component for which a message should be sent.
- * @param recipient The recipient of the message. A multicast message is sent if this parameter is empty.
- */
-void DiscoveryComponent::SendDiscoveryMessage(const String& method, const String& identity, const Endpoint::Ptr& recipient)
-{
-       RequestMessage request;
-       request.SetMethod(method);
-       
-       DiscoveryMessage params;
-       request.SetParams(params);
-
-       params.SetIdentity(identity);
-
-       ComponentDiscoveryInfo::Ptr info;
-
-       if (!GetComponentDiscoveryInfo(identity, &info))
-               return;
-
-       if (!info->Node.IsEmpty() && !info->Service.IsEmpty()) {
-               params.SetNode(info->Node);
-               params.SetService(info->Service);
-       }
-
-       set<String>::iterator i;
-       Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
-       BOOST_FOREACH(String subscription, info->Subscriptions) {
-               subscriptions->Add(subscription);
-       }
-
-       params.SetSubscriptions(subscriptions);
-
-       if (recipient)
-               EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, recipient, request);
-       else
-               EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, request);
-}
-
-/**
- * Processes a discovery message by registering the component in the
- * discovery component registry.
- *
- * @param identity The authorative identity of the component.
- * @param message The discovery message.
- * @param trusted Whether the message comes from a trusted source (i.e. a broker).
- */
-void DiscoveryComponent::ProcessDiscoveryMessage(const String& identity, const DiscoveryMessage& message, bool trusted)
-{
-       /* ignore discovery messages that are about ourselves */
-       if (identity == EndpointManager::GetInstance()->GetIdentity())
-               return;
-
-       ComponentDiscoveryInfo::Ptr info = boost::make_shared<ComponentDiscoveryInfo>();
-
-       info->LastSeen = Utility::GetTime();
-
-       String node;
-       if (message.GetNode(&node) && !node.IsEmpty())
-               info->Node = node;
-
-       String service;
-       if (message.GetService(&service) && !service.IsEmpty())
-               info->Service = service;
-
-       DynamicObject::Ptr endpointConfig = DynamicObject::GetObject("Endpoint", identity);
-       Dictionary::Ptr roles;
-       if (endpointConfig)
-               roles = endpointConfig->Get("roles");
-
-       Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(identity);
-
-       Dictionary::Ptr subscriptions;
-       if (message.GetSubscriptions(&subscriptions)) {
-               Value subscription;
-               BOOST_FOREACH(tie(tuples::ignore, subscription), subscriptions) {
-                       info->Subscriptions.insert(subscription);
-                       if (endpoint)
-                               endpoint->RegisterSubscription(subscription);
-               }
-       }
-
-       map<String, ComponentDiscoveryInfo::Ptr>::iterator i;
-
-       i = m_Components.find(identity);
-
-       if (i != m_Components.end())
-               m_Components.erase(i);
-
-       m_Components[identity] = info;
-
-       SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
-
-       /* don't send a welcome message for discovery::NewComponent messages */
-       if (endpoint && !trusted)
-               FinishDiscoverySetup(endpoint);
-}
-
-/**
- * Processes "discovery::NewComponent" messages.
- *
- * @param nrea Event arguments for the request.
- */
-void DiscoveryComponent::NewComponentMessageHandler(const RequestMessage& request)
-{
-       DiscoveryMessage message;
-       request.GetParams(&message);
-
-       String identity;
-       if (!message.GetIdentity(&identity))
-               return;
-
-       ProcessDiscoveryMessage(identity, message, true);
-}
-
-/**
- * Processes "discovery::RegisterComponent" messages.
- *
- * @param nrea Event arguments for the request.
- */
-void DiscoveryComponent::RegisterComponentMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
-{
-       DiscoveryMessage message;
-       request.GetParams(&message);
-       ProcessDiscoveryMessage(sender->GetIdentity(), message, false);
-}
-
-/**
- * Checks whether we have to reconnect to other components and removes stale
- * components from the registry.
- */
-void DiscoveryComponent::DiscoveryTimerHandler(void)
-{
-       EndpointManager::Ptr endpointManager = EndpointManager::GetInstance();
-       
-       double now = Utility::GetTime();
-
-       /* check whether we have to reconnect to one of our upstream endpoints */
-       DynamicObject::Ptr object;
-       BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
-               /* Check if we're already connected to this endpoint. */
-               if (endpointManager->GetEndpointByIdentity(object->GetName()))
-                       continue;
-
-               String node = object->Get("node");
-               String service = object->Get("service");
-               if (!node.IsEmpty() && !service.IsEmpty()) {
-                       /* reconnect to this endpoint */
-                       endpointManager->AddConnection(node, service);
-               }
-       }
-
-       map<String, ComponentDiscoveryInfo::Ptr>::iterator curr, i;
-       for (i = m_Components.begin(); i != m_Components.end(); ) {
-               const String& identity = i->first;
-               const ComponentDiscoveryInfo::Ptr& info = i->second;
-
-               curr = i;
-               i++;
-
-               /* there's no need to reconnect to ourself */
-               if (identity == EndpointManager::GetInstance()->GetIdentity())
-                       continue;
-
-               /* for explicitly-configured upstream endpoints
-                * we prefer to use the node/service from the
-                * config object - which is what the for loop above does */
-               if (DynamicObject::GetObject("endpoint", identity))
-                       continue;
-
-               if (info->LastSeen < now - DiscoveryComponent::RegistrationTTL) {
-                       /* unregister this component if its registration has expired */
-                       m_Components.erase(curr);
-                       continue;
-               }
-
-               /* send discovery message to all connected components to
-                       refresh their TTL for this component */
-               SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
-
-               Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(identity);
-               if (endpoint && endpoint->IsConnected()) {
-                       /* update LastSeen if we're still connected to this endpoint */
-                       info->LastSeen = now;
-               } else {
-                       /* try and reconnect to this component */
-                       try {
-                               if (!info->Node.IsEmpty() && !info->Service.IsEmpty())
-                                       endpointManager->AddConnection(info->Node, info->Service);
-                       } catch (const exception& ex) {
-                               stringstream msgbuf;
-                               msgbuf << "Exception while trying to reconnect to endpoint '" << endpoint->GetIdentity() << "': " << ex.what();;
-                               Logger::Write(LogInformation, "discovery", msgbuf.str());
-                       }
-               }
-       }
-}
-
-EXPORT_COMPONENT(discovery, DiscoveryComponent);
diff --git a/components/discovery/discoverycomponent.h b/components/discovery/discoverycomponent.h
deleted file mode 100644 (file)
index 1160a58..0000000
+++ /dev/null
@@ -1,82 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#ifndef DISCOVERYCOMPONENT_H
-#define DISCOVERYCOMPONENT_H
-
-namespace icinga
-{
-
-/**
- * @ingroup discovery
- */
-class ComponentDiscoveryInfo : public Object
-{
-public:
-       typedef shared_ptr<ComponentDiscoveryInfo> Ptr;
-       typedef weak_ptr<ComponentDiscoveryInfo> WeakPtr;
-
-       String Node;
-       String Service;
-
-       set<String> Subscriptions;
-       set<String> Publications;
-
-       double LastSeen;
-};
-
-/**
- * @ingroup discovery
- */
-class DiscoveryComponent : public IComponent
-{
-public:
-       virtual void Start(void);
-       virtual void Stop(void);
-
-private:
-       VirtualEndpoint::Ptr m_Endpoint;
-       map<String, ComponentDiscoveryInfo::Ptr> m_Components;
-       Timer::Ptr m_DiscoveryTimer;
-
-       void NewEndpointHandler(const Endpoint::Ptr& endpoint);
-
-       void NewComponentMessageHandler(const RequestMessage& request);
-       void RegisterComponentMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
-
-       void WelcomeMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
-
-       void SendDiscoveryMessage(const String& method, const String& identity, const Endpoint::Ptr& recipient);
-       void ProcessDiscoveryMessage(const String& identity, const DiscoveryMessage& message, bool trusted);
-
-       bool GetComponentDiscoveryInfo(String component, ComponentDiscoveryInfo::Ptr *info) const;
-
-       void CheckExistingEndpoint(const Endpoint::Ptr& self, const Endpoint::Ptr& other);
-       void DiscoveryEndpointHandler(const Endpoint::Ptr& endpoint, const ComponentDiscoveryInfo::Ptr& info) const;
-
-       void DiscoveryTimerHandler(void);
-
-       void FinishDiscoverySetup(const Endpoint::Ptr& endpoint);
-
-       static const int RegistrationTTL = 300;
-};
-
-}
-
-#endif /* DISCOVERYCOMPONENT_H */
diff --git a/components/discovery/discoverymessage.cpp b/components/discovery/discoverymessage.cpp
deleted file mode 100644 (file)
index fde9df4..0000000
+++ /dev/null
@@ -1,71 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#include "i2-discovery.h"
-
-using namespace icinga;
-
-DiscoveryMessage::DiscoveryMessage(void)
-       : MessagePart()
-{ }
-
-DiscoveryMessage::DiscoveryMessage(const MessagePart& message)
-       : MessagePart(message)
-{ }
-
-bool DiscoveryMessage::GetIdentity(String *value) const
-{
-       return Get("identity", value);
-}
-
-void DiscoveryMessage::SetIdentity(const String& value)
-{
-       Set("identity", value);
-}
-
-bool DiscoveryMessage::GetNode(String *value) const
-{
-       return Get("node", value);
-}
-
-void DiscoveryMessage::SetNode(const String& value)
-{
-       Set("node", value);
-}
-
-bool DiscoveryMessage::GetService(String *value) const
-{
-       return Get("service", value);
-}
-
-void DiscoveryMessage::SetService(const String& value)
-{
-       Set("service", value);
-}
-
-bool DiscoveryMessage::GetSubscriptions(Dictionary::Ptr *value) const
-{
-       return Get("subscriptions", value);
-}
-
-void DiscoveryMessage::SetSubscriptions(const Dictionary::Ptr& value)
-{
-       Set("subscriptions", value);
-}
-
diff --git a/components/discovery/discoverymessage.h b/components/discovery/discoverymessage.h
deleted file mode 100644 (file)
index 6f984b4..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#ifndef DISCOVERYMESSAGE_H
-#define DISCOVERYMESSAGE_H
-
-namespace icinga
-{
-
-/**
- * @ingroup discovery
- */
-class DiscoveryMessage : public MessagePart
-{
-public:
-       DiscoveryMessage(void);
-       DiscoveryMessage(const MessagePart& message);
-
-       bool GetIdentity(String *value) const;
-       void SetIdentity(const String& value);
-
-       bool GetNode(String *value) const;
-       void SetNode(const String& value);
-
-       bool GetService(String *value) const;
-       void SetService(const String& value);
-
-       bool GetSubscriptions(Dictionary::Ptr *value) const;
-       void SetSubscriptions(const Dictionary::Ptr& value);
-};
-
-}
-
-#endif /* SUBSCRIPTIONMESSAGE_H */
diff --git a/components/discovery/i2-discovery.h b/components/discovery/i2-discovery.h
deleted file mode 100644 (file)
index 8d1139b..0000000
+++ /dev/null
@@ -1,38 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#ifndef I2DISCOVERY_H
-#define I2DISCOVERY_H
-
-/**
- * @defgroup discovery Discovery component
- *
- * The Discovery component takes care of connecting peers to each other
- * and performs authorisation checks for the message subscriptions.
- */
-
-#include <i2-base.h>
-#include <i2-jsonrpc.h>
-#include <i2-icinga.h>
-#include <i2-cib.h>
-
-#include "discoverymessage.h"
-#include "discoverycomponent.h"
-
-#endif /* I2DISCOVERY_H */
index c9c39f6bd8edf7c511cf39609a0880e399ce7a5c..859abae8fdde3ef8c5139557be1f272695ea20cf 100644 (file)
@@ -76,7 +76,6 @@ components/compat/Makefile
 components/convenience/Makefile
 components/delegation/Makefile
 components/demo/Makefile
-components/discovery/Makefile
 dyn/Makefile
 icinga/Makefile
 icinga-app/Makefile
index d9d449c3fa16057341199b41f6032fe52e10ecb0..8b93cea751fdcc0e0aff36dfbf770ef0fe71fb55 100644 (file)
Binary files a/doc/icinga2-config.odt and b/doc/icinga2-config.odt differ
index 8fb735efc09dc13684d171452db0fa111fcd6bf0..dbf9c06a4257713af9b25fe52c296abe82d5df05 100644 (file)
@@ -33,8 +33,7 @@ icinga_LDADD = \
        -dlopen ${top_builddir}/components/compat/compat.la \
        -dlopen ${top_builddir}/components/convenience/convenience.la \
        -dlopen ${top_builddir}/components/delegation/delegation.la \
-       -dlopen ${top_builddir}/components/demo/demo.la \
-       -dlopen ${top_builddir}/components/discovery/discovery.la
+       -dlopen ${top_builddir}/components/demo/demo.la
 
 icinga_DEPENDENCIES = \
        ${top_builddir}/components/cibsync/cibsync.la \
index 744f808256a2b063bcdf3c48289d831fc22f74b6..91952ff2fe3f0d7c2501a6104d2de0dc93222bff 100644 (file)
@@ -11,11 +11,7 @@ libicinga_la_SOURCES =  \
        endpointmanager.h \
        icingaapplication.cpp \
        icingaapplication.h \
-       i2-icinga.h \
-       jsonrpcendpoint.cpp \
-       jsonrpcendpoint.h \
-       virtualendpoint.cpp \
-       virtualendpoint.h
+       i2-icinga.h
 
 libicinga_la_CPPFLAGS = \
        -DI2_ICINGA_BUILD \
index 1209963dff7bb0128730132deb4f40c8eaef0fd2..701a240cadbf0724e8bca70ea583bfffac51d973 100644 (file)
 
 using namespace icinga;
 
+REGISTER_CLASS(Endpoint);
+
+boost::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
+boost::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected;
+boost::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionRegistered;
+boost::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionUnregistered;
+
+Endpoint::Endpoint(const Dictionary::Ptr& serializedUpdate)
+       : DynamicObject(serializedUpdate)
+{
+       RegisterAttribute("node", Attribute_Replicated);
+       RegisterAttribute("service", Attribute_Replicated);
+       RegisterAttribute("local", Attribute_Config);
+       RegisterAttribute("subscriptions", Attribute_Replicated);
+       RegisterAttribute("client", Attribute_Transient);
+}
+
+bool Endpoint::Exists(const String& name)
+{
+       return (DynamicObject::GetObject("Endpoint", name));
+}
+
+Endpoint::Ptr Endpoint::GetByName(const String& name)
+{
+        DynamicObject::Ptr configObject = DynamicObject::GetObject("Endpoint", name);
+
+        if (!configObject)
+                throw_exception(invalid_argument("Endpoint '" + name + "' does not exist."));
+
+        return dynamic_pointer_cast<Endpoint>(configObject);
+}
+
+Endpoint::Ptr Endpoint::MakeEndpoint(const String& name, bool local)
+{
+       ConfigItemBuilder::Ptr endpointConfig = boost::make_shared<ConfigItemBuilder>();
+       endpointConfig->SetType("Endpoint");
+       endpointConfig->SetName(local ? "local:" + name : name);
+       endpointConfig->SetLocal(local ? 1 : 0);
+       endpointConfig->AddExpression("local", OperatorSet, local);
+
+       DynamicObject::Ptr object = endpointConfig->Compile()->Commit();
+       return dynamic_pointer_cast<Endpoint>(object);
+}
+
+/**
+ * Checks whether this is a local endpoint.
+ *
+ * @returns true if this is a local endpoint, false otherwise.
+ */
+bool Endpoint::IsLocalEndpoint(void) const
+{
+       Value value = Get("local");
+
+       return (!value.IsEmpty() && value);
+}
+
 /**
- * Retrieves the endpoint manager this endpoint is registered with.
+ * Checks whether this endpoint is connected.
  *
- * @returns The EndpointManager object.
+ * @returns true if the endpoint is connected, false otherwise.
  */
-EndpointManager::Ptr Endpoint::GetEndpointManager(void) const
+bool Endpoint::IsConnected(void) const
 {
-       return m_EndpointManager.lock();
+       if (IsLocalEndpoint()) {
+               return true;
+       } else {
+               JsonRpcClient::Ptr client = GetClient();
+
+               return (client && client->IsConnected());
+       }
 }
 
 /**
- * Sets the endpoint manager this endpoint is registered with.
+ * Retrieves the address for the endpoint.
  *
- * @param manager The EndpointManager object.
+ * @returns The endpoint's address.
  */
-void Endpoint::SetEndpointManager(EndpointManager::WeakPtr manager)
+String Endpoint::GetAddress(void) const
+{
+       if (IsLocalEndpoint()) {
+               return "local:" + GetName();
+       } else {
+               JsonRpcClient::Ptr client = GetClient();
+
+               if (!client)
+                       return "<disconnected endpoint>";
+
+               return client->GetPeerAddress();
+       }
+}
+
+JsonRpcClient::Ptr Endpoint::GetClient(void) const
 {
-       m_EndpointManager = manager;
+       return Get("client");
+}
+
+void Endpoint::SetClient(const JsonRpcClient::Ptr& client)
+{
+       Set("client", client);
+       client->OnNewMessage.connect(boost::bind(&Endpoint::NewMessageHandler, this, _2));
+       client->OnClosed.connect(boost::bind(&Endpoint::ClientClosedHandler, this));
+
+       OnConnected(GetSelf());
 }
 
 /**
@@ -46,9 +131,18 @@ void Endpoint::SetEndpointManager(EndpointManager::WeakPtr manager)
  *
  * @param topic The name of the topic.
  */
-void Endpoint::RegisterSubscription(String topic)
+void Endpoint::RegisterSubscription(const String& topic)
 {
-       m_Subscriptions.insert(topic);
+       Dictionary::Ptr subscriptions = GetSubscriptions();
+
+       if (!subscriptions)
+               subscriptions = boost::make_shared<Dictionary>();
+
+       if (!subscriptions->Contains(topic)) {
+               Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone();
+               newSubscriptions->Set(topic, topic);
+               SetSubscriptions(newSubscriptions);
+       }
 }
 
 /**
@@ -56,9 +150,15 @@ void Endpoint::RegisterSubscription(String topic)
  *
  * @param topic The name of the topic.
  */
-void Endpoint::UnregisterSubscription(String topic)
+void Endpoint::UnregisterSubscription(const String& topic)
 {
-       m_Subscriptions.erase(topic);
+       Dictionary::Ptr subscriptions = GetSubscriptions();
+
+       if (subscriptions && subscriptions->Contains(topic)) {
+               Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone();
+               newSubscriptions->Remove(topic);
+               SetSubscriptions(newSubscriptions);
+       }
 }
 
 /**
@@ -67,9 +167,11 @@ void Endpoint::UnregisterSubscription(String topic)
  * @param topic The name of the topic.
  * @returns true if the endpoint is subscribed to the topic, false otherwise.
  */
-bool Endpoint::HasSubscription(String topic) const
+bool Endpoint::HasSubscription(const String& topic) const
 {
-       return (m_Subscriptions.find(topic) != m_Subscriptions.end());
+       Dictionary::Ptr subscriptions = GetSubscriptions();
+
+       return (subscriptions && subscriptions->Contains(topic));
 }
 
 /**
@@ -77,65 +179,168 @@ bool Endpoint::HasSubscription(String topic) const
  */
 void Endpoint::ClearSubscriptions(void)
 {
-       m_Subscriptions.clear();
+       Set("subscriptions", Empty);
 }
 
-/**
- * Returns the beginning of the subscriptions list.
- *
- * @returns An iterator that points to the first subscription.
- */
-Endpoint::ConstTopicIterator Endpoint::BeginSubscriptions(void) const
+Dictionary::Ptr Endpoint::GetSubscriptions(void) const
 {
-       return m_Subscriptions.begin();
+       return Get("subscriptions");
 }
 
-/**
- * Returns the end of the subscriptions list.
- *
- * @returns An iterator that points past the last subscription.
- */
-Endpoint::ConstTopicIterator Endpoint::EndSubscriptions(void) const
+void Endpoint::SetSubscriptions(const Dictionary::Ptr& subscriptions)
 {
-       return m_Subscriptions.end();
+       Set("subscriptions", subscriptions);
 }
 
-/**
- * Sets whether a welcome message has been received from this endpoint.
- *
- * @param value Whether we've received a welcome message.
- */
-void Endpoint::SetReceivedWelcome(bool value)
+void Endpoint::RegisterTopicHandler(const String& topic, const function<Endpoint::Callback>& callback)
 {
-       m_ReceivedWelcome = value;
+       map<String, shared_ptr<boost::signal<Endpoint::Callback> > >::iterator it;
+       it = m_TopicHandlers.find(topic);
+
+       shared_ptr<boost::signal<Endpoint::Callback> > sig;
+
+       if (it == m_TopicHandlers.end()) {
+               sig = boost::make_shared<boost::signal<Endpoint::Callback> >();
+               m_TopicHandlers.insert(make_pair(topic, sig));
+       } else {
+               sig = it->second;
+       }
+
+       sig->connect(callback);
+
+       RegisterSubscription(topic);
 }
 
-/**
- * Retrieves whether a welcome message has been received from this endpoint.
- *
- * @returns Whether we've received a welcome message.
- */
-bool Endpoint::HasReceivedWelcome(void) const
+void Endpoint::UnregisterTopicHandler(const String& topic, const function<Endpoint::Callback>& callback)
 {
-       return m_ReceivedWelcome;
+       // TODO: implement
+       //m_TopicHandlers[method] -= callback;
+       //UnregisterSubscription(method);
+
+       throw_exception(NotImplementedException());
 }
 
-/**
- * Sets whether a welcome message has been sent to this endpoint.
- *
- * @param value Whether we've sent a welcome message.
- */
-void Endpoint::SetSentWelcome(bool value)
+void Endpoint::OnAttributeChanged(const String& name, const Value& oldValue)
 {
-       m_SentWelcome = value;
+       if (name == "subscriptions") {
+               Dictionary::Ptr oldSubscriptions, newSubscriptions;
+
+               if (oldValue.IsObjectType<Dictionary>())
+                       oldSubscriptions = oldValue;
+
+               newSubscriptions = GetSubscriptions();
+
+               if (oldSubscriptions) {
+                       String subscription;
+                       BOOST_FOREACH(tie(tuples::ignore, subscription), oldSubscriptions) {
+                               if (!newSubscriptions || !newSubscriptions->Contains(subscription)) {
+                                       Logger::Write(LogInformation, "icinga", "Removed subscription for '" + GetName() + "': " + subscription);
+                                       OnSubscriptionUnregistered(GetSelf(), subscription);
+                               }
+                       }
+               }
+
+               if (newSubscriptions) {
+                       String subscription;
+                       BOOST_FOREACH(tie(tuples::ignore, subscription), newSubscriptions) {
+                               if (!oldSubscriptions || !oldSubscriptions->Contains(subscription)) {
+                                       Logger::Write(LogInformation, "icinga", "New subscription for '" + GetName() + "': " + subscription);
+                                       OnSubscriptionRegistered(GetSelf(), subscription);
+                               }
+                       }
+               }
+       }
 }
 
-/**
- * Retrieves whether a welcome message has been sent to this endpoint.
- *
- * @returns Whether we've sent a welcome message.
- */
-bool Endpoint::HasSentWelcome(void) const
+void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& request)
 {
-       return m_SentWelcome;
+       if (!IsConnected()) {
+               // TODO: persist the message
+               return;
+       }
+
+       if (IsLocalEndpoint()) {
+               String method;
+               if (!request.GetMethod(&method))
+                       return;
+
+               map<String, shared_ptr<boost::signal<Endpoint::Callback> > >::iterator it;
+               it = m_TopicHandlers.find(method);
+
+               if (it == m_TopicHandlers.end())
+                       return;
+
+               (*it->second)(GetSelf(), sender, request);
+       } else {
+               GetClient()->SendMessage(request);
+       }
 }
+
+void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& response)
+{
+       if (!IsConnected())
+               return;
+
+       if (IsLocalEndpoint())
+               EndpointManager::GetInstance()->ProcessResponseMessage(sender, response);
+       else
+               GetClient()->SendMessage(response);
+}
+
+void Endpoint::NewMessageHandler(const MessagePart& message)
+{
+       Endpoint::Ptr sender = GetSelf();
+
+       if (ResponseMessage::IsResponseMessage(message)) {
+               /* rather than routing the message to the right virtual
+                * endpoint we just process it here right away. */
+               EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
+               return;
+       }
+
+       RequestMessage request = message;
+
+       String method;
+       if (!request.GetMethod(&method))
+               return;
+
+       String id;
+       if (request.GetID(&id))
+               EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
+       else
+               EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
+}
+
+void Endpoint::ClientClosedHandler(void)
+{
+       try {
+               GetClient()->CheckException();
+       } catch (const exception& ex) {
+               stringstream message;
+               message << "Error occured for JSON-RPC socket: Message=" << ex.what();
+
+               Logger::Write(LogWarning, "jsonrpc", message.str());
+       }
+
+       Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetName());
+
+       // TODO: _only_ clear non-persistent subscriptions
+       // unregister ourselves if no persistent subscriptions are left (use a
+       // timer for that, once we have a TTL property for the topics)
+       ClearSubscriptions();
+
+       Set("client", Empty);
+
+       OnDisconnected(GetSelf());
+}
+
+String Endpoint::GetNode(void) const
+{
+       return Get("node");
+}
+
+String Endpoint::GetService(void) const
+{
+       return Get("service");
+}
+
index f6586bbe311f09869500d6063bb7e37d0cca09e1..d2799e823e47764fdf78febabde8d69451b05cfc 100644 (file)
@@ -30,60 +30,65 @@ class EndpointManager;
  *
  * @ingroup icinga
  */
-class I2_ICINGA_API Endpoint : public Object
+class I2_ICINGA_API Endpoint : public DynamicObject
 {
 public:
        typedef shared_ptr<Endpoint> Ptr;
        typedef weak_ptr<Endpoint> WeakPtr;
 
-       typedef set<String>::const_iterator ConstTopicIterator;
+       typedef void (Callback)(const Endpoint::Ptr&, const Endpoint::Ptr&, const RequestMessage&);
 
-       Endpoint(void)
-               : m_ReceivedWelcome(false), m_SentWelcome(false)
-       { }
+       Endpoint(const Dictionary::Ptr& serializedUpdate);
 
-       virtual String GetIdentity(void) const = 0;
-       virtual String GetAddress(void) const = 0;
+       static bool Exists(const String& name);
+       static Endpoint::Ptr GetByName(const String& name);
 
-       void SetReceivedWelcome(bool value);
-       bool HasReceivedWelcome(void) const;
+       String GetAddress(void) const;
 
-       void SetSentWelcome(bool value);
-       bool HasSentWelcome(void) const;
+       JsonRpcClient::Ptr GetClient(void) const;
+       void SetClient(const JsonRpcClient::Ptr& client);
 
-       shared_ptr<EndpointManager> GetEndpointManager(void) const;
-       void SetEndpointManager(weak_ptr<EndpointManager> manager);
+       void RegisterSubscription(const String& topic);
+       void UnregisterSubscription(const String& topic);
+       bool HasSubscription(const String& topic) const;
 
-       void RegisterSubscription(String topic);
-       void UnregisterSubscription(String topic);
-       bool HasSubscription(String topic) const;
+       Dictionary::Ptr GetSubscriptions(void) const;
+       void SetSubscriptions(const Dictionary::Ptr& subscriptions);
 
-       virtual bool IsLocal(void) const = 0;
-       virtual bool IsConnected(void) const = 0;
+       bool IsLocalEndpoint(void) const;
+       bool IsConnected(void) const;
 
-       virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message) = 0;
-       virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message) = 0;
-
-       virtual void Stop(void) = 0;
+       void ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& message);
+       void ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& message);
 
        void ClearSubscriptions(void);
 
-       ConstTopicIterator BeginSubscriptions(void) const;
-       ConstTopicIterator EndSubscriptions(void) const;
+       void RegisterTopicHandler(const String& topic, const function<Callback>& callback);
+       void UnregisterTopicHandler(const String& topic, const function<Callback>& callback);
+
+       virtual void OnAttributeChanged(const String& name, const Value& oldValue);
+
+       String GetNode(void) const;
+       String GetService(void) const;
 
-       boost::signal<void (const Endpoint::Ptr&)> OnSessionEstablished;
+       static Endpoint::Ptr MakeEndpoint(const String& name, bool local);
+
+       static boost::signal<void (const Endpoint::Ptr&)> OnConnected;
+       static boost::signal<void (const Endpoint::Ptr&)> OnDisconnected;
+
+       static boost::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionRegistered;
+       static boost::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionUnregistered;
 
 private:
-       set<String> m_Subscriptions; /**< The topics this endpoint is
-                                         subscribed to. */
        bool m_ReceivedWelcome; /**< Have we received a welcome message
                                     from this endpoint? */
        bool m_SentWelcome; /**< Have we sent a welcome message to this
                                 endpoint? */
 
-       weak_ptr<EndpointManager> m_EndpointManager; /**< The endpoint manager
-                                                         this endpoint is
-                                                         registered with. */
+       map<String, shared_ptr<boost::signal<Callback> > > m_TopicHandlers;
+
+       void NewMessageHandler(const MessagePart& message);
+       void ClientClosedHandler(void);
 };
 
 }
index 1e7dc3a4a21dbdb657a7b05dc02a6765f1957902..4bbef5110733a8cbd3ebf7dbd554e9b1d8d21a93 100644 (file)
@@ -31,6 +31,16 @@ EndpointManager::EndpointManager(void)
        m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this));
        m_RequestTimer->SetInterval(5);
        m_RequestTimer->Start();
+
+       m_SubscriptionTimer = boost::make_shared<Timer>();
+       m_SubscriptionTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::SubscriptionTimerHandler, this));
+       m_SubscriptionTimer->SetInterval(10);
+       m_SubscriptionTimer->Start();
+
+       m_ReconnectTimer = boost::make_shared<Timer>();
+       m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::ReconnectTimerHandler, this));
+       m_ReconnectTimer->SetInterval(10);
+       m_ReconnectTimer->Start();
 }
 
 /**
@@ -42,6 +52,16 @@ EndpointManager::EndpointManager(void)
 void EndpointManager::SetIdentity(const String& identity)
 {
        m_Identity = identity;
+
+       if (m_Endpoint)
+               m_Endpoint->Unregister();
+
+       DynamicObject::Ptr object = DynamicObject::GetObject("Endpoint", identity);
+
+       if (object)
+               m_Endpoint = dynamic_pointer_cast<Endpoint>(object);
+       else
+               m_Endpoint = Endpoint::MakeEndpoint(identity, false);
 }
 
 /**
@@ -54,26 +74,6 @@ String EndpointManager::GetIdentity(void) const
        return m_Identity;
 }
 
-/**
- * Sets the SSL context that is used for remote connections.
- *
- * @param sslContext The new SSL context.
- */
-void EndpointManager::SetSSLContext(const shared_ptr<SSL_CTX>& sslContext)
-{
-       m_SSLContext = sslContext;
-}
-
-/**
- * Retrieves the SSL context that is used for remote connections.
- *
- * @returns The SSL context.
- */
-shared_ptr<SSL_CTX> EndpointManager::GetSSLContext(void) const
-{
-       return m_SSLContext;
-}
-
 /**
  * Creates a new JSON-RPC listener on the specified port.
  *
@@ -81,15 +81,20 @@ shared_ptr<SSL_CTX> EndpointManager::GetSSLContext(void) const
  */
 void EndpointManager::AddListener(const String& service)
 {
-       if (!GetSSLContext())
+       shared_ptr<SSL_CTX> sslContext = IcingaApplication::GetInstance()->GetSSLContext();
+
+       if (!sslContext)
                throw_exception(logic_error("SSL context is required for AddListener()"));
 
        stringstream s;
        s << "Adding new listener: port " << service;
        Logger::Write(LogInformation, "icinga", s.str());
 
-       JsonRpcServer::Ptr server = boost::make_shared<JsonRpcServer>(m_SSLContext);
-       RegisterServer(server);
+       JsonRpcServer::Ptr server = boost::make_shared<JsonRpcServer>(sslContext);
+
+       m_Servers.insert(server);
+       server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler,
+          this, _2));
 
        server->Bind(service, AF_INET6);
        server->Listen();
@@ -102,107 +107,47 @@ void EndpointManager::AddListener(const String& service)
  * @param node The remote host.
  * @param service The remote port.
  */
-void EndpointManager::AddConnection(const String& node, const String& service)
-{
-       stringstream s;
-       s << "Adding new endpoint: [" << node << "]:" << service;
-       Logger::Write(LogInformation, "icinga", s.str());
-
-       JsonRpcEndpoint::Ptr endpoint = boost::make_shared<JsonRpcEndpoint>();
-       RegisterEndpoint(endpoint);
-       endpoint->Connect(node, service, m_SSLContext);
-}
-
-/**
- * Registers a new JSON-RPC server with this endpoint manager.
- *
- * @param server The JSON-RPC server.
- */
-void EndpointManager::RegisterServer(const JsonRpcServer::Ptr& server)
-{
-       m_Servers.push_back(server);
-       server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler,
-           this, _2));
+void EndpointManager::AddConnection(const String& node, const String& service) {
+       JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(RoleOutbound,
+           IcingaApplication::GetInstance()->GetSSLContext());
+       client->Connect(node, service);
+       NewClientHandler(client);
 }
 
 /**
  * Processes a new client connection.
  *
- * @param ncea Event arguments.
+ * @param client The new client.
  */
 void EndpointManager::NewClientHandler(const TcpClient::Ptr& client)
 {
-       Logger::Write(LogInformation, "icinga", "Accepted new client from " + client->GetPeerAddress());
+       JsonRpcClient::Ptr jclient = static_pointer_cast<JsonRpcClient>(client);
 
-       JsonRpcEndpoint::Ptr endpoint = boost::make_shared<JsonRpcEndpoint>();
-       endpoint->SetClient(static_pointer_cast<JsonRpcClient>(client));
-       client->Start();
-       RegisterEndpoint(endpoint);
-}
+       Logger::Write(LogInformation, "icinga", "New client connection from " + jclient->GetPeerAddress());
 
-/**
- * Unregisters a JSON-RPC server.
- *
- * @param server The JSON-RPC server.
- */
-void EndpointManager::UnregisterServer(const JsonRpcServer::Ptr& server)
-{
-       m_Servers.erase(
-           remove(m_Servers.begin(), m_Servers.end(), server),
-           m_Servers.end());
-       // TODO: unbind event
+       m_PendingClients.insert(jclient);
+       jclient->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1));
+       jclient->Start();
 }
 
-/**
- * Registers a new endpoint with this endpoint manager.
- *
- * @param endpoint The new endpoint.
- */
-void EndpointManager::RegisterEndpoint(const Endpoint::Ptr& endpoint)
+void EndpointManager::ClientConnectedHandler(const TcpClient::Ptr& client)
 {
-       endpoint->SetEndpointManager(GetSelf());
+       JsonRpcClient::Ptr jclient = static_pointer_cast<JsonRpcClient>(client);
 
-       UnregisterEndpoint(endpoint);
+       m_PendingClients.erase(jclient);
 
-       String identity = endpoint->GetIdentity();
+       shared_ptr<X509> cert = jclient->GetPeerCertificate();
 
-       if (!identity.IsEmpty()) {
-               m_Endpoints[identity] = endpoint;
-               OnNewEndpoint(GetSelf(), endpoint);
-       } else {
-               m_PendingEndpoints.push_back(endpoint);
-       }
+       String identity = Utility::GetCertificateCN(cert);
 
-       if (endpoint->IsLocal()) {
-               /* this endpoint might have introduced new subscriptions
-                * or publications which affect remote endpoints, we need
-                * to close all fully-connected remote endpoints to make sure
-                * these subscriptions/publications are kept up-to-date. */
-               Iterator prev, it;
-               for (it = m_Endpoints.begin(); it != m_Endpoints.end(); ) {
-                       prev = it;
-                       it++;
-
-                       if (!prev->second->IsLocal())
-                               m_Endpoints.erase(prev);
-               }
-       }
-}
+       Endpoint::Ptr endpoint;
 
-/**
- * Unregisters an endpoint.
- *
- * @param endpoint The endpoint.
- */
-void EndpointManager::UnregisterEndpoint(const Endpoint::Ptr& endpoint)
-{
-       m_PendingEndpoints.erase(
-           remove(m_PendingEndpoints.begin(), m_PendingEndpoints.end(), endpoint),
-           m_PendingEndpoints.end());
+       if (Endpoint::Exists(identity))
+               endpoint = Endpoint::GetByName(identity);
+       else
+               endpoint = Endpoint::MakeEndpoint(identity, false);
 
-       String identity = endpoint->GetIdentity();
-       if (!identity.IsEmpty())
-               m_Endpoints.erase(identity);
+       endpoint->SetClient(jclient);
 }
 
 /**
@@ -240,8 +185,9 @@ void EndpointManager::SendAnycastMessage(const Endpoint::Ptr& sender,
                throw_exception(invalid_argument("Message is missing the 'method' property."));
 
        vector<Endpoint::Ptr> candidates;
-       Endpoint::Ptr endpoint;
-       BOOST_FOREACH(tie(tuples::ignore, endpoint), m_Endpoints) {
+       DynamicObject::Ptr object;
+       BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
+               Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
                /* don't forward messages between non-local endpoints */
                if (!sender->IsLocal() && !endpoint->IsLocal())
                        continue;
@@ -275,8 +221,10 @@ void EndpointManager::SendMulticastMessage(const Endpoint::Ptr& sender,
        if (!message.GetMethod(&method))
                throw_exception(invalid_argument("Message is missing the 'method' property."));
 
-       Endpoint::Ptr recipient;
-       BOOST_FOREACH(tie(tuples::ignore, recipient), m_Endpoints) {
+       DynamicObject::Ptr object;
+       BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
+               Endpoint::Ptr recipient = dynamic_pointer_cast<Endpoint>(object);
+
                /* don't forward messages back to the sender */
                if (sender == recipient)
                        continue;
@@ -291,31 +239,16 @@ void EndpointManager::SendMulticastMessage(const Endpoint::Ptr& sender,
  *
  * @param callback The callback function.
  */
-void EndpointManager::ForEachEndpoint(function<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> callback)
-{
-       map<String, Endpoint::Ptr>::iterator prev, i;
-       for (i = m_Endpoints.begin(); i != m_Endpoints.end(); ) {
-               prev = i;
-               i++;
-
-               callback(GetSelf(), prev->second);
-       }
-}
-
-/**
- * Retrieves an endpoint that has the specified identity.
- *
- * @param identity The identity of the endpoint.
- */
-Endpoint::Ptr EndpointManager::GetEndpointByIdentity(const String& identity) const
-{
-       map<String, Endpoint::Ptr>::const_iterator i;
-       i = m_Endpoints.find(identity);
-       if (i != m_Endpoints.end())
-               return i->second;
-       else
-               return Endpoint::Ptr();
-}
+//void EndpointManager::ForEachEndpoint(function<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> callback)
+//{
+//     map<String, Endpoint::Ptr>::iterator prev, i;
+//     for (i = m_Endpoints.begin(); i != m_Endpoints.end(); ) {
+//             prev = i;
+//             i++;
+//
+//             callback(GetSelf(), prev->second);
+//     }
+//}
 
 void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient,
     RequestMessage& message,
@@ -348,6 +281,46 @@ bool EndpointManager::RequestTimeoutLessComparer(const pair<String, PendingReque
        return a.second.Timeout < b.second.Timeout;
 }
 
+void EndpointManager::SubscriptionTimerHandler(void)
+{
+       Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
+
+       DynamicObject::Ptr object;
+       BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
+               Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
+
+               if (!endpoint->IsLocalEndpoint())
+                       continue;
+
+               String topic;
+               BOOST_FOREACH(tie(tuples::ignore, topic), endpoint->GetSubscriptions()) {
+                       subscriptions->Set(topic, topic);
+               }
+       }
+
+       m_Endpoint->SetSubscriptions(subscriptions);
+}
+
+void EndpointManager::ReconnectTimerHandler(void)
+{
+       DynamicObject::Ptr object;
+       BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
+               Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
+
+               if (endpoint->IsConnected())
+                       continue;
+
+               String node, service;
+               node = endpoint->GetNode();
+               service = endpoint->GetService();
+
+               if (node.IsEmpty() || service.IsEmpty())
+                       continue;
+
+               AddConnection(node, service);
+       }
+}
+
 void EndpointManager::RequestTimerHandler(void)
 {
        map<String, PendingRequest>::iterator it;
@@ -379,15 +352,15 @@ void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const
        m_Requests.erase(it);
 }
 
-EndpointManager::Iterator EndpointManager::Begin(void)
-{
-       return m_Endpoints.begin();
-}
+//EndpointManager::Iterator EndpointManager::Begin(void)
+//{
+//     return m_Endpoints.begin();
+//}
 
-EndpointManager::Iterator EndpointManager::End(void)
-{
-       return m_Endpoints.end();
-}
+//EndpointManager::Iterator EndpointManager::End(void)
+//{
+//     return m_Endpoints.end();
+//}
 
 EndpointManager::Ptr EndpointManager::GetInstance(void)
 {
index d8dd41f742198d3d955963f4d510d51f0f983a6a..fe57d74d5104ff27711a58ebab05241c6e9906f1 100644 (file)
@@ -34,7 +34,7 @@ public:
        typedef shared_ptr<EndpointManager> Ptr;
        typedef weak_ptr<EndpointManager> WeakPtr;
 
-       typedef map<String, Endpoint::Ptr>::iterator Iterator;
+//     typedef map<String, Endpoint::Ptr>::iterator Iterator;
 
        EndpointManager(void);
 
@@ -49,9 +49,6 @@ public:
        void AddListener(const String& service);
        void AddConnection(const String& node, const String& service);
 
-       void RegisterEndpoint(const Endpoint::Ptr& endpoint);
-       void UnregisterEndpoint(const Endpoint::Ptr& endpoint);
-
        void SendUnicastMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, const MessagePart& message);
        void SendAnycastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
        void SendMulticastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
@@ -61,21 +58,22 @@ public:
 
        void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message);
 
-       void ForEachEndpoint(function<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> callback);
-       Iterator Begin(void);
-       Iterator End(void);
-
-       Endpoint::Ptr GetEndpointByIdentity(const String& identity) const;
+//     void ForEachEndpoint(function<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> callback);
+//     Iterator Begin(void);
+//     Iterator End(void);
 
        boost::signal<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> OnNewEndpoint;
 
 private:
        String m_Identity;
-       shared_ptr<SSL_CTX> m_SSLContext;
+       Endpoint::Ptr m_Endpoint;
+
+       Timer::Ptr m_SubscriptionTimer;
+
+       Timer::Ptr m_ReconnectTimer;
 
-       vector<JsonRpcServer::Ptr> m_Servers;
-       vector<Endpoint::Ptr> m_PendingEndpoints;
-       map<String, Endpoint::Ptr> m_Endpoints;
+       set<JsonRpcServer::Ptr> m_Servers;
+       set<JsonRpcClient::Ptr> m_PendingClients;
 
        /**
         * Information about a pending API request.
@@ -98,13 +96,15 @@ private:
        map<String, PendingRequest> m_Requests;
        Timer::Ptr m_RequestTimer;
 
-       void RegisterServer(const JsonRpcServer::Ptr& server);
-       void UnregisterServer(const JsonRpcServer::Ptr& server);
-
        static bool RequestTimeoutLessComparer(const pair<String, PendingRequest>& a, const pair<String, PendingRequest>& b);
        void RequestTimerHandler(void);
 
+       void SubscriptionTimerHandler(void);
+
+       void ReconnectTimerHandler(void);
+
        void NewClientHandler(const TcpClient::Ptr& client);
+       void ClientConnectedHandler(const TcpClient::Ptr& client);
 };
 
 }
index 67dc8c27a2c4f317de9b8bf4f758b22fd0585a8a..85637333a35d15f2019dce9f1b936962cf877d8a 100644 (file)
@@ -42,8 +42,6 @@ using boost::algorithm::is_any_of;
 #endif /* I2_ICINGA_BUILD */
 
 #include "endpoint.h"
-#include "jsonrpcendpoint.h"
-#include "virtualendpoint.h"
 #include "endpointmanager.h"
 #include "icingaapplication.h"
 
index 330e28d36f9e23dabf85a8a0e9a1d4abe7cc8d76..9f65bd9c16fe05c9d4e47421ced480416f874f9c 100644 (file)
@@ -133,8 +133,7 @@ int IcingaApplication::Main(const vector<String>& args)
                Logger::Write(LogInformation, "icinga", "My identity: " + identity);
                EndpointManager::GetInstance()->SetIdentity(identity);
 
-               shared_ptr<SSL_CTX> sslContext = Utility::MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile());
-               EndpointManager::GetInstance()->SetSSLContext(sslContext);
+               m_SSLContext = Utility::MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile());
        }
 
        /* create the primary RPC listener */
@@ -215,3 +214,8 @@ double IcingaApplication::GetStartTime(void) const
 {
        return m_StartTime;
 }
+
+shared_ptr<SSL_CTX> IcingaApplication::GetSSLContext(void) const
+{
+       return m_SSLContext;
+}
index 2045e1a4c8ff26866a4e45f04729aa14fa3e2d8f..d1175758b3330f6f4a5edac79edd16ec630ff7a5 100644 (file)
@@ -47,6 +47,7 @@ public:
        String GetPidPath(void) const;
        String GetStatePath(void) const;
        Dictionary::Ptr GetMacros(void) const;
+       shared_ptr<SSL_CTX> GetSSLContext(void) const;
 
        double GetStartTime(void) const;
 
@@ -61,6 +62,7 @@ private:
        String m_PidPath;
        String m_StatePath;
        Dictionary::Ptr m_Macros;
+       shared_ptr<SSL_CTX> m_SSLContext;
 
        double m_StartTime;
 
diff --git a/icinga/jsonrpcendpoint.cpp b/icinga/jsonrpcendpoint.cpp
deleted file mode 100644 (file)
index e21215c..0000000
+++ /dev/null
@@ -1,148 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#include "i2-icinga.h"
-
-using namespace icinga;
-
-String JsonRpcEndpoint::GetIdentity(void) const
-{
-       return m_Identity;
-}
-
-String JsonRpcEndpoint::GetAddress(void) const
-{
-       if (!m_Client)
-               return "<disconnected endpoint>";
-
-       return m_Client->GetPeerAddress();
-}
-
-JsonRpcClient::Ptr JsonRpcEndpoint::GetClient(void)
-{
-       return m_Client;
-}
-
-void JsonRpcEndpoint::Connect(String node, String service, shared_ptr<SSL_CTX> sslContext)
-{
-       JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(RoleOutbound, sslContext);
-       SetClient(client);
-       client->Connect(node, service);
-       client->Start();
-}
-
-void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client)
-{
-       m_Client = client;
-       client->OnNewMessage.connect(boost::bind(&JsonRpcEndpoint::NewMessageHandler, this, _2));
-       client->OnClosed.connect(boost::bind(&JsonRpcEndpoint::ClientClosedHandler, this));
-       client->OnConnected.connect(boost::bind(&JsonRpcEndpoint::ClientConnectedHandler, this));
-}
-
-bool JsonRpcEndpoint::IsLocal(void) const
-{
-       return false;
-}
-
-bool JsonRpcEndpoint::IsConnected(void) const
-{
-       return (m_Client && m_Client->IsConnected());
-}
-
-void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message)
-{
-       if (IsConnected()) {
-               m_Client->SendMessage(message);
-       } else {
-               // TODO: persist the event
-       }
-}
-
-void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message)
-{
-       m_Client->SendMessage(message);
-}
-
-void JsonRpcEndpoint::NewMessageHandler(const MessagePart& message)
-{
-       Endpoint::Ptr sender = GetSelf();
-
-       if (ResponseMessage::IsResponseMessage(message)) {
-               /* rather than routing the message to the right virtual
-                * endpoint we just process it here right away. */
-               GetEndpointManager()->ProcessResponseMessage(sender, message);
-               return;
-       }
-
-       RequestMessage request = message;
-
-       String method;
-       if (!request.GetMethod(&method))
-               return;
-
-       String id;
-       if (request.GetID(&id))
-               GetEndpointManager()->SendAnycastMessage(sender, request);
-       else
-               GetEndpointManager()->SendMulticastMessage(sender, request);
-}
-
-void JsonRpcEndpoint::ClientClosedHandler(void)
-{
-       try {
-               m_Client->CheckException();
-       } catch (const exception& ex) {
-               stringstream message;
-               message << "Error occured for JSON-RPC socket: Message=" << ex.what();
-
-               Logger::Write(LogWarning, "jsonrpc", message.str());
-       }
-
-       Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetIdentity());
-
-       // TODO: _only_ clear non-persistent subscriptions
-       // unregister ourselves if no persistent subscriptions are left (use a timer for that, once we have a TTL property for the topics)
-       ClearSubscriptions();
-
-       // remove the endpoint if there are no more subscriptions */
-       if (BeginSubscriptions() == EndSubscriptions()) {
-               Hold();
-               GetEndpointManager()->UnregisterEndpoint(GetSelf());
-       }
-
-       m_Client.reset();
-
-       // TODO: persist events, etc., for now we just disable the endpoint
-}
-
-void JsonRpcEndpoint::ClientConnectedHandler(void)
-{
-       String identity = Utility::GetCertificateCN(m_Client->GetPeerCertificate());
-
-       if (GetIdentity().IsEmpty() && !identity.IsEmpty()) {
-               m_Identity = identity;
-               GetEndpointManager()->RegisterEndpoint(GetSelf());
-       }
-}
-
-void JsonRpcEndpoint::Stop(void)
-{
-       if (m_Client)
-               m_Client->Close();
-}
diff --git a/icinga/jsonrpcendpoint.h b/icinga/jsonrpcendpoint.h
deleted file mode 100644 (file)
index e685aac..0000000
+++ /dev/null
@@ -1,71 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#ifndef JSONRPCENDPOINT_H
-#define JSONRPCENDPOINT_H
-
-namespace icinga
-{
-
-/**
- * A JSON-RPC endpoint that can be used to communicate with a remote
- * Icinga instance.
- *
- * @ingroup icinga
- */
-class I2_ICINGA_API JsonRpcEndpoint : public Endpoint
-{
-public:
-       typedef shared_ptr<JsonRpcEndpoint> Ptr;
-       typedef weak_ptr<JsonRpcEndpoint> WeakPtr;
-
-       void Connect(String node, String service,
-           shared_ptr<SSL_CTX> sslContext);
-
-       JsonRpcClient::Ptr GetClient(void);
-       void SetClient(JsonRpcClient::Ptr client);
-
-       virtual String GetIdentity(void) const;
-       virtual String GetAddress(void) const;
-
-       virtual bool IsLocal(void) const;
-       virtual bool IsConnected(void) const;
-
-       virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message);
-       virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message);
-
-       virtual void Stop(void);
-
-private:
-       String m_Identity; /**< The identity of this endpoint. */
-
-       shared_ptr<SSL_CTX> m_SSLContext;
-       String m_Address;
-       JsonRpcClient::Ptr m_Client;
-
-       void SetAddress(String address);
-
-       void NewMessageHandler(const MessagePart& message);
-       void ClientClosedHandler(void);
-       void ClientConnectedHandler(void);
-};
-
-}
-
-#endif /* JSONRPCENDPOINT_H */
diff --git a/icinga/virtualendpoint.cpp b/icinga/virtualendpoint.cpp
deleted file mode 100644 (file)
index c82bf3f..0000000
+++ /dev/null
@@ -1,97 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#include "i2-icinga.h"
-
-using namespace icinga;
-
-String VirtualEndpoint::GetIdentity(void) const
-{
-       return "__" + GetAddress();
-}
-
-String VirtualEndpoint::GetAddress(void) const
-{
-       char address[50];
-       sprintf(address, "virtual:%p", (void *)this);
-       return address;
-}
-
-bool VirtualEndpoint::IsLocal(void) const
-{
-       return true;
-}
-
-bool VirtualEndpoint::IsConnected(void) const
-{
-       return true;
-}
-
-void VirtualEndpoint::RegisterTopicHandler(String topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback)
-{
-       map<String, shared_ptr<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> > >::iterator it;
-       it = m_TopicHandlers.find(topic);
-
-       shared_ptr<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> > sig;
-
-       if (it == m_TopicHandlers.end()) {
-               sig = boost::make_shared<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> >();
-               m_TopicHandlers.insert(make_pair(topic, sig));
-       } else {
-               sig = it->second;
-       }
-
-       sig->connect(callback);
-
-       RegisterSubscription(topic);
-}
-
-void VirtualEndpoint::UnregisterTopicHandler(String topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback)
-{
-       // TODO: implement
-       //m_TopicHandlers[method] -= callback;
-       //UnregisterMethodSubscription(method);
-
-       throw_exception(NotImplementedException());
-}
-
-void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage& request)
-{
-       String method;
-       if (!request.GetMethod(&method))
-               return;
-
-       map<String, shared_ptr<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> > >::iterator it;
-       it = m_TopicHandlers.find(method);
-
-       if (it == m_TopicHandlers.end())
-               return;
-
-       (*it->second)(GetSelf(), sender, request);
-}
-
-void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& response)
-{
-       GetEndpointManager()->ProcessResponseMessage(sender, response);
-}
-
-void VirtualEndpoint::Stop(void)
-{
-       /* Nothing to do here. */
-}
diff --git a/icinga/virtualendpoint.h b/icinga/virtualendpoint.h
deleted file mode 100644 (file)
index f81b6f8..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#ifndef VIRTUALENDPOINT_H
-#define VIRTUALENDPOINT_H
-
-namespace icinga
-{
-
-/**
- * A local endpoint.
- *
- * @ingroup icinga
- */
-class I2_ICINGA_API VirtualEndpoint : public Endpoint
-{
-public:
-       typedef shared_ptr<VirtualEndpoint> Ptr;
-       typedef weak_ptr<VirtualEndpoint> WeakPtr;
-
-       void RegisterTopicHandler(String topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback);
-       void UnregisterTopicHandler(String topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback);
-
-       virtual String GetIdentity(void) const;
-       virtual String GetAddress(void) const;
-
-       virtual bool IsLocal(void) const;
-       virtual bool IsConnected(void) const;
-
-       virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message);
-       virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message);
-
-       virtual void Stop(void);
-
-private:
-       map< String, shared_ptr<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> > > m_TopicHandlers;
-};
-
-}
-
-#endif /* VIRTUALENDPOINT_H */
index 29cca0888e0afeb2546d1b0518e495d69d5c07ae..496c0f27304c71b41fc565b9503351182da1957b 100644 (file)
@@ -41,7 +41,9 @@ JsonRpcClient::JsonRpcClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
 void JsonRpcClient::SendMessage(const MessagePart& message)
 {
        Value value = message.GetDictionary();
-       NetString::WriteStringToIOQueue(this, value.Serialize());
+       String json = value.Serialize();
+       //std::cerr << ">> " << json << std::endl;
+       NetString::WriteStringToIOQueue(this, json);
 }
 
 /**
@@ -52,6 +54,8 @@ void JsonRpcClient::DataAvailableHandler(void)
        String jsonString;
 
        while (NetString::ReadStringFromIOQueue(this, &jsonString)) {
+               //std::cerr << "<< " << jsonString << std::endl;
+
                try {
                        Value value = Value::Deserialize(jsonString);