m_DiscoveryEndpoint->RegisterMethodHandler("discovery::Welcome",
bind_weak(&DiscoveryComponent::WelcomeMessageHandler, shared_from_this()));
- GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1));
+ GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1));
GetEndpointManager()->OnNewEndpoint += bind_weak(&DiscoveryComponent::NewEndpointHandler, shared_from_this());
GetEndpointManager()->RegisterEndpoint(m_DiscoveryEndpoint);
- m_DiscoveryConnectTimer = make_shared<Timer>();
- m_DiscoveryConnectTimer->SetInterval(30);
- m_DiscoveryConnectTimer->OnTimerExpired += bind_weak(&DiscoveryComponent::ReconnectTimerHandler, shared_from_this());
- m_DiscoveryConnectTimer->Start();
+ m_DiscoveryTimer = make_shared<Timer>();
+ m_DiscoveryTimer->SetInterval(30);
+ m_DiscoveryTimer->OnTimerExpired += bind_weak(&DiscoveryComponent::DiscoveryTimerHandler, shared_from_this());
+ m_DiscoveryTimer->Start();
}
void DiscoveryComponent::Stop(void)
neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent");
}
+ /* accept discovery::Welcome messages from any endpoint */
neea.Endpoint->RegisterMethodSource("discovery::Welcome");
- /* TODO: implement message broker authorisation */
- neea.Endpoint->RegisterMethodSource("discovery::NewComponent");
-
- /* TODO: register handler to unregister this endpoint when it's closed */
-
return 0;
}
int DiscoveryComponent::DiscoveryEndpointHandler(const NewEndpointEventArgs& neea, ComponentDiscoveryInfo::Ptr info) const
{
- neea.Endpoint->ForeachMethodSink(bind(&DiscoveryComponent::DiscoverySinkHandler, this, _1, info));
- neea.Endpoint->ForeachMethodSource(bind(&DiscoveryComponent::DiscoverySourceHandler, this, _1, info));
+ neea.Endpoint->ForEachMethodSink(bind(&DiscoveryComponent::DiscoverySinkHandler, this, _1, info));
+ neea.Endpoint->ForEachMethodSource(bind(&DiscoveryComponent::DiscoverySourceHandler, this, _1, info));
return 0;
}
if (component == GetEndpointManager()->GetIdentity()) {
/* Build fake discovery info for ourselves */
*info = make_shared<ComponentDiscoveryInfo>();
- GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _1, *info));
+ GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _1, *info));
+ (*info)->LastSeen = 0;
(*info)->Node = GetIcingaApplication()->GetNode();
(*info)->Service = GetIcingaApplication()->GetService();
return 0;
}
- GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1));
+ GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1));
}
+ ConfigCollection::Ptr brokerCollection = GetApplication()->GetConfigHive()->GetCollection("broker");
+ if (brokerCollection->GetObject(identity)) {
+ /* accept discovery::NewComponent messages from brokers */
+ endpoint->RegisterMethodSource("discovery::NewComponent");
+ }
+
+
// we assume the other component _always_ wants
// discovery::RegisterComponent messages from us
endpoint->RegisterMethodSink("discovery::RegisterComponent");
void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessage message)
{
+ /* ignore discovery messages that are about ourselves */
+ if (identity == GetEndpointManager()->GetIdentity())
+ return;
+
ComponentDiscoveryInfo::Ptr info = make_shared<ComponentDiscoveryInfo>();
+ time(&(info->LastSeen));
+
message.GetNode(&info->Node);
message.GetService(&info->Service);
if (message.GetProvides(&provides)) {
DictionaryIterator i;
for (i = provides.GetDictionary()->Begin(); i != provides.GetDictionary()->End(); i++) {
+ if (IsBroker()) {
+ /* TODO: Add authorisation checks here */
+ }
info->PublishedMethods.insert(i->second);
}
}
if (message.GetSubscribes(&subscribes)) {
DictionaryIterator i;
for (i = subscribes.GetDictionary()->Begin(); i != subscribes.GetDictionary()->End(); i++) {
+ if (IsBroker()) {
+ /* TODO: Add authorisation checks here */
+ }
info->SubscribedMethods.insert(i->second);
}
}
int DiscoveryComponent::RegisterComponentMessageHandler(const NewRequestEventArgs& nrea)
{
+ /* ignore discovery::RegisterComponent messages when we're not a broker */
+ if (!IsBroker())
+ return 0;
+
DiscoveryMessage message;
nrea.Request.GetParams(&message);
ProcessDiscoveryMessage(nrea.Sender->GetIdentity(), message);
+
return 0;
}
-int DiscoveryComponent::ReconnectTimerHandler(const TimerEventArgs& tea)
+int DiscoveryComponent::BrokerConfigHandler(const EventArgs& ea)
{
+ ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea.Source);
+
EndpointManager::Ptr endpointManager = GetEndpointManager();
+ /* Check if we're already connected to this broker. */
+ if (endpointManager->GetEndpointByIdentity(object->GetName()))
+ return 0;
+
+ string node;
+ if (!object->GetPropertyString("node", &node))
+ throw InvalidArgumentException("'node' property required for 'broker' config object.");
+
+ string service;
+ if (!object->GetPropertyString("service", &service))
+ throw InvalidArgumentException("'service' property required for 'broker' config object.");
+
+ /* reconnect to this broker */
+ endpointManager->AddConnection(node, service);
+
+ return 0;
+}
+
+int DiscoveryComponent::DiscoveryTimerHandler(const TimerEventArgs& tea)
+{
+ EndpointManager::Ptr endpointManager = GetEndpointManager();
+
+ time_t now;
+ time(&now);
+
+ ConfigCollection::Ptr brokerCollection = GetApplication()->GetConfigHive()->GetCollection("broker");
+ brokerCollection->ForEachObject(bind(&DiscoveryComponent::BrokerConfigHandler, this, _1));
+
map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
- for (i = m_Components.begin(); i != m_Components.end(); i++) {
- Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(i->first);
- if (endpoint)
+ for (i = m_Components.begin(); i != m_Components.end(); ) {
+ string identity = i->first;
+ ComponentDiscoveryInfo::Ptr info = i->second;
+
+ if (info->LastSeen < now - DiscoveryComponent::RegistrationTTL) {
+ /* unregister this component if its registration has expired */
+ i = m_Components.erase(i);
continue;
+ }
- ComponentDiscoveryInfo::Ptr info = i->second;
- endpointManager->AddConnection(info->Node, info->Service);
+ if (IsBroker()) {
+ /* send discovery message to all connected components to
+ refresh their TTL for this component */
+ SendDiscoveryMessage("discovery::NewComponent", i->first, Endpoint::Ptr());
+ }
+
+ Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(identity);
+ if (endpoint) {
+ /* update LastSeen if we're still connected to this endpoint */
+ info->LastSeen = now;
+ } else {
+ /* try and reconnect to this component */
+ endpointManager->AddConnection(info->Node, info->Service);
+ }
+
+ i++;
}
return 0;