m_Data.erase(it);
}
+/**
+ * Makes a shallow copy of a dictionary.
+ *
+ * @returns a copy of the dictionary.
+ */
+Dictionary::Ptr Dictionary::ShallowClone(void) const
+{
+ Dictionary::Ptr clone = boost::make_shared<Dictionary>();
+
+ String key;
+ Value value;
+ BOOST_FOREACH(tie(key, value), m_Data) {
+ clone->Set(key, value);
+ }
+
+ return clone;
+}
+
/**
* Converts a JSON object to a dictionary.
*
void Remove(const String& key);
void Remove(Iterator it);
+ Dictionary::Ptr ShallowClone(void) const;
+
static Dictionary::Ptr FromJson(cJSON *json);
cJSON *ToJson(void) const;
InternalSetAttribute(name, data, GetCurrentTx());
}
+void DynamicObject::Touch(const String& name)
+{
+ InternalSetAttribute(name, InternalGetAttribute(name), GetCurrentTx());
+}
+
Value DynamicObject::Get(const String& name) const
{
return InternalGetAttribute(name);
void RegisterAttribute(const String& name, DynamicAttributeType type);
void Set(const String& name, const Value& data);
+ void Touch(const String& name);
Value Get(const String& name) const;
bool HasAttribute(const String& name) const;
return boost::make_shared<T>(serializedUpdate);
}
+#define REGISTER_CLASS_ALIAS(klass, alias) \
+ static RegisterClassHelper g_Register ## klass(alias, DynamicObjectFactory<klass>)
+
#define REGISTER_CLASS(klass) \
- static RegisterClassHelper g_Register ## klass(#klass, DynamicObjectFactory<klass>)
+ REGISTER_CLASS_ALIAS(klass, #klass)
}
Process::Process(const String& command)
: AsyncTask<Process, ProcessResult>(), m_Command(command), m_UsePopen(false)
{
+ assert(Application::IsMainThread());
+
if (!m_ThreadCreated) {
thread t(&Process::WorkerThreadProc);
t.detach();
compat \
convenience \
delegation \
- demo \
- discovery
+ demo
void CheckerComponent::Start(void)
{
- m_Endpoint = boost::make_shared<VirtualEndpoint>();
+ m_Endpoint = Endpoint::MakeEndpoint("checker", true);
/* dummy registration so the delegation module knows this is a checker
TODO: figure out a better way for this */
m_Endpoint->RegisterSubscription("checker");
- EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
-
Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1));
DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ServiceRemovedHandler, this, _1));
void CheckerComponent::Stop(void)
{
- EndpointManager::Ptr mgr = EndpointManager::GetInstance();
-
- if (mgr)
- mgr->UnregisterEndpoint(m_Endpoint);
+ m_Endpoint->Unregister();
}
void CheckerComponent::CheckTimerHandler(void)
{
String checker = service->GetChecker();
- if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetIdentity()) {
+ if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetName()) {
if (m_PendingServices.find(service) != m_PendingServices.end())
return;
virtual void Stop(void);
private:
- VirtualEndpoint::Ptr m_Endpoint;
+ Endpoint::Ptr m_Endpoint;
ServiceSet m_IdleServices;
ServiceSet m_PendingServices;
*/
void CIBSyncComponent::Start(void)
{
- m_Endpoint = boost::make_shared<VirtualEndpoint>();
-
- /* config objects */
- m_Endpoint->RegisterTopicHandler("config::FetchObjects",
- boost::bind(&CIBSyncComponent::FetchObjectsHandler, this, _2));
+ m_Endpoint = Endpoint::MakeEndpoint("cibsync", true);
DynamicObject::OnRegistered.connect(boost::bind(&CIBSyncComponent::LocalObjectRegisteredHandler, this, _1));
DynamicObject::OnUnregistered.connect(boost::bind(&CIBSyncComponent::LocalObjectUnregisteredHandler, this, _1));
DynamicObject::OnTransactionClosing.connect(boost::bind(&CIBSyncComponent::TransactionClosingHandler, this, _1));
- EndpointManager::GetInstance()->OnNewEndpoint.connect(boost::bind(&CIBSyncComponent::NewEndpointHandler, this, _2));
-
+ Endpoint::OnConnected.connect(boost::bind(&CIBSyncComponent::EndpointConnectedHandler, this, _1));
+
m_Endpoint->RegisterTopicHandler("config::ObjectUpdate",
boost::bind(&CIBSyncComponent::RemoteObjectUpdateHandler, this, _2, _3));
m_Endpoint->RegisterTopicHandler("config::ObjectRemoved",
/* service status */
m_Endpoint->RegisterTopicHandler("checker::ServiceStateChange",
boost::bind(&CIBSyncComponent::ServiceStateChangeRequestHandler, _2, _3));
-
- EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
}
/**
*/
void CIBSyncComponent::Stop(void)
{
- EndpointManager::Ptr endpointManager = EndpointManager::GetInstance();
-
- if (endpointManager)
- endpointManager->UnregisterEndpoint(m_Endpoint);
+ m_Endpoint->Unregister();
}
void CIBSyncComponent::ServiceStateChangeRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
CIB::UpdateTaskStatistics(now, 1);
}
-void CIBSyncComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
+void CIBSyncComponent::EndpointConnectedHandler(const Endpoint::Ptr& endpoint)
{
/* no need to sync the config with local endpoints */
- if (endpoint->IsLocal())
+ if (endpoint->IsLocalEndpoint())
return;
- endpoint->OnSessionEstablished.connect(boost::bind(&CIBSyncComponent::SessionEstablishedHandler, this, _1));
-}
+ /* we just assume the other endpoint wants object updates */
+ endpoint->RegisterSubscription("config::ObjectUpdate");
+ endpoint->RegisterSubscription("config::ObjectRemoved");
-void CIBSyncComponent::SessionEstablishedHandler(const Endpoint::Ptr& endpoint)
-{
- RequestMessage request;
- request.SetMethod("config::FetchObjects");
+ pair<DynamicObject::TypeMap::iterator, DynamicObject::TypeMap::iterator> trange = DynamicObject::GetTypes();
+ DynamicObject::TypeMap::iterator tt;
+ for (tt = trange.first; tt != trange.second; tt++) {
+ DynamicObject::Ptr object;
+ BOOST_FOREACH(tie(tuples::ignore, object), tt->second) {
+ if (!ShouldReplicateObject(object))
+ continue;
- EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request);
+ RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", 0, true);
+ EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request);
+ }
+ }
}
RequestMessage CIBSyncComponent::MakeObjectMessage(const DynamicObject::Ptr& object, const String& method, double sinceTx, bool includeProperties)
return (!object->IsLocal());
}
-void CIBSyncComponent::FetchObjectsHandler(const Endpoint::Ptr& sender)
-{
- pair<DynamicObject::TypeMap::iterator, DynamicObject::TypeMap::iterator> trange = DynamicObject::GetTypes();
- DynamicObject::TypeMap::iterator tt;
- for (tt = trange.first; tt != trange.second; tt++) {
- DynamicObject::Ptr object;
- BOOST_FOREACH(tie(tuples::ignore, object), tt->second) {
- if (!ShouldReplicateObject(object))
- continue;
-
- RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", 0, true);
-
- EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, sender, request);
- }
- }
-}
-
void CIBSyncComponent::LocalObjectRegisteredHandler(const DynamicObject::Ptr& object)
{
if (!ShouldReplicateObject(object))
}
if (object->GetSource().IsEmpty())
- object->SetSource(sender->GetIdentity());
+ object->SetSource(sender->GetName());
object->Register();
} else {
virtual void Stop(void);
private:
- VirtualEndpoint::Ptr m_Endpoint;
+ Endpoint::Ptr m_Endpoint;
static void ServiceStateChangeRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
- void NewEndpointHandler(const Endpoint::Ptr& endpoint);
- void SessionEstablishedHandler(const Endpoint::Ptr& endpoint);
+ void EndpointConnectedHandler(const Endpoint::Ptr& endpoint);
void LocalObjectRegisteredHandler(const DynamicObject::Ptr& object);
void LocalObjectUnregisteredHandler(const DynamicObject::Ptr& object);
void TransactionClosingHandler(const set<DynamicObject::Ptr>& modifiedObjects);
- void FetchObjectsHandler(const Endpoint::Ptr& sender);
void RemoteObjectUpdateHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
void RemoteObjectRemovedHandler(const RequestMessage& request);
{
vector<Endpoint::Ptr> candidates;
- EndpointManager::Iterator it;
- for (it = EndpointManager::GetInstance()->Begin(); it != EndpointManager::GetInstance()->End(); it++) {
- Endpoint::Ptr endpoint = it->second;
+ DynamicObject::Ptr object;
+ BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
+ Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
/* ignore disconnected endpoints */
if (!endpoint->IsConnected())
continue;
/* ignore endpoints that aren't allowed to check this service */
- if (!service->IsAllowedChecker(it->first))
+ if (!service->IsAllowedChecker(endpoint->GetName()))
continue;
candidates.push_back(endpoint);
{
map<Endpoint::Ptr, int> histogram;
- EndpointManager::Iterator eit;
- for (eit = EndpointManager::GetInstance()->Begin(); eit != EndpointManager::GetInstance()->End(); eit++)
- histogram[eit->second] = 0;
+ DynamicObject::Ptr object;
+ BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
+ Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
+
+ histogram[endpoint] = 0;
+ }
vector<Service::Ptr> services;
/* build "checker -> service count" histogram */
- DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Service")) {
Service::Ptr service = dynamic_pointer_cast<Service>(object);
if (checker.IsEmpty())
continue;
- Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
- if (!endpoint)
+ if (!Endpoint::Exists(checker))
continue;
+ Endpoint::Ptr endpoint = Endpoint::GetByName(checker);
+
histogram[endpoint]++;
}
String checker = service->GetChecker();
Endpoint::Ptr oldEndpoint;
- if (!checker.IsEmpty())
- oldEndpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
+ if (Endpoint::Exists(checker))
+ oldEndpoint = Endpoint::GetByName(checker);
vector<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
if (histogram[candidate] > avg_services)
continue;
- service->SetChecker(candidate->GetIdentity());
+ service->SetChecker(candidate->GetName());
histogram[candidate]++;
delegated++;
int count;
BOOST_FOREACH(tie(endpoint, count), histogram) {
stringstream msgbuf;
- msgbuf << "histogram: " << endpoint->GetIdentity() << " - " << count;
+ msgbuf << "histogram: " << endpoint->GetName() << " - " << count;
Logger::Write(LogInformation, "delegation", msgbuf.str());
}
*/
void DemoComponent::Start(void)
{
- m_Endpoint = boost::make_shared<VirtualEndpoint>();
+ m_Endpoint = Endpoint::MakeEndpoint("demo", true);
m_Endpoint->RegisterTopicHandler("demo::HelloWorld",
boost::bind(&DemoComponent::HelloWorldRequestHandler, this, _2, _3));
- EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
m_DemoTimer = boost::make_shared<Timer>();
m_DemoTimer->SetInterval(5);
*/
void DemoComponent::Stop(void)
{
- EndpointManager::Ptr endpointManager = EndpointManager::GetInstance();
-
- if (endpointManager)
- endpointManager->UnregisterEndpoint(m_Endpoint);
+ m_Endpoint->Unregister();
}
/**
*/
void DemoComponent::HelloWorldRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
{
- Logger::Write(LogInformation, "demo", "Got 'hello world' from address=" + sender->GetAddress() + ", identity=" + sender->GetIdentity());
+ Logger::Write(LogInformation, "demo", "Got 'hello world' from address=" + sender->GetAddress() + ", identity=" + sender->GetName());
}
EXPORT_COMPONENT(demo, DemoComponent);
private:
Timer::Ptr m_DemoTimer;
- VirtualEndpoint::Ptr m_Endpoint;
+ Endpoint::Ptr m_Endpoint;
void DemoTimerHandler(void);
void HelloWorldRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
+++ /dev/null
-## Process this file with automake to produce Makefile.in
-
-pkglib_LTLIBRARIES = \
- discovery.la
-
-discovery_la_SOURCES = \
- discoverycomponent.cpp \
- discoverycomponent.h \
- discoverymessage.cpp \
- discoverymessage.h \
- i2-discovery.h
-
-discovery_la_CPPFLAGS = \
- $(BOOST_CPPFLAGS) \
- -I${top_srcdir}/base \
- -I${top_srcdir}/dyn \
- -I${top_srcdir}/jsonrpc \
- -I${top_srcdir}/icinga \
- -I${top_srcdir}/cib
-
-discovery_la_LDFLAGS = \
- $(BOOST_LDFLAGS) \
- -module \
- -no-undefined \
- @RELEASE_INFO@ \
- @VERSION_INFO@
-
-discovery_la_LIBADD = \
- $(BOOST_SIGNALS_LIB) \
- $(BOOST_THREAD_LIB) \
- ${top_builddir}/base/libbase.la \
- ${top_builddir}/dyn/libdyn.la \
- ${top_builddir}/jsonrpc/libjsonrpc.la \
- ${top_builddir}/icinga/libicinga.la \
- ${top_builddir}/cib/libcib.la
+++ /dev/null
-<?xml version="1.0" encoding="utf-8"?>
-<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
- <ItemGroup Label="ProjectConfigurations">
- <ProjectConfiguration Include="Debug|Win32">
- <Configuration>Debug</Configuration>
- <Platform>Win32</Platform>
- </ProjectConfiguration>
- <ProjectConfiguration Include="Release|Win32">
- <Configuration>Release</Configuration>
- <Platform>Win32</Platform>
- </ProjectConfiguration>
- </ItemGroup>
- <PropertyGroup Label="Globals">
- <ProjectGuid>{EAD41628-BB96-4F99-9070-8A9676801295}</ProjectGuid>
- <Keyword>Win32Proj</Keyword>
- <RootNamespace>discovery</RootNamespace>
- </PropertyGroup>
- <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
- <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
- <ConfigurationType>DynamicLibrary</ConfigurationType>
- <UseDebugLibraries>true</UseDebugLibraries>
- <CharacterSet>MultiByte</CharacterSet>
- </PropertyGroup>
- <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
- <ConfigurationType>DynamicLibrary</ConfigurationType>
- <UseDebugLibraries>false</UseDebugLibraries>
- <WholeProgramOptimization>true</WholeProgramOptimization>
- <CharacterSet>MultiByte</CharacterSet>
- </PropertyGroup>
- <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
- <ImportGroup Label="ExtensionSettings">
- </ImportGroup>
- <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
- <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
- </ImportGroup>
- <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
- <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
- </ImportGroup>
- <PropertyGroup Label="UserMacros" />
- <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
- <LinkIncremental>true</LinkIncremental>
- <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cib;$(SolutionDir)\dyn;$(IncludePath)</IncludePath>
- <LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
- </PropertyGroup>
- <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
- <LinkIncremental>false</LinkIncremental>
- <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cib;$(SolutionDir)\dyn;$(IncludePath)</IncludePath>
- <LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
- </PropertyGroup>
- <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
- <ClCompile>
- <Optimization>Disabled</Optimization>
- <PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
- <WarningLevel>Level3</WarningLevel>
- <MinimalRebuild>false</MinimalRebuild>
- <MultiProcessorCompilation>true</MultiProcessorCompilation>
- </ClCompile>
- <Link>
- <SubSystem>Windows</SubSystem>
- <GenerateDebugInformation>true</GenerateDebugInformation>
- <AdditionalDependencies>base.lib;jsonrpc.lib;icinga.lib;cib.lib;%(AdditionalDependencies)</AdditionalDependencies>
- </Link>
- </ItemDefinitionGroup>
- <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
- <ClCompile>
- <Optimization>MaxSpeed</Optimization>
- <FunctionLevelLinking>true</FunctionLevelLinking>
- <IntrinsicFunctions>true</IntrinsicFunctions>
- <PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
- <WarningLevel>Level3</WarningLevel>
- <MinimalRebuild>false</MinimalRebuild>
- <MultiProcessorCompilation>true</MultiProcessorCompilation>
- </ClCompile>
- <Link>
- <SubSystem>Windows</SubSystem>
- <GenerateDebugInformation>true</GenerateDebugInformation>
- <EnableCOMDATFolding>true</EnableCOMDATFolding>
- <OptimizeReferences>true</OptimizeReferences>
- <AdditionalDependencies>base.lib;jsonrpc.lib;icinga.lib;cib.lib;%(AdditionalDependencies)</AdditionalDependencies>
- </Link>
- </ItemDefinitionGroup>
- <ItemGroup>
- <ClCompile Include="discoverycomponent.cpp" />
- <ClCompile Include="discoverymessage.cpp" />
- </ItemGroup>
- <ItemGroup>
- <ClInclude Include="discoverycomponent.h" />
- <ClInclude Include="discoverymessage.h" />
- <ClInclude Include="i2-discovery.h" />
- </ItemGroup>
- <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
- <ImportGroup Label="ExtensionTargets">
- </ImportGroup>
-</Project>
\ No newline at end of file
+++ /dev/null
-<?xml version="1.0" encoding="utf-8"?>
-<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
- <ItemGroup>
- <ClCompile Include="discoverycomponent.cpp">
- <Filter>Quelldateien</Filter>
- </ClCompile>
- <ClCompile Include="discoverymessage.cpp">
- <Filter>Quelldateien</Filter>
- </ClCompile>
- </ItemGroup>
- <ItemGroup>
- <ClInclude Include="discoverycomponent.h">
- <Filter>Headerdateien</Filter>
- </ClInclude>
- <ClInclude Include="i2-discovery.h">
- <Filter>Headerdateien</Filter>
- </ClInclude>
- <ClInclude Include="discoverymessage.h">
- <Filter>Headerdateien</Filter>
- </ClInclude>
- </ItemGroup>
- <ItemGroup>
- <Filter Include="Headerdateien">
- <UniqueIdentifier>{53341f7e-6bad-4cf1-92cf-be906efe1704}</UniqueIdentifier>
- </Filter>
- <Filter Include="Quelldateien">
- <UniqueIdentifier>{c7b2deba-743b-4449-ae46-0b7ba1b1350a}</UniqueIdentifier>
- </Filter>
- </ItemGroup>
-</Project>
\ No newline at end of file
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#include "i2-discovery.h"
-
-using namespace icinga;
-
-/**
- * Starts the discovery component.
- */
-void DiscoveryComponent::Start(void)
-{
- m_Endpoint = boost::make_shared<VirtualEndpoint>();
-
- m_Endpoint->RegisterTopicHandler("discovery::RegisterComponent",
- boost::bind(&DiscoveryComponent::RegisterComponentMessageHandler, this, _2, _3));
-
- m_Endpoint->RegisterTopicHandler("discovery::NewComponent",
- boost::bind(&DiscoveryComponent::NewComponentMessageHandler, this, _3));
-
- m_Endpoint->RegisterTopicHandler("discovery::Welcome",
- boost::bind(&DiscoveryComponent::WelcomeMessageHandler, this, _2, _3));
-
- EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::NewEndpointHandler, this, _2));
- EndpointManager::GetInstance()->OnNewEndpoint.connect(boost::bind(&DiscoveryComponent::NewEndpointHandler, this, _2));
-
- EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
-
- /* create the reconnect timer */
- m_DiscoveryTimer = boost::make_shared<Timer>();
- m_DiscoveryTimer->SetInterval(30);
- m_DiscoveryTimer->OnTimerExpired.connect(boost::bind(&DiscoveryComponent::DiscoveryTimerHandler, this));
- m_DiscoveryTimer->Start();
-
- /* call the timer as soon as possible */
- m_DiscoveryTimer->Reschedule(0);
-}
-
-/**
- * Stops the discovery component.
- */
-void DiscoveryComponent::Stop(void)
-{
- EndpointManager::Ptr mgr = EndpointManager::GetInstance();
-
- if (mgr)
- mgr->UnregisterEndpoint(m_Endpoint);
-}
-
-/**
- * Checks whether the specified endpoint is already connected
- * and disconnects older endpoints.
- *
- * @param self The endpoint that is to be checked.
- * @param other The other endpoint.
- */
-void DiscoveryComponent::CheckExistingEndpoint(const Endpoint::Ptr& self, const Endpoint::Ptr& other)
-{
- if (self == other)
- return;
-
- if (!other->IsConnected())
- return;
-
- if (self->GetIdentity() == other->GetIdentity()) {
- Logger::Write(LogWarning, "discovery", "Detected duplicate identity:" + other->GetIdentity() + " - Disconnecting old endpoint.");
-
- other->Stop();
- EndpointManager::GetInstance()->UnregisterEndpoint(other);
- }
-}
-
-/**
- * Deals with a new endpoint.
- *
- * @param endpoint The endpoint.
- */
-void DiscoveryComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
-{
- /* immediately finish session setup for local endpoints */
- if (endpoint->IsLocal()) {
- endpoint->OnSessionEstablished(endpoint);
- return;
- }
-
- String identity = endpoint->GetIdentity();
-
- if (identity == EndpointManager::GetInstance()->GetIdentity()) {
- Logger::Write(LogWarning, "discovery", "Detected loop-back connection - Disconnecting endpoint.");
-
- endpoint->Stop();
- EndpointManager::GetInstance()->UnregisterEndpoint(endpoint);
-
- return;
- }
-
- EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _2));
-
- // we assume the other component _always_ wants
- // discovery::RegisterComponent messages from us
- endpoint->RegisterSubscription("discovery::RegisterComponent");
-
- // send a discovery::RegisterComponent message, if the
- // other component is a broker this makes sure
- // the broker knows about our message types
- SendDiscoveryMessage("discovery::RegisterComponent", EndpointManager::GetInstance()->GetIdentity(), endpoint);
-
- map<String, ComponentDiscoveryInfo::Ptr>::iterator ic;
-
- // we assume the other component _always_ wants
- // discovery::NewComponent messages from us
- endpoint->RegisterSubscription("discovery::NewComponent");
-
- // send discovery::NewComponent message for ourselves
- SendDiscoveryMessage("discovery::NewComponent", EndpointManager::GetInstance()->GetIdentity(), endpoint);
-
- // send discovery::NewComponent messages for all components
- // we know about
- for (ic = m_Components.begin(); ic != m_Components.end(); ic++) {
- SendDiscoveryMessage("discovery::NewComponent", ic->first, endpoint);
- }
-
- // check if we already know the other component
- ic = m_Components.find(endpoint->GetIdentity());
-
- if (ic == m_Components.end()) {
- // we don't know the other component yet, so
- // wait until we get a discovery::NewComponent message
- // from a broker
- return;
- }
-
- // register published/subscribed topics for this endpoint
- ComponentDiscoveryInfo::Ptr info = ic->second;
- BOOST_FOREACH(String subscription, info->Subscriptions) {
- endpoint->RegisterSubscription(subscription);
- }
-
- FinishDiscoverySetup(endpoint);
-}
-
-/**
- * Registers message Subscriptions/sources in the specified component information object.
- *
- * @param neea Event arguments for the endpoint.
- * @param info Component information object.
- * @return 0
- */
-void DiscoveryComponent::DiscoveryEndpointHandler(const Endpoint::Ptr& endpoint, const ComponentDiscoveryInfo::Ptr& info) const
-{
- Endpoint::ConstTopicIterator i;
-
- for (i = endpoint->BeginSubscriptions(); i != endpoint->EndSubscriptions(); i++)
- info->Subscriptions.insert(*i);
-}
-
-/**
- * Retrieves the component information object for the specified component.
- *
- * @param component The identity of the component.
- * @param info Pointer to the information object.
- * @returns true if the info object was successfully retrieved, false otherwise.
- */
-bool DiscoveryComponent::GetComponentDiscoveryInfo(String component, ComponentDiscoveryInfo::Ptr *info) const
-{
- if (component == EndpointManager::GetInstance()->GetIdentity()) {
- /* Build fake discovery info for ourselves */
- *info = boost::make_shared<ComponentDiscoveryInfo>();
- EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _2, *info));
-
- (*info)->LastSeen = 0;
- (*info)->Node = IcingaApplication::GetInstance()->GetNode();
- (*info)->Service = IcingaApplication::GetInstance()->GetService();
-
- return true;
- }
-
- map<String, ComponentDiscoveryInfo::Ptr>::const_iterator i;
-
- i = m_Components.find(component);
-
- if (i == m_Components.end())
- return false;
-
- *info = i->second;
- return true;
-}
-
-/**
- * Processes discovery::Welcome messages.
- *
- * @param nrea Event arguments for the request.
- * @returns 0
- */
-void DiscoveryComponent::WelcomeMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
-{
- if (sender->HasReceivedWelcome())
- return;
-
- sender->SetReceivedWelcome(true);
-
- if (sender->HasSentWelcome())
- sender->OnSessionEstablished(sender);
-}
-
-/**
- * Finishes the welcome handshake for a new component
- * by registering message Subscriptions/sources for the component
- * and sending a welcome message if necessary.
- *
- * @param endpoint The endpoint to set up.
- */
-void DiscoveryComponent::FinishDiscoverySetup(const Endpoint::Ptr& endpoint)
-{
- if (endpoint->HasSentWelcome())
- return;
-
- // we assume the other component _always_ wants
- // discovery::Welcome messages from us
- endpoint->RegisterSubscription("discovery::Welcome");
- RequestMessage request;
- request.SetMethod("discovery::Welcome");
- EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request);
-
- endpoint->SetSentWelcome(true);
-
- if (endpoint->HasReceivedWelcome())
- endpoint->OnSessionEstablished(endpoint);
-}
-
-/**
- * Sends a discovery message for the specified identity using the
- * specified message type.
- *
- * @param method The method to use for the message ("discovery::NewComponent" or "discovery::RegisterComponent").
- * @param identity The identity of the component for which a message should be sent.
- * @param recipient The recipient of the message. A multicast message is sent if this parameter is empty.
- */
-void DiscoveryComponent::SendDiscoveryMessage(const String& method, const String& identity, const Endpoint::Ptr& recipient)
-{
- RequestMessage request;
- request.SetMethod(method);
-
- DiscoveryMessage params;
- request.SetParams(params);
-
- params.SetIdentity(identity);
-
- ComponentDiscoveryInfo::Ptr info;
-
- if (!GetComponentDiscoveryInfo(identity, &info))
- return;
-
- if (!info->Node.IsEmpty() && !info->Service.IsEmpty()) {
- params.SetNode(info->Node);
- params.SetService(info->Service);
- }
-
- set<String>::iterator i;
- Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
- BOOST_FOREACH(String subscription, info->Subscriptions) {
- subscriptions->Add(subscription);
- }
-
- params.SetSubscriptions(subscriptions);
-
- if (recipient)
- EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, recipient, request);
- else
- EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, request);
-}
-
-/**
- * Processes a discovery message by registering the component in the
- * discovery component registry.
- *
- * @param identity The authorative identity of the component.
- * @param message The discovery message.
- * @param trusted Whether the message comes from a trusted source (i.e. a broker).
- */
-void DiscoveryComponent::ProcessDiscoveryMessage(const String& identity, const DiscoveryMessage& message, bool trusted)
-{
- /* ignore discovery messages that are about ourselves */
- if (identity == EndpointManager::GetInstance()->GetIdentity())
- return;
-
- ComponentDiscoveryInfo::Ptr info = boost::make_shared<ComponentDiscoveryInfo>();
-
- info->LastSeen = Utility::GetTime();
-
- String node;
- if (message.GetNode(&node) && !node.IsEmpty())
- info->Node = node;
-
- String service;
- if (message.GetService(&service) && !service.IsEmpty())
- info->Service = service;
-
- DynamicObject::Ptr endpointConfig = DynamicObject::GetObject("Endpoint", identity);
- Dictionary::Ptr roles;
- if (endpointConfig)
- roles = endpointConfig->Get("roles");
-
- Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(identity);
-
- Dictionary::Ptr subscriptions;
- if (message.GetSubscriptions(&subscriptions)) {
- Value subscription;
- BOOST_FOREACH(tie(tuples::ignore, subscription), subscriptions) {
- info->Subscriptions.insert(subscription);
- if (endpoint)
- endpoint->RegisterSubscription(subscription);
- }
- }
-
- map<String, ComponentDiscoveryInfo::Ptr>::iterator i;
-
- i = m_Components.find(identity);
-
- if (i != m_Components.end())
- m_Components.erase(i);
-
- m_Components[identity] = info;
-
- SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
-
- /* don't send a welcome message for discovery::NewComponent messages */
- if (endpoint && !trusted)
- FinishDiscoverySetup(endpoint);
-}
-
-/**
- * Processes "discovery::NewComponent" messages.
- *
- * @param nrea Event arguments for the request.
- */
-void DiscoveryComponent::NewComponentMessageHandler(const RequestMessage& request)
-{
- DiscoveryMessage message;
- request.GetParams(&message);
-
- String identity;
- if (!message.GetIdentity(&identity))
- return;
-
- ProcessDiscoveryMessage(identity, message, true);
-}
-
-/**
- * Processes "discovery::RegisterComponent" messages.
- *
- * @param nrea Event arguments for the request.
- */
-void DiscoveryComponent::RegisterComponentMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
-{
- DiscoveryMessage message;
- request.GetParams(&message);
- ProcessDiscoveryMessage(sender->GetIdentity(), message, false);
-}
-
-/**
- * Checks whether we have to reconnect to other components and removes stale
- * components from the registry.
- */
-void DiscoveryComponent::DiscoveryTimerHandler(void)
-{
- EndpointManager::Ptr endpointManager = EndpointManager::GetInstance();
-
- double now = Utility::GetTime();
-
- /* check whether we have to reconnect to one of our upstream endpoints */
- DynamicObject::Ptr object;
- BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
- /* Check if we're already connected to this endpoint. */
- if (endpointManager->GetEndpointByIdentity(object->GetName()))
- continue;
-
- String node = object->Get("node");
- String service = object->Get("service");
- if (!node.IsEmpty() && !service.IsEmpty()) {
- /* reconnect to this endpoint */
- endpointManager->AddConnection(node, service);
- }
- }
-
- map<String, ComponentDiscoveryInfo::Ptr>::iterator curr, i;
- for (i = m_Components.begin(); i != m_Components.end(); ) {
- const String& identity = i->first;
- const ComponentDiscoveryInfo::Ptr& info = i->second;
-
- curr = i;
- i++;
-
- /* there's no need to reconnect to ourself */
- if (identity == EndpointManager::GetInstance()->GetIdentity())
- continue;
-
- /* for explicitly-configured upstream endpoints
- * we prefer to use the node/service from the
- * config object - which is what the for loop above does */
- if (DynamicObject::GetObject("endpoint", identity))
- continue;
-
- if (info->LastSeen < now - DiscoveryComponent::RegistrationTTL) {
- /* unregister this component if its registration has expired */
- m_Components.erase(curr);
- continue;
- }
-
- /* send discovery message to all connected components to
- refresh their TTL for this component */
- SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
-
- Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(identity);
- if (endpoint && endpoint->IsConnected()) {
- /* update LastSeen if we're still connected to this endpoint */
- info->LastSeen = now;
- } else {
- /* try and reconnect to this component */
- try {
- if (!info->Node.IsEmpty() && !info->Service.IsEmpty())
- endpointManager->AddConnection(info->Node, info->Service);
- } catch (const exception& ex) {
- stringstream msgbuf;
- msgbuf << "Exception while trying to reconnect to endpoint '" << endpoint->GetIdentity() << "': " << ex.what();;
- Logger::Write(LogInformation, "discovery", msgbuf.str());
- }
- }
- }
-}
-
-EXPORT_COMPONENT(discovery, DiscoveryComponent);
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#ifndef DISCOVERYCOMPONENT_H
-#define DISCOVERYCOMPONENT_H
-
-namespace icinga
-{
-
-/**
- * @ingroup discovery
- */
-class ComponentDiscoveryInfo : public Object
-{
-public:
- typedef shared_ptr<ComponentDiscoveryInfo> Ptr;
- typedef weak_ptr<ComponentDiscoveryInfo> WeakPtr;
-
- String Node;
- String Service;
-
- set<String> Subscriptions;
- set<String> Publications;
-
- double LastSeen;
-};
-
-/**
- * @ingroup discovery
- */
-class DiscoveryComponent : public IComponent
-{
-public:
- virtual void Start(void);
- virtual void Stop(void);
-
-private:
- VirtualEndpoint::Ptr m_Endpoint;
- map<String, ComponentDiscoveryInfo::Ptr> m_Components;
- Timer::Ptr m_DiscoveryTimer;
-
- void NewEndpointHandler(const Endpoint::Ptr& endpoint);
-
- void NewComponentMessageHandler(const RequestMessage& request);
- void RegisterComponentMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
-
- void WelcomeMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
-
- void SendDiscoveryMessage(const String& method, const String& identity, const Endpoint::Ptr& recipient);
- void ProcessDiscoveryMessage(const String& identity, const DiscoveryMessage& message, bool trusted);
-
- bool GetComponentDiscoveryInfo(String component, ComponentDiscoveryInfo::Ptr *info) const;
-
- void CheckExistingEndpoint(const Endpoint::Ptr& self, const Endpoint::Ptr& other);
- void DiscoveryEndpointHandler(const Endpoint::Ptr& endpoint, const ComponentDiscoveryInfo::Ptr& info) const;
-
- void DiscoveryTimerHandler(void);
-
- void FinishDiscoverySetup(const Endpoint::Ptr& endpoint);
-
- static const int RegistrationTTL = 300;
-};
-
-}
-
-#endif /* DISCOVERYCOMPONENT_H */
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#include "i2-discovery.h"
-
-using namespace icinga;
-
-DiscoveryMessage::DiscoveryMessage(void)
- : MessagePart()
-{ }
-
-DiscoveryMessage::DiscoveryMessage(const MessagePart& message)
- : MessagePart(message)
-{ }
-
-bool DiscoveryMessage::GetIdentity(String *value) const
-{
- return Get("identity", value);
-}
-
-void DiscoveryMessage::SetIdentity(const String& value)
-{
- Set("identity", value);
-}
-
-bool DiscoveryMessage::GetNode(String *value) const
-{
- return Get("node", value);
-}
-
-void DiscoveryMessage::SetNode(const String& value)
-{
- Set("node", value);
-}
-
-bool DiscoveryMessage::GetService(String *value) const
-{
- return Get("service", value);
-}
-
-void DiscoveryMessage::SetService(const String& value)
-{
- Set("service", value);
-}
-
-bool DiscoveryMessage::GetSubscriptions(Dictionary::Ptr *value) const
-{
- return Get("subscriptions", value);
-}
-
-void DiscoveryMessage::SetSubscriptions(const Dictionary::Ptr& value)
-{
- Set("subscriptions", value);
-}
-
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#ifndef DISCOVERYMESSAGE_H
-#define DISCOVERYMESSAGE_H
-
-namespace icinga
-{
-
-/**
- * @ingroup discovery
- */
-class DiscoveryMessage : public MessagePart
-{
-public:
- DiscoveryMessage(void);
- DiscoveryMessage(const MessagePart& message);
-
- bool GetIdentity(String *value) const;
- void SetIdentity(const String& value);
-
- bool GetNode(String *value) const;
- void SetNode(const String& value);
-
- bool GetService(String *value) const;
- void SetService(const String& value);
-
- bool GetSubscriptions(Dictionary::Ptr *value) const;
- void SetSubscriptions(const Dictionary::Ptr& value);
-};
-
-}
-
-#endif /* SUBSCRIPTIONMESSAGE_H */
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#ifndef I2DISCOVERY_H
-#define I2DISCOVERY_H
-
-/**
- * @defgroup discovery Discovery component
- *
- * The Discovery component takes care of connecting peers to each other
- * and performs authorisation checks for the message subscriptions.
- */
-
-#include <i2-base.h>
-#include <i2-jsonrpc.h>
-#include <i2-icinga.h>
-#include <i2-cib.h>
-
-#include "discoverymessage.h"
-#include "discoverycomponent.h"
-
-#endif /* I2DISCOVERY_H */
components/convenience/Makefile
components/delegation/Makefile
components/demo/Makefile
-components/discovery/Makefile
dyn/Makefile
icinga/Makefile
icinga-app/Makefile
-dlopen ${top_builddir}/components/compat/compat.la \
-dlopen ${top_builddir}/components/convenience/convenience.la \
-dlopen ${top_builddir}/components/delegation/delegation.la \
- -dlopen ${top_builddir}/components/demo/demo.la \
- -dlopen ${top_builddir}/components/discovery/discovery.la
+ -dlopen ${top_builddir}/components/demo/demo.la
icinga_DEPENDENCIES = \
${top_builddir}/components/cibsync/cibsync.la \
endpointmanager.h \
icingaapplication.cpp \
icingaapplication.h \
- i2-icinga.h \
- jsonrpcendpoint.cpp \
- jsonrpcendpoint.h \
- virtualendpoint.cpp \
- virtualendpoint.h
+ i2-icinga.h
libicinga_la_CPPFLAGS = \
-DI2_ICINGA_BUILD \
using namespace icinga;
+REGISTER_CLASS(Endpoint);
+
+boost::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
+boost::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected;
+boost::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionRegistered;
+boost::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionUnregistered;
+
+Endpoint::Endpoint(const Dictionary::Ptr& serializedUpdate)
+ : DynamicObject(serializedUpdate)
+{
+ RegisterAttribute("node", Attribute_Replicated);
+ RegisterAttribute("service", Attribute_Replicated);
+ RegisterAttribute("local", Attribute_Config);
+ RegisterAttribute("subscriptions", Attribute_Replicated);
+ RegisterAttribute("client", Attribute_Transient);
+}
+
+bool Endpoint::Exists(const String& name)
+{
+ return (DynamicObject::GetObject("Endpoint", name));
+}
+
+Endpoint::Ptr Endpoint::GetByName(const String& name)
+{
+ DynamicObject::Ptr configObject = DynamicObject::GetObject("Endpoint", name);
+
+ if (!configObject)
+ throw_exception(invalid_argument("Endpoint '" + name + "' does not exist."));
+
+ return dynamic_pointer_cast<Endpoint>(configObject);
+}
+
+Endpoint::Ptr Endpoint::MakeEndpoint(const String& name, bool local)
+{
+ ConfigItemBuilder::Ptr endpointConfig = boost::make_shared<ConfigItemBuilder>();
+ endpointConfig->SetType("Endpoint");
+ endpointConfig->SetName(local ? "local:" + name : name);
+ endpointConfig->SetLocal(local ? 1 : 0);
+ endpointConfig->AddExpression("local", OperatorSet, local);
+
+ DynamicObject::Ptr object = endpointConfig->Compile()->Commit();
+ return dynamic_pointer_cast<Endpoint>(object);
+}
+
+/**
+ * Checks whether this is a local endpoint.
+ *
+ * @returns true if this is a local endpoint, false otherwise.
+ */
+bool Endpoint::IsLocalEndpoint(void) const
+{
+ Value value = Get("local");
+
+ return (!value.IsEmpty() && value);
+}
+
/**
- * Retrieves the endpoint manager this endpoint is registered with.
+ * Checks whether this endpoint is connected.
*
- * @returns The EndpointManager object.
+ * @returns true if the endpoint is connected, false otherwise.
*/
-EndpointManager::Ptr Endpoint::GetEndpointManager(void) const
+bool Endpoint::IsConnected(void) const
{
- return m_EndpointManager.lock();
+ if (IsLocalEndpoint()) {
+ return true;
+ } else {
+ JsonRpcClient::Ptr client = GetClient();
+
+ return (client && client->IsConnected());
+ }
}
/**
- * Sets the endpoint manager this endpoint is registered with.
+ * Retrieves the address for the endpoint.
*
- * @param manager The EndpointManager object.
+ * @returns The endpoint's address.
*/
-void Endpoint::SetEndpointManager(EndpointManager::WeakPtr manager)
+String Endpoint::GetAddress(void) const
+{
+ if (IsLocalEndpoint()) {
+ return "local:" + GetName();
+ } else {
+ JsonRpcClient::Ptr client = GetClient();
+
+ if (!client)
+ return "<disconnected endpoint>";
+
+ return client->GetPeerAddress();
+ }
+}
+
+JsonRpcClient::Ptr Endpoint::GetClient(void) const
{
- m_EndpointManager = manager;
+ return Get("client");
+}
+
+void Endpoint::SetClient(const JsonRpcClient::Ptr& client)
+{
+ Set("client", client);
+ client->OnNewMessage.connect(boost::bind(&Endpoint::NewMessageHandler, this, _2));
+ client->OnClosed.connect(boost::bind(&Endpoint::ClientClosedHandler, this));
+
+ OnConnected(GetSelf());
}
/**
*
* @param topic The name of the topic.
*/
-void Endpoint::RegisterSubscription(String topic)
+void Endpoint::RegisterSubscription(const String& topic)
{
- m_Subscriptions.insert(topic);
+ Dictionary::Ptr subscriptions = GetSubscriptions();
+
+ if (!subscriptions)
+ subscriptions = boost::make_shared<Dictionary>();
+
+ if (!subscriptions->Contains(topic)) {
+ Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone();
+ newSubscriptions->Set(topic, topic);
+ SetSubscriptions(newSubscriptions);
+ }
}
/**
*
* @param topic The name of the topic.
*/
-void Endpoint::UnregisterSubscription(String topic)
+void Endpoint::UnregisterSubscription(const String& topic)
{
- m_Subscriptions.erase(topic);
+ Dictionary::Ptr subscriptions = GetSubscriptions();
+
+ if (subscriptions && subscriptions->Contains(topic)) {
+ Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone();
+ newSubscriptions->Remove(topic);
+ SetSubscriptions(newSubscriptions);
+ }
}
/**
* @param topic The name of the topic.
* @returns true if the endpoint is subscribed to the topic, false otherwise.
*/
-bool Endpoint::HasSubscription(String topic) const
+bool Endpoint::HasSubscription(const String& topic) const
{
- return (m_Subscriptions.find(topic) != m_Subscriptions.end());
+ Dictionary::Ptr subscriptions = GetSubscriptions();
+
+ return (subscriptions && subscriptions->Contains(topic));
}
/**
*/
void Endpoint::ClearSubscriptions(void)
{
- m_Subscriptions.clear();
+ Set("subscriptions", Empty);
}
-/**
- * Returns the beginning of the subscriptions list.
- *
- * @returns An iterator that points to the first subscription.
- */
-Endpoint::ConstTopicIterator Endpoint::BeginSubscriptions(void) const
+Dictionary::Ptr Endpoint::GetSubscriptions(void) const
{
- return m_Subscriptions.begin();
+ return Get("subscriptions");
}
-/**
- * Returns the end of the subscriptions list.
- *
- * @returns An iterator that points past the last subscription.
- */
-Endpoint::ConstTopicIterator Endpoint::EndSubscriptions(void) const
+void Endpoint::SetSubscriptions(const Dictionary::Ptr& subscriptions)
{
- return m_Subscriptions.end();
+ Set("subscriptions", subscriptions);
}
-/**
- * Sets whether a welcome message has been received from this endpoint.
- *
- * @param value Whether we've received a welcome message.
- */
-void Endpoint::SetReceivedWelcome(bool value)
+void Endpoint::RegisterTopicHandler(const String& topic, const function<Endpoint::Callback>& callback)
{
- m_ReceivedWelcome = value;
+ map<String, shared_ptr<boost::signal<Endpoint::Callback> > >::iterator it;
+ it = m_TopicHandlers.find(topic);
+
+ shared_ptr<boost::signal<Endpoint::Callback> > sig;
+
+ if (it == m_TopicHandlers.end()) {
+ sig = boost::make_shared<boost::signal<Endpoint::Callback> >();
+ m_TopicHandlers.insert(make_pair(topic, sig));
+ } else {
+ sig = it->second;
+ }
+
+ sig->connect(callback);
+
+ RegisterSubscription(topic);
}
-/**
- * Retrieves whether a welcome message has been received from this endpoint.
- *
- * @returns Whether we've received a welcome message.
- */
-bool Endpoint::HasReceivedWelcome(void) const
+void Endpoint::UnregisterTopicHandler(const String& topic, const function<Endpoint::Callback>& callback)
{
- return m_ReceivedWelcome;
+ // TODO: implement
+ //m_TopicHandlers[method] -= callback;
+ //UnregisterSubscription(method);
+
+ throw_exception(NotImplementedException());
}
-/**
- * Sets whether a welcome message has been sent to this endpoint.
- *
- * @param value Whether we've sent a welcome message.
- */
-void Endpoint::SetSentWelcome(bool value)
+void Endpoint::OnAttributeChanged(const String& name, const Value& oldValue)
{
- m_SentWelcome = value;
+ if (name == "subscriptions") {
+ Dictionary::Ptr oldSubscriptions, newSubscriptions;
+
+ if (oldValue.IsObjectType<Dictionary>())
+ oldSubscriptions = oldValue;
+
+ newSubscriptions = GetSubscriptions();
+
+ if (oldSubscriptions) {
+ String subscription;
+ BOOST_FOREACH(tie(tuples::ignore, subscription), oldSubscriptions) {
+ if (!newSubscriptions || !newSubscriptions->Contains(subscription)) {
+ Logger::Write(LogInformation, "icinga", "Removed subscription for '" + GetName() + "': " + subscription);
+ OnSubscriptionUnregistered(GetSelf(), subscription);
+ }
+ }
+ }
+
+ if (newSubscriptions) {
+ String subscription;
+ BOOST_FOREACH(tie(tuples::ignore, subscription), newSubscriptions) {
+ if (!oldSubscriptions || !oldSubscriptions->Contains(subscription)) {
+ Logger::Write(LogInformation, "icinga", "New subscription for '" + GetName() + "': " + subscription);
+ OnSubscriptionRegistered(GetSelf(), subscription);
+ }
+ }
+ }
+ }
}
-/**
- * Retrieves whether a welcome message has been sent to this endpoint.
- *
- * @returns Whether we've sent a welcome message.
- */
-bool Endpoint::HasSentWelcome(void) const
+void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& request)
{
- return m_SentWelcome;
+ if (!IsConnected()) {
+ // TODO: persist the message
+ return;
+ }
+
+ if (IsLocalEndpoint()) {
+ String method;
+ if (!request.GetMethod(&method))
+ return;
+
+ map<String, shared_ptr<boost::signal<Endpoint::Callback> > >::iterator it;
+ it = m_TopicHandlers.find(method);
+
+ if (it == m_TopicHandlers.end())
+ return;
+
+ (*it->second)(GetSelf(), sender, request);
+ } else {
+ GetClient()->SendMessage(request);
+ }
}
+
+void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& response)
+{
+ if (!IsConnected())
+ return;
+
+ if (IsLocalEndpoint())
+ EndpointManager::GetInstance()->ProcessResponseMessage(sender, response);
+ else
+ GetClient()->SendMessage(response);
+}
+
+void Endpoint::NewMessageHandler(const MessagePart& message)
+{
+ Endpoint::Ptr sender = GetSelf();
+
+ if (ResponseMessage::IsResponseMessage(message)) {
+ /* rather than routing the message to the right virtual
+ * endpoint we just process it here right away. */
+ EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
+ return;
+ }
+
+ RequestMessage request = message;
+
+ String method;
+ if (!request.GetMethod(&method))
+ return;
+
+ String id;
+ if (request.GetID(&id))
+ EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
+ else
+ EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
+}
+
+void Endpoint::ClientClosedHandler(void)
+{
+ try {
+ GetClient()->CheckException();
+ } catch (const exception& ex) {
+ stringstream message;
+ message << "Error occured for JSON-RPC socket: Message=" << ex.what();
+
+ Logger::Write(LogWarning, "jsonrpc", message.str());
+ }
+
+ Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetName());
+
+ // TODO: _only_ clear non-persistent subscriptions
+ // unregister ourselves if no persistent subscriptions are left (use a
+ // timer for that, once we have a TTL property for the topics)
+ ClearSubscriptions();
+
+ Set("client", Empty);
+
+ OnDisconnected(GetSelf());
+}
+
+String Endpoint::GetNode(void) const
+{
+ return Get("node");
+}
+
+String Endpoint::GetService(void) const
+{
+ return Get("service");
+}
+
*
* @ingroup icinga
*/
-class I2_ICINGA_API Endpoint : public Object
+class I2_ICINGA_API Endpoint : public DynamicObject
{
public:
typedef shared_ptr<Endpoint> Ptr;
typedef weak_ptr<Endpoint> WeakPtr;
- typedef set<String>::const_iterator ConstTopicIterator;
+ typedef void (Callback)(const Endpoint::Ptr&, const Endpoint::Ptr&, const RequestMessage&);
- Endpoint(void)
- : m_ReceivedWelcome(false), m_SentWelcome(false)
- { }
+ Endpoint(const Dictionary::Ptr& serializedUpdate);
- virtual String GetIdentity(void) const = 0;
- virtual String GetAddress(void) const = 0;
+ static bool Exists(const String& name);
+ static Endpoint::Ptr GetByName(const String& name);
- void SetReceivedWelcome(bool value);
- bool HasReceivedWelcome(void) const;
+ String GetAddress(void) const;
- void SetSentWelcome(bool value);
- bool HasSentWelcome(void) const;
+ JsonRpcClient::Ptr GetClient(void) const;
+ void SetClient(const JsonRpcClient::Ptr& client);
- shared_ptr<EndpointManager> GetEndpointManager(void) const;
- void SetEndpointManager(weak_ptr<EndpointManager> manager);
+ void RegisterSubscription(const String& topic);
+ void UnregisterSubscription(const String& topic);
+ bool HasSubscription(const String& topic) const;
- void RegisterSubscription(String topic);
- void UnregisterSubscription(String topic);
- bool HasSubscription(String topic) const;
+ Dictionary::Ptr GetSubscriptions(void) const;
+ void SetSubscriptions(const Dictionary::Ptr& subscriptions);
- virtual bool IsLocal(void) const = 0;
- virtual bool IsConnected(void) const = 0;
+ bool IsLocalEndpoint(void) const;
+ bool IsConnected(void) const;
- virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message) = 0;
- virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message) = 0;
-
- virtual void Stop(void) = 0;
+ void ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& message);
+ void ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& message);
void ClearSubscriptions(void);
- ConstTopicIterator BeginSubscriptions(void) const;
- ConstTopicIterator EndSubscriptions(void) const;
+ void RegisterTopicHandler(const String& topic, const function<Callback>& callback);
+ void UnregisterTopicHandler(const String& topic, const function<Callback>& callback);
+
+ virtual void OnAttributeChanged(const String& name, const Value& oldValue);
+
+ String GetNode(void) const;
+ String GetService(void) const;
- boost::signal<void (const Endpoint::Ptr&)> OnSessionEstablished;
+ static Endpoint::Ptr MakeEndpoint(const String& name, bool local);
+
+ static boost::signal<void (const Endpoint::Ptr&)> OnConnected;
+ static boost::signal<void (const Endpoint::Ptr&)> OnDisconnected;
+
+ static boost::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionRegistered;
+ static boost::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionUnregistered;
private:
- set<String> m_Subscriptions; /**< The topics this endpoint is
- subscribed to. */
bool m_ReceivedWelcome; /**< Have we received a welcome message
from this endpoint? */
bool m_SentWelcome; /**< Have we sent a welcome message to this
endpoint? */
- weak_ptr<EndpointManager> m_EndpointManager; /**< The endpoint manager
- this endpoint is
- registered with. */
+ map<String, shared_ptr<boost::signal<Callback> > > m_TopicHandlers;
+
+ void NewMessageHandler(const MessagePart& message);
+ void ClientClosedHandler(void);
};
}
m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this));
m_RequestTimer->SetInterval(5);
m_RequestTimer->Start();
+
+ m_SubscriptionTimer = boost::make_shared<Timer>();
+ m_SubscriptionTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::SubscriptionTimerHandler, this));
+ m_SubscriptionTimer->SetInterval(10);
+ m_SubscriptionTimer->Start();
+
+ m_ReconnectTimer = boost::make_shared<Timer>();
+ m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::ReconnectTimerHandler, this));
+ m_ReconnectTimer->SetInterval(10);
+ m_ReconnectTimer->Start();
}
/**
void EndpointManager::SetIdentity(const String& identity)
{
m_Identity = identity;
+
+ if (m_Endpoint)
+ m_Endpoint->Unregister();
+
+ DynamicObject::Ptr object = DynamicObject::GetObject("Endpoint", identity);
+
+ if (object)
+ m_Endpoint = dynamic_pointer_cast<Endpoint>(object);
+ else
+ m_Endpoint = Endpoint::MakeEndpoint(identity, false);
}
/**
return m_Identity;
}
-/**
- * Sets the SSL context that is used for remote connections.
- *
- * @param sslContext The new SSL context.
- */
-void EndpointManager::SetSSLContext(const shared_ptr<SSL_CTX>& sslContext)
-{
- m_SSLContext = sslContext;
-}
-
-/**
- * Retrieves the SSL context that is used for remote connections.
- *
- * @returns The SSL context.
- */
-shared_ptr<SSL_CTX> EndpointManager::GetSSLContext(void) const
-{
- return m_SSLContext;
-}
-
/**
* Creates a new JSON-RPC listener on the specified port.
*
*/
void EndpointManager::AddListener(const String& service)
{
- if (!GetSSLContext())
+ shared_ptr<SSL_CTX> sslContext = IcingaApplication::GetInstance()->GetSSLContext();
+
+ if (!sslContext)
throw_exception(logic_error("SSL context is required for AddListener()"));
stringstream s;
s << "Adding new listener: port " << service;
Logger::Write(LogInformation, "icinga", s.str());
- JsonRpcServer::Ptr server = boost::make_shared<JsonRpcServer>(m_SSLContext);
- RegisterServer(server);
+ JsonRpcServer::Ptr server = boost::make_shared<JsonRpcServer>(sslContext);
+
+ m_Servers.insert(server);
+ server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler,
+ this, _2));
server->Bind(service, AF_INET6);
server->Listen();
* @param node The remote host.
* @param service The remote port.
*/
-void EndpointManager::AddConnection(const String& node, const String& service)
-{
- stringstream s;
- s << "Adding new endpoint: [" << node << "]:" << service;
- Logger::Write(LogInformation, "icinga", s.str());
-
- JsonRpcEndpoint::Ptr endpoint = boost::make_shared<JsonRpcEndpoint>();
- RegisterEndpoint(endpoint);
- endpoint->Connect(node, service, m_SSLContext);
-}
-
-/**
- * Registers a new JSON-RPC server with this endpoint manager.
- *
- * @param server The JSON-RPC server.
- */
-void EndpointManager::RegisterServer(const JsonRpcServer::Ptr& server)
-{
- m_Servers.push_back(server);
- server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler,
- this, _2));
+void EndpointManager::AddConnection(const String& node, const String& service) {
+ JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(RoleOutbound,
+ IcingaApplication::GetInstance()->GetSSLContext());
+ client->Connect(node, service);
+ NewClientHandler(client);
}
/**
* Processes a new client connection.
*
- * @param ncea Event arguments.
+ * @param client The new client.
*/
void EndpointManager::NewClientHandler(const TcpClient::Ptr& client)
{
- Logger::Write(LogInformation, "icinga", "Accepted new client from " + client->GetPeerAddress());
+ JsonRpcClient::Ptr jclient = static_pointer_cast<JsonRpcClient>(client);
- JsonRpcEndpoint::Ptr endpoint = boost::make_shared<JsonRpcEndpoint>();
- endpoint->SetClient(static_pointer_cast<JsonRpcClient>(client));
- client->Start();
- RegisterEndpoint(endpoint);
-}
+ Logger::Write(LogInformation, "icinga", "New client connection from " + jclient->GetPeerAddress());
-/**
- * Unregisters a JSON-RPC server.
- *
- * @param server The JSON-RPC server.
- */
-void EndpointManager::UnregisterServer(const JsonRpcServer::Ptr& server)
-{
- m_Servers.erase(
- remove(m_Servers.begin(), m_Servers.end(), server),
- m_Servers.end());
- // TODO: unbind event
+ m_PendingClients.insert(jclient);
+ jclient->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1));
+ jclient->Start();
}
-/**
- * Registers a new endpoint with this endpoint manager.
- *
- * @param endpoint The new endpoint.
- */
-void EndpointManager::RegisterEndpoint(const Endpoint::Ptr& endpoint)
+void EndpointManager::ClientConnectedHandler(const TcpClient::Ptr& client)
{
- endpoint->SetEndpointManager(GetSelf());
+ JsonRpcClient::Ptr jclient = static_pointer_cast<JsonRpcClient>(client);
- UnregisterEndpoint(endpoint);
+ m_PendingClients.erase(jclient);
- String identity = endpoint->GetIdentity();
+ shared_ptr<X509> cert = jclient->GetPeerCertificate();
- if (!identity.IsEmpty()) {
- m_Endpoints[identity] = endpoint;
- OnNewEndpoint(GetSelf(), endpoint);
- } else {
- m_PendingEndpoints.push_back(endpoint);
- }
+ String identity = Utility::GetCertificateCN(cert);
- if (endpoint->IsLocal()) {
- /* this endpoint might have introduced new subscriptions
- * or publications which affect remote endpoints, we need
- * to close all fully-connected remote endpoints to make sure
- * these subscriptions/publications are kept up-to-date. */
- Iterator prev, it;
- for (it = m_Endpoints.begin(); it != m_Endpoints.end(); ) {
- prev = it;
- it++;
-
- if (!prev->second->IsLocal())
- m_Endpoints.erase(prev);
- }
- }
-}
+ Endpoint::Ptr endpoint;
-/**
- * Unregisters an endpoint.
- *
- * @param endpoint The endpoint.
- */
-void EndpointManager::UnregisterEndpoint(const Endpoint::Ptr& endpoint)
-{
- m_PendingEndpoints.erase(
- remove(m_PendingEndpoints.begin(), m_PendingEndpoints.end(), endpoint),
- m_PendingEndpoints.end());
+ if (Endpoint::Exists(identity))
+ endpoint = Endpoint::GetByName(identity);
+ else
+ endpoint = Endpoint::MakeEndpoint(identity, false);
- String identity = endpoint->GetIdentity();
- if (!identity.IsEmpty())
- m_Endpoints.erase(identity);
+ endpoint->SetClient(jclient);
}
/**
throw_exception(invalid_argument("Message is missing the 'method' property."));
vector<Endpoint::Ptr> candidates;
- Endpoint::Ptr endpoint;
- BOOST_FOREACH(tie(tuples::ignore, endpoint), m_Endpoints) {
+ DynamicObject::Ptr object;
+ BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
+ Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
/* don't forward messages between non-local endpoints */
if (!sender->IsLocal() && !endpoint->IsLocal())
continue;
if (!message.GetMethod(&method))
throw_exception(invalid_argument("Message is missing the 'method' property."));
- Endpoint::Ptr recipient;
- BOOST_FOREACH(tie(tuples::ignore, recipient), m_Endpoints) {
+ DynamicObject::Ptr object;
+ BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
+ Endpoint::Ptr recipient = dynamic_pointer_cast<Endpoint>(object);
+
/* don't forward messages back to the sender */
if (sender == recipient)
continue;
*
* @param callback The callback function.
*/
-void EndpointManager::ForEachEndpoint(function<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> callback)
-{
- map<String, Endpoint::Ptr>::iterator prev, i;
- for (i = m_Endpoints.begin(); i != m_Endpoints.end(); ) {
- prev = i;
- i++;
-
- callback(GetSelf(), prev->second);
- }
-}
-
-/**
- * Retrieves an endpoint that has the specified identity.
- *
- * @param identity The identity of the endpoint.
- */
-Endpoint::Ptr EndpointManager::GetEndpointByIdentity(const String& identity) const
-{
- map<String, Endpoint::Ptr>::const_iterator i;
- i = m_Endpoints.find(identity);
- if (i != m_Endpoints.end())
- return i->second;
- else
- return Endpoint::Ptr();
-}
+//void EndpointManager::ForEachEndpoint(function<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> callback)
+//{
+// map<String, Endpoint::Ptr>::iterator prev, i;
+// for (i = m_Endpoints.begin(); i != m_Endpoints.end(); ) {
+// prev = i;
+// i++;
+//
+// callback(GetSelf(), prev->second);
+// }
+//}
void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient,
RequestMessage& message,
return a.second.Timeout < b.second.Timeout;
}
+void EndpointManager::SubscriptionTimerHandler(void)
+{
+ Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
+
+ DynamicObject::Ptr object;
+ BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
+ Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
+
+ if (!endpoint->IsLocalEndpoint())
+ continue;
+
+ String topic;
+ BOOST_FOREACH(tie(tuples::ignore, topic), endpoint->GetSubscriptions()) {
+ subscriptions->Set(topic, topic);
+ }
+ }
+
+ m_Endpoint->SetSubscriptions(subscriptions);
+}
+
+void EndpointManager::ReconnectTimerHandler(void)
+{
+ DynamicObject::Ptr object;
+ BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
+ Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
+
+ if (endpoint->IsConnected())
+ continue;
+
+ String node, service;
+ node = endpoint->GetNode();
+ service = endpoint->GetService();
+
+ if (node.IsEmpty() || service.IsEmpty())
+ continue;
+
+ AddConnection(node, service);
+ }
+}
+
void EndpointManager::RequestTimerHandler(void)
{
map<String, PendingRequest>::iterator it;
m_Requests.erase(it);
}
-EndpointManager::Iterator EndpointManager::Begin(void)
-{
- return m_Endpoints.begin();
-}
+//EndpointManager::Iterator EndpointManager::Begin(void)
+//{
+// return m_Endpoints.begin();
+//}
-EndpointManager::Iterator EndpointManager::End(void)
-{
- return m_Endpoints.end();
-}
+//EndpointManager::Iterator EndpointManager::End(void)
+//{
+// return m_Endpoints.end();
+//}
EndpointManager::Ptr EndpointManager::GetInstance(void)
{
typedef shared_ptr<EndpointManager> Ptr;
typedef weak_ptr<EndpointManager> WeakPtr;
- typedef map<String, Endpoint::Ptr>::iterator Iterator;
+// typedef map<String, Endpoint::Ptr>::iterator Iterator;
EndpointManager(void);
void AddListener(const String& service);
void AddConnection(const String& node, const String& service);
- void RegisterEndpoint(const Endpoint::Ptr& endpoint);
- void UnregisterEndpoint(const Endpoint::Ptr& endpoint);
-
void SendUnicastMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, const MessagePart& message);
void SendAnycastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
void SendMulticastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message);
- void ForEachEndpoint(function<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> callback);
- Iterator Begin(void);
- Iterator End(void);
-
- Endpoint::Ptr GetEndpointByIdentity(const String& identity) const;
+// void ForEachEndpoint(function<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> callback);
+// Iterator Begin(void);
+// Iterator End(void);
boost::signal<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> OnNewEndpoint;
private:
String m_Identity;
- shared_ptr<SSL_CTX> m_SSLContext;
+ Endpoint::Ptr m_Endpoint;
+
+ Timer::Ptr m_SubscriptionTimer;
+
+ Timer::Ptr m_ReconnectTimer;
- vector<JsonRpcServer::Ptr> m_Servers;
- vector<Endpoint::Ptr> m_PendingEndpoints;
- map<String, Endpoint::Ptr> m_Endpoints;
+ set<JsonRpcServer::Ptr> m_Servers;
+ set<JsonRpcClient::Ptr> m_PendingClients;
/**
* Information about a pending API request.
map<String, PendingRequest> m_Requests;
Timer::Ptr m_RequestTimer;
- void RegisterServer(const JsonRpcServer::Ptr& server);
- void UnregisterServer(const JsonRpcServer::Ptr& server);
-
static bool RequestTimeoutLessComparer(const pair<String, PendingRequest>& a, const pair<String, PendingRequest>& b);
void RequestTimerHandler(void);
+ void SubscriptionTimerHandler(void);
+
+ void ReconnectTimerHandler(void);
+
void NewClientHandler(const TcpClient::Ptr& client);
+ void ClientConnectedHandler(const TcpClient::Ptr& client);
};
}
#endif /* I2_ICINGA_BUILD */
#include "endpoint.h"
-#include "jsonrpcendpoint.h"
-#include "virtualendpoint.h"
#include "endpointmanager.h"
#include "icingaapplication.h"
Logger::Write(LogInformation, "icinga", "My identity: " + identity);
EndpointManager::GetInstance()->SetIdentity(identity);
- shared_ptr<SSL_CTX> sslContext = Utility::MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile());
- EndpointManager::GetInstance()->SetSSLContext(sslContext);
+ m_SSLContext = Utility::MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile());
}
/* create the primary RPC listener */
{
return m_StartTime;
}
+
+shared_ptr<SSL_CTX> IcingaApplication::GetSSLContext(void) const
+{
+ return m_SSLContext;
+}
String GetPidPath(void) const;
String GetStatePath(void) const;
Dictionary::Ptr GetMacros(void) const;
+ shared_ptr<SSL_CTX> GetSSLContext(void) const;
double GetStartTime(void) const;
String m_PidPath;
String m_StatePath;
Dictionary::Ptr m_Macros;
+ shared_ptr<SSL_CTX> m_SSLContext;
double m_StartTime;
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#include "i2-icinga.h"
-
-using namespace icinga;
-
-String JsonRpcEndpoint::GetIdentity(void) const
-{
- return m_Identity;
-}
-
-String JsonRpcEndpoint::GetAddress(void) const
-{
- if (!m_Client)
- return "<disconnected endpoint>";
-
- return m_Client->GetPeerAddress();
-}
-
-JsonRpcClient::Ptr JsonRpcEndpoint::GetClient(void)
-{
- return m_Client;
-}
-
-void JsonRpcEndpoint::Connect(String node, String service, shared_ptr<SSL_CTX> sslContext)
-{
- JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(RoleOutbound, sslContext);
- SetClient(client);
- client->Connect(node, service);
- client->Start();
-}
-
-void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client)
-{
- m_Client = client;
- client->OnNewMessage.connect(boost::bind(&JsonRpcEndpoint::NewMessageHandler, this, _2));
- client->OnClosed.connect(boost::bind(&JsonRpcEndpoint::ClientClosedHandler, this));
- client->OnConnected.connect(boost::bind(&JsonRpcEndpoint::ClientConnectedHandler, this));
-}
-
-bool JsonRpcEndpoint::IsLocal(void) const
-{
- return false;
-}
-
-bool JsonRpcEndpoint::IsConnected(void) const
-{
- return (m_Client && m_Client->IsConnected());
-}
-
-void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message)
-{
- if (IsConnected()) {
- m_Client->SendMessage(message);
- } else {
- // TODO: persist the event
- }
-}
-
-void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message)
-{
- m_Client->SendMessage(message);
-}
-
-void JsonRpcEndpoint::NewMessageHandler(const MessagePart& message)
-{
- Endpoint::Ptr sender = GetSelf();
-
- if (ResponseMessage::IsResponseMessage(message)) {
- /* rather than routing the message to the right virtual
- * endpoint we just process it here right away. */
- GetEndpointManager()->ProcessResponseMessage(sender, message);
- return;
- }
-
- RequestMessage request = message;
-
- String method;
- if (!request.GetMethod(&method))
- return;
-
- String id;
- if (request.GetID(&id))
- GetEndpointManager()->SendAnycastMessage(sender, request);
- else
- GetEndpointManager()->SendMulticastMessage(sender, request);
-}
-
-void JsonRpcEndpoint::ClientClosedHandler(void)
-{
- try {
- m_Client->CheckException();
- } catch (const exception& ex) {
- stringstream message;
- message << "Error occured for JSON-RPC socket: Message=" << ex.what();
-
- Logger::Write(LogWarning, "jsonrpc", message.str());
- }
-
- Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetIdentity());
-
- // TODO: _only_ clear non-persistent subscriptions
- // unregister ourselves if no persistent subscriptions are left (use a timer for that, once we have a TTL property for the topics)
- ClearSubscriptions();
-
- // remove the endpoint if there are no more subscriptions */
- if (BeginSubscriptions() == EndSubscriptions()) {
- Hold();
- GetEndpointManager()->UnregisterEndpoint(GetSelf());
- }
-
- m_Client.reset();
-
- // TODO: persist events, etc., for now we just disable the endpoint
-}
-
-void JsonRpcEndpoint::ClientConnectedHandler(void)
-{
- String identity = Utility::GetCertificateCN(m_Client->GetPeerCertificate());
-
- if (GetIdentity().IsEmpty() && !identity.IsEmpty()) {
- m_Identity = identity;
- GetEndpointManager()->RegisterEndpoint(GetSelf());
- }
-}
-
-void JsonRpcEndpoint::Stop(void)
-{
- if (m_Client)
- m_Client->Close();
-}
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#ifndef JSONRPCENDPOINT_H
-#define JSONRPCENDPOINT_H
-
-namespace icinga
-{
-
-/**
- * A JSON-RPC endpoint that can be used to communicate with a remote
- * Icinga instance.
- *
- * @ingroup icinga
- */
-class I2_ICINGA_API JsonRpcEndpoint : public Endpoint
-{
-public:
- typedef shared_ptr<JsonRpcEndpoint> Ptr;
- typedef weak_ptr<JsonRpcEndpoint> WeakPtr;
-
- void Connect(String node, String service,
- shared_ptr<SSL_CTX> sslContext);
-
- JsonRpcClient::Ptr GetClient(void);
- void SetClient(JsonRpcClient::Ptr client);
-
- virtual String GetIdentity(void) const;
- virtual String GetAddress(void) const;
-
- virtual bool IsLocal(void) const;
- virtual bool IsConnected(void) const;
-
- virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message);
- virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message);
-
- virtual void Stop(void);
-
-private:
- String m_Identity; /**< The identity of this endpoint. */
-
- shared_ptr<SSL_CTX> m_SSLContext;
- String m_Address;
- JsonRpcClient::Ptr m_Client;
-
- void SetAddress(String address);
-
- void NewMessageHandler(const MessagePart& message);
- void ClientClosedHandler(void);
- void ClientConnectedHandler(void);
-};
-
-}
-
-#endif /* JSONRPCENDPOINT_H */
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#include "i2-icinga.h"
-
-using namespace icinga;
-
-String VirtualEndpoint::GetIdentity(void) const
-{
- return "__" + GetAddress();
-}
-
-String VirtualEndpoint::GetAddress(void) const
-{
- char address[50];
- sprintf(address, "virtual:%p", (void *)this);
- return address;
-}
-
-bool VirtualEndpoint::IsLocal(void) const
-{
- return true;
-}
-
-bool VirtualEndpoint::IsConnected(void) const
-{
- return true;
-}
-
-void VirtualEndpoint::RegisterTopicHandler(String topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback)
-{
- map<String, shared_ptr<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> > >::iterator it;
- it = m_TopicHandlers.find(topic);
-
- shared_ptr<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> > sig;
-
- if (it == m_TopicHandlers.end()) {
- sig = boost::make_shared<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> >();
- m_TopicHandlers.insert(make_pair(topic, sig));
- } else {
- sig = it->second;
- }
-
- sig->connect(callback);
-
- RegisterSubscription(topic);
-}
-
-void VirtualEndpoint::UnregisterTopicHandler(String topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback)
-{
- // TODO: implement
- //m_TopicHandlers[method] -= callback;
- //UnregisterMethodSubscription(method);
-
- throw_exception(NotImplementedException());
-}
-
-void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage& request)
-{
- String method;
- if (!request.GetMethod(&method))
- return;
-
- map<String, shared_ptr<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> > >::iterator it;
- it = m_TopicHandlers.find(method);
-
- if (it == m_TopicHandlers.end())
- return;
-
- (*it->second)(GetSelf(), sender, request);
-}
-
-void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& response)
-{
- GetEndpointManager()->ProcessResponseMessage(sender, response);
-}
-
-void VirtualEndpoint::Stop(void)
-{
- /* Nothing to do here. */
-}
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#ifndef VIRTUALENDPOINT_H
-#define VIRTUALENDPOINT_H
-
-namespace icinga
-{
-
-/**
- * A local endpoint.
- *
- * @ingroup icinga
- */
-class I2_ICINGA_API VirtualEndpoint : public Endpoint
-{
-public:
- typedef shared_ptr<VirtualEndpoint> Ptr;
- typedef weak_ptr<VirtualEndpoint> WeakPtr;
-
- void RegisterTopicHandler(String topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback);
- void UnregisterTopicHandler(String topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback);
-
- virtual String GetIdentity(void) const;
- virtual String GetAddress(void) const;
-
- virtual bool IsLocal(void) const;
- virtual bool IsConnected(void) const;
-
- virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message);
- virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message);
-
- virtual void Stop(void);
-
-private:
- map< String, shared_ptr<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> > > m_TopicHandlers;
-};
-
-}
-
-#endif /* VIRTUALENDPOINT_H */
void JsonRpcClient::SendMessage(const MessagePart& message)
{
Value value = message.GetDictionary();
- NetString::WriteStringToIOQueue(this, value.Serialize());
+ String json = value.Serialize();
+ //std::cerr << ">> " << json << std::endl;
+ NetString::WriteStringToIOQueue(this, json);
}
/**
String jsonString;
while (NetString::ReadStringFromIOQueue(this, &jsonString)) {
+ //std::cerr << "<< " << jsonString << std::endl;
+
try {
Value value = Value::Deserialize(jsonString);