void CheckerComponent::Start(void)
{
m_Endpoint = boost::make_shared<VirtualEndpoint>();
- m_Endpoint->RegisterPublication("checker::ServiceStateChange");
+
+ /* dummy registration so the delegation module knows this is a checker
+ TODO: figure out a better way for this */
+ m_Endpoint->RegisterSubscription("checker");
+
EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1));
DynamicObject::OnUnregistered.connect(boost::bind(&CIBSyncComponent::LocalObjectUnregisteredHandler, this, _1));
DynamicObject::OnTransactionClosing.connect(boost::bind(&CIBSyncComponent::TransactionClosingHandler, this, _1));
- m_Endpoint->RegisterPublication("config::ObjectUpdate");
- m_Endpoint->RegisterPublication("config::ObjectRemoved");
-
EndpointManager::GetInstance()->OnNewEndpoint.connect(boost::bind(&CIBSyncComponent::NewEndpointHandler, this, _2));
- m_Endpoint->RegisterPublication("config::FetchObjects");
m_Endpoint->RegisterTopicHandler("config::ObjectUpdate",
boost::bind(&CIBSyncComponent::RemoteObjectUpdateHandler, this, _2, _3));
m_Endpoint->RegisterTopicHandler("config::ObjectRemoved",
bool DelegationComponent::IsEndpointChecker(const Endpoint::Ptr& endpoint)
{
- return (endpoint->HasPublication("checker::ServiceStateChange"));
+ return (endpoint->HasSubscription("checker"));
}
vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::Ptr& service) const
m_Endpoint = boost::make_shared<VirtualEndpoint>();
m_Endpoint->RegisterTopicHandler("demo::HelloWorld",
boost::bind(&DemoComponent::HelloWorldRequestHandler, this, _2, _3));
- m_Endpoint->RegisterPublication("demo::HelloWorld");
EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
m_DemoTimer = boost::make_shared<Timer>();
{
m_Endpoint = boost::make_shared<VirtualEndpoint>();
- m_Endpoint->RegisterPublication("discovery::RegisterComponent");
m_Endpoint->RegisterTopicHandler("discovery::RegisterComponent",
boost::bind(&DiscoveryComponent::RegisterComponentMessageHandler, this, _2, _3));
- m_Endpoint->RegisterPublication("discovery::NewComponent");
m_Endpoint->RegisterTopicHandler("discovery::NewComponent",
boost::bind(&DiscoveryComponent::NewComponentMessageHandler, this, _3));
return;
}
- /* accept discovery::RegisterComponent messages from any endpoint */
- endpoint->RegisterPublication("discovery::RegisterComponent");
-
- /* accept discovery::Welcome messages from any endpoint */
- endpoint->RegisterPublication("discovery::Welcome");
-
String identity = endpoint->GetIdentity();
if (identity == EndpointManager::GetInstance()->GetIdentity()) {
// register published/subscribed topics for this endpoint
ComponentDiscoveryInfo::Ptr info = ic->second;
- BOOST_FOREACH(String publication, info->Publications) {
- endpoint->RegisterPublication(publication);
- }
-
BOOST_FOREACH(String subscription, info->Subscriptions) {
endpoint->RegisterSubscription(subscription);
}
for (i = endpoint->BeginSubscriptions(); i != endpoint->EndSubscriptions(); i++)
info->Subscriptions.insert(*i);
-
- for (i = endpoint->BeginPublications(); i != endpoint->EndPublications(); i++)
- info->Publications.insert(*i);
}
/**
params.SetSubscriptions(subscriptions);
- Dictionary::Ptr publications = boost::make_shared<Dictionary>();
- BOOST_FOREACH(String publication, info->Publications) {
- publications->Add(publication);
- }
-
- params.SetPublications(publications);
-
if (recipient)
EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, recipient, request);
else
EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, request);
}
-bool DiscoveryComponent::HasMessagePermission(const Dictionary::Ptr& roles, const String& messageType, const String& message)
-{
- if (!roles)
- return false;
-
- Value roleName;
- BOOST_FOREACH(tie(tuples::ignore, roleName), roles) {
- DynamicObject::Ptr role = DynamicObject::GetObject("Role", roleName);
- Dictionary::Ptr permissions = role->Get(messageType);
- if (!permissions)
- continue;
-
- Value permission;
- BOOST_FOREACH(tie(tuples::ignore, permission), permissions) {
- if (Utility::Match(permission, message))
- return true;
- }
- }
-
- return false;
-}
-
/**
* Processes a discovery message by registering the component in the
* discovery component registry.
Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(identity);
- Dictionary::Ptr publications;
- if (message.GetPublications(&publications)) {
- Value publication;
- BOOST_FOREACH(tie(tuples::ignore, publication), publications) {
- if (trusted || HasMessagePermission(roles, "publications", publication)) {
- info->Publications.insert(publication);
- if (endpoint)
- endpoint->RegisterPublication(publication);
- }
- }
- }
-
Dictionary::Ptr subscriptions;
if (message.GetSubscriptions(&subscriptions)) {
Value subscription;
BOOST_FOREACH(tie(tuples::ignore, subscription), subscriptions) {
- if (trusted || HasMessagePermission(roles, "subscriptions", subscription)) {
- info->Subscriptions.insert(subscription);
- if (endpoint)
- endpoint->RegisterSubscription(subscription);
- }
+ info->Subscriptions.insert(subscription);
+ if (endpoint)
+ endpoint->RegisterSubscription(subscription);
}
}
/* update LastSeen if we're still connected to this endpoint */
info->LastSeen = now;
} else {
- /* TODO: figure out whether we actually want to connect to this component */
/* try and reconnect to this component */
try {
if (!info->Node.IsEmpty() && !info->Service.IsEmpty())
void FinishDiscoverySetup(const Endpoint::Ptr& endpoint);
- bool HasMessagePermission(const Dictionary::Ptr& roles, const String& messageType, const String& message);
-
static const int RegistrationTTL = 300;
};
Set("subscriptions", value);
}
-bool DiscoveryMessage::GetPublications(Dictionary::Ptr *value) const
-{
- return Get("publications", value);
-}
-
-void DiscoveryMessage::SetPublications(const Dictionary::Ptr& value)
-{
- Set("publications", value);
-}
bool GetSubscriptions(Dictionary::Ptr *value) const;
void SetSubscriptions(const Dictionary::Ptr& value);
-
- bool GetPublications(Dictionary::Ptr *value) const;
- void SetPublications(const Dictionary::Ptr& value);
};
}
AX_BOOST_BASE
AX_BOOST_SIGNALS
AX_BOOST_THREAD
+AX_BOOST_SYSTEM
AX_BOOST_UNIT_TEST_FRAMEWORK
AX_CHECK_OPENSSL([], [AC_MSG_ERROR([You need the OpenSSL headers and libraries in order to build this application])])
AC_CHECK_LIB(ssl, SSL_new)
local object Endpoint "icinga-c1" {
node = "192.168.5.46",
service = 7777,
-
- roles = { "all" }
}
-local object Role "all" {
- publications = { "*" },
- subscriptions = { "*" }
-}
local object endpoint "icinga-c1" {
node = "192.168.5.46",
service = 7777,
-
- roles = { "all" }
}
-local object role "all" {
- publications = { "*" },
- subscriptions = { "*" }
-}
ca = "ca.crt",
node = "192.168.2.235",
- service = 7777
+ service = 7777,
+
+ macros = {
+ plugindir = "/usr/local/icinga/libexec"
+ }
}
local object Component "discovery" {
}
-local object Endpoint "icinga-c2" {
- roles = { "all" }
-}
-
-local object Endpoint "icinga-c3" {
- roles = { "all" }
-}
-
-local object Endpoint "icinga-c4" {
- roles = { "all" }
-}
-
-local object Role "all" {
- publications = { "*" },
- subscriptions = { "*" }
-}
-
object Host "localhost" {
}
abstract object Service "nagios-service" {
methods = {
check = "native::NagiosCheck"
- },
-
- macros = {
- plugindir = "/usr/local/icinga/libexec"
}
}
broker = 1
}
-local object Endpoint "icinga-c2" {
- roles = { "demo" }
-}
-
-local object Endpoint "icinga-c3" {
- roles = { "demo" }
-}
-
-local object Role "broker" {
- publications = { "discovery::NewComponent" }
-}
-
-local object Role "demo" {
- publications = { "demo::*" },
- subscriptions = { "demo::*" }
-}
local object endpoint "icinga-c3" {
node = "192.168.5.46",
service = 9999,
-
- roles = { "all" }
-}
-
-local object role "all" {
- publications = { "*" },
- subscriptions = { "*" }
}
# --------------------------------------------
local object endpoint "icinga-c2" {
node = "192.168.2.235",
service = 7777,
-
- roles = { "all" }
}
-local object role "all" {
- publications = { "*" },
- subscriptions = { "*" }
-}
return (m_Subscriptions.find(topic) != m_Subscriptions.end());
}
-/**
- * Registers a topic publication for this endpoint.
- *
- * @param topic The name of the topic.
- */
-void Endpoint::RegisterPublication(String topic)
-{
- m_Publications.insert(topic);
-}
-
-/**
- * Removes a topic publication from this endpoint.
- *
- * @param topic The name of the topic.
- */
-void Endpoint::UnregisterPublication(String topic)
-{
- m_Publications.erase(topic);
-}
-
-/**
- * Checks whether the endpoint has a publication for the specified topic.
- *
- * @param topic The name of the topic.
- * @returns true if the endpoint is publishing this topic, false otherwise.
- */
-bool Endpoint::HasPublication(String topic) const
-{
- return (m_Publications.find(topic) != m_Publications.end());
-}
-
/**
* Removes all subscriptions for the endpoint.
*/
m_Subscriptions.clear();
}
-/**
- * Removes all publications for the endpoint.
- */
-void Endpoint::ClearPublications(void)
-{
- m_Publications.clear();
-}
-
/**
* Returns the beginning of the subscriptions list.
*
return m_Subscriptions.end();
}
-/**
- * Returns the beginning of the publications list.
- *
- * @returns An iterator that points to the first publication.
- */
-Endpoint::ConstTopicIterator Endpoint::BeginPublications(void) const
-{
- return m_Publications.begin();
-}
-
-/**
- * Returns the end of the publications list.
- *
- * @returns An iterator that points past the last publication.
- */
-Endpoint::ConstTopicIterator Endpoint::EndPublications(void) const
-{
- return m_Publications.end();
-}
-
/**
* Sets whether a welcome message has been received from this endpoint.
*
void UnregisterSubscription(String topic);
bool HasSubscription(String topic) const;
- void RegisterPublication(String topic);
- void UnregisterPublication(String topic);
- bool HasPublication(String topic) const;
-
virtual bool IsLocal(void) const = 0;
virtual bool IsConnected(void) const = 0;
virtual void Stop(void) = 0;
void ClearSubscriptions(void);
- void ClearPublications(void);
ConstTopicIterator BeginSubscriptions(void) const;
ConstTopicIterator EndSubscriptions(void) const;
- ConstTopicIterator BeginPublications(void) const;
- ConstTopicIterator EndPublications(void) const;
-
boost::signal<void (const Endpoint::Ptr&)> OnSessionEstablished;
private:
set<String> m_Subscriptions; /**< The topics this endpoint is
subscribed to. */
- set<String> m_Publications; /**< The topics this endpoint is
- publishing. */
bool m_ReceivedWelcome; /**< Have we received a welcome message
from this endpoint? */
bool m_SentWelcome; /**< Have we sent a welcome message to this
if (!request.GetMethod(&method))
return;
- if (!HasPublication(method))
- return;
-
String id;
if (request.GetID(&id))
GetEndpointManager()->SendAnycastMessage(sender, request);
Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetIdentity());
- // TODO: _only_ clear non-persistent publications/subscriptions
- // unregister ourselves if no persistent publications/subscriptions are left (use a timer for that, once we have a TTL property for the topics)
+ // TODO: _only_ clear non-persistent subscriptions
+ // unregister ourselves if no persistent subscriptions are left (use a timer for that, once we have a TTL property for the topics)
ClearSubscriptions();
- ClearPublications();
// remove the endpoint if there are no more subscriptions */
if (BeginSubscriptions() == EndSubscriptions()) {