1 /******************************************************************************
3 * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License *
7 * as published by the Free Software Foundation; either version 2 *
8 * of the License, or (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software Foundation *
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
18 ******************************************************************************/
20 #include "i2-discovery.h"
22 using namespace icinga;
25 * Returns the name of this component.
29 string DiscoveryComponent::GetName(void) const
31 return "discoverycomponent";
35 * Starts the discovery component.
37 void DiscoveryComponent::Start(void)
39 m_DiscoveryEndpoint = make_shared<VirtualEndpoint>();
41 m_DiscoveryEndpoint->RegisterPublication("discovery::RegisterComponent");
42 m_DiscoveryEndpoint->RegisterTopicHandler("discovery::RegisterComponent",
43 bind_weak(&DiscoveryComponent::RegisterComponentMessageHandler, shared_from_this()));
45 m_DiscoveryEndpoint->RegisterPublication("discovery::NewComponent");
46 m_DiscoveryEndpoint->RegisterTopicHandler("discovery::NewComponent",
47 bind_weak(&DiscoveryComponent::NewComponentMessageHandler, shared_from_this()));
49 m_DiscoveryEndpoint->RegisterTopicHandler("discovery::Welcome",
50 bind_weak(&DiscoveryComponent::WelcomeMessageHandler, shared_from_this()));
52 GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1));
53 GetEndpointManager()->OnNewEndpoint += bind_weak(&DiscoveryComponent::NewEndpointHandler, shared_from_this());
55 GetEndpointManager()->RegisterEndpoint(m_DiscoveryEndpoint);
57 /* create the reconnect timer */
58 m_DiscoveryTimer = make_shared<Timer>();
59 m_DiscoveryTimer->SetInterval(30);
60 m_DiscoveryTimer->OnTimerExpired += bind_weak(&DiscoveryComponent::DiscoveryTimerHandler, shared_from_this());
61 m_DiscoveryTimer->Start();
63 /* call the timer as soon as possible */
64 m_DiscoveryTimer->Reschedule(0);
68 * Stops the discovery component.
70 void DiscoveryComponent::Stop(void)
72 EndpointManager::Ptr mgr = GetEndpointManager();
75 mgr->UnregisterEndpoint(m_DiscoveryEndpoint);
79 * Checks whether the specified endpoint is already connected
80 * and disconnects older endpoints.
82 * @param endpoint The endpoint that is to be checked.
83 * @param neea Event arguments for another endpoint.
86 int DiscoveryComponent::CheckExistingEndpoint(Endpoint::Ptr endpoint, const NewEndpointEventArgs& neea)
88 if (endpoint == neea.Endpoint)
91 if (!neea.Endpoint->IsConnected())
94 if (endpoint->GetIdentity() == neea.Endpoint->GetIdentity()) {
95 Application::Log("Detected duplicate identity:" + endpoint->GetIdentity() + " - Disconnecting old endpoint.");
97 neea.Endpoint->Stop();
98 GetEndpointManager()->UnregisterEndpoint(neea.Endpoint);
105 * Registers handlers for new endpoints.
107 * @param neea Event arguments for the new endpoint.
110 int DiscoveryComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
112 neea.Endpoint->OnIdentityChanged += bind_weak(&DiscoveryComponent::NewIdentityHandler, shared_from_this());
114 /* accept discovery::RegisterComponent messages from any endpoint */
115 neea.Endpoint->RegisterPublication("discovery::RegisterComponent");
117 /* accept discovery::Welcome messages from any endpoint */
118 neea.Endpoint->RegisterPublication("discovery::Welcome");
124 * Registers message Subscriptions/sources in the specified component information object.
126 * @param neea Event arguments for the endpoint.
127 * @param info Component information object.
130 int DiscoveryComponent::DiscoveryEndpointHandler(const NewEndpointEventArgs& neea, ComponentDiscoveryInfo::Ptr info) const
132 Endpoint::ConstTopicIterator i;
134 for (i = neea.Endpoint->BeginSubscriptions(); i != neea.Endpoint->EndSubscriptions(); i++) {
135 info->Subscriptions.insert(*i);
138 for (i = neea.Endpoint->BeginPublications(); i != neea.Endpoint->EndPublications(); i++) {
139 info->Publications.insert(*i);
146 * Retrieves the component information object for the specified component.
148 * @param component The identity of the component.
149 * @param info Pointer to the information object.
150 * @returns true if the info object was successfully retrieved, false otherwise.
152 bool DiscoveryComponent::GetComponentDiscoveryInfo(string component, ComponentDiscoveryInfo::Ptr *info) const
154 if (component == GetEndpointManager()->GetIdentity()) {
155 /* Build fake discovery info for ourselves */
156 *info = make_shared<ComponentDiscoveryInfo>();
157 GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _1, *info));
159 (*info)->LastSeen = 0;
160 (*info)->Node = GetIcingaApplication()->GetNode();
161 (*info)->Service = GetIcingaApplication()->GetService();
166 map<string, ComponentDiscoveryInfo::Ptr>::const_iterator i;
168 i = m_Components.find(component);
170 if (i == m_Components.end())
178 * Deals with a new endpoint whose identity has just become known.
180 * @param ea Event arguments for the component.
183 int DiscoveryComponent::NewIdentityHandler(const EventArgs& ea)
185 Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(ea.Source);
186 string identity = endpoint->GetIdentity();
188 if (identity == GetEndpointManager()->GetIdentity()) {
189 Application::Log("Detected loop-back connection - Disconnecting endpoint.");
192 GetEndpointManager()->UnregisterEndpoint(endpoint);
197 GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1));
199 // we assume the other component _always_ wants
200 // discovery::RegisterComponent messages from us
201 endpoint->RegisterSubscription("discovery::RegisterComponent");
203 // send a discovery::RegisterComponent message, if the
204 // other component is a broker this makes sure
205 // the broker knows about our message types
206 SendDiscoveryMessage("discovery::RegisterComponent", GetEndpointManager()->GetIdentity(), endpoint);
208 map<string, ComponentDiscoveryInfo::Ptr>::iterator ic;
210 // we assume the other component _always_ wants
211 // discovery::NewComponent messages from us
212 endpoint->RegisterSubscription("discovery::NewComponent");
214 // send discovery::NewComponent message for ourselves
215 SendDiscoveryMessage("discovery::NewComponent", GetEndpointManager()->GetIdentity(), endpoint);
217 // send discovery::NewComponent messages for all components
219 for (ic = m_Components.begin(); ic != m_Components.end(); ic++) {
220 SendDiscoveryMessage("discovery::NewComponent", ic->first, endpoint);
223 // check if we already know the other component
224 ic = m_Components.find(endpoint->GetIdentity());
226 if (ic == m_Components.end()) {
227 // we don't know the other component yet, so
228 // wait until we get a discovery::NewComponent message
233 // register published/subscribed topics for this endpoint
234 ComponentDiscoveryInfo::Ptr info = ic->second;
235 set<string>::iterator it;
236 for (it = info->Publications.begin(); it != info->Publications.end(); it++)
237 endpoint->RegisterPublication(*it);
239 for (it = info->Subscriptions.begin(); it != info->Subscriptions.end(); it++)
240 endpoint->RegisterSubscription(*it);
242 FinishDiscoverySetup(endpoint);
248 * Processes discovery::Welcome messages.
250 * @param nrea Event arguments for the request.
253 int DiscoveryComponent::WelcomeMessageHandler(const NewRequestEventArgs& nrea)
255 Endpoint::Ptr endpoint = nrea.Sender;
257 if (endpoint->HasReceivedWelcome())
260 endpoint->SetReceivedWelcome(true);
262 if (endpoint->HasSentWelcome()) {
264 ea.Source = endpoint;
265 endpoint->OnSessionEstablished(ea);
272 * Finishes the welcome handshake for a new component
273 * by registering message Subscriptions/sources for the component
274 * and sending a welcome message if necessary.
276 * @param endpoint The endpoint to set up.
278 void DiscoveryComponent::FinishDiscoverySetup(Endpoint::Ptr endpoint)
280 if (endpoint->HasSentWelcome())
283 // we assume the other component _always_ wants
284 // discovery::Welcome messages from us
285 endpoint->RegisterSubscription("discovery::Welcome");
286 RequestMessage request;
287 request.SetMethod("discovery::Welcome");
288 GetEndpointManager()->SendUnicastMessage(m_DiscoveryEndpoint, endpoint, request);
290 endpoint->SetSentWelcome(true);
292 if (endpoint->HasReceivedWelcome()) {
294 ea.Source = endpoint;
295 endpoint->OnSessionEstablished(ea);
300 * Sends a discovery message for the specified identity using the
301 * specified message type.
303 * @param method The method to use for the message ("discovery::NewComponent" or "discovery::RegisterComponent").
304 * @param identity The identity of the component for which a message should be sent.
305 * @param recipient The recipient of the message. A multicast message is sent if this parameter is empty.
307 void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, Endpoint::Ptr recipient)
309 RequestMessage request;
310 request.SetMethod(method);
312 DiscoveryMessage params;
313 request.SetParams(params);
315 params.SetIdentity(identity);
317 ComponentDiscoveryInfo::Ptr info;
319 if (!GetComponentDiscoveryInfo(identity, &info))
322 if (!info->Node.empty() && !info->Service.empty()) {
323 params.SetNode(info->Node);
324 params.SetService(info->Service);
327 set<string>::iterator i;
328 MessagePart subscriptions;
329 for (i = info->Subscriptions.begin(); i != info->Subscriptions.end(); i++)
330 subscriptions.AddUnnamedProperty(*i);
332 params.SetSubscriptions(subscriptions);
334 MessagePart publications;
335 for (i = info->Publications.begin(); i != info->Publications.end(); i++)
336 publications.AddUnnamedProperty(*i);
338 params.SetPublications(publications);
341 GetEndpointManager()->SendUnicastMessage(m_DiscoveryEndpoint, recipient, request);
343 GetEndpointManager()->SendMulticastMessage(m_DiscoveryEndpoint, request);
346 bool DiscoveryComponent::HasMessagePermission(Dictionary::Ptr roles, string messageType, string message)
351 ConfigObject::TMap::Range range = ConfigObject::GetObjects("role");
353 for (ConfigObject::TMap::Iterator ip = range.first; ip != range.second; ip++) {
354 ConfigObject::Ptr role = ip->second;
357 if (!role->GetProperty(messageType, &object))
360 Dictionary::Ptr permissions = dynamic_pointer_cast<Dictionary>(object);
362 throw runtime_error("Object is not a dictionary.");
364 for (DictionaryIterator is = permissions->Begin(); is != permissions->End(); is++) {
365 if (Utility::Match(is->second.GetString(), message))
374 * Processes a discovery message by registering the component in the
375 * discovery component registry.
377 * @param identity The authorative identity of the component.
378 * @param message The discovery message.
379 * @param trusted Whether the message comes from a trusted source (i.e. a broker).
381 void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessage message, bool trusted)
383 /* ignore discovery messages that are about ourselves */
384 if (identity == GetEndpointManager()->GetIdentity())
387 ComponentDiscoveryInfo::Ptr info = make_shared<ComponentDiscoveryInfo>();
389 time(&(info->LastSeen));
391 message.GetNode(&info->Node);
392 message.GetService(&info->Service);
394 ConfigObject::Ptr endpointConfig = ConfigObject::GetObject("endpoint", identity);
395 Dictionary::Ptr roles;
396 if (endpointConfig) {
398 if (endpointConfig->GetProperty("roles", &object)) {
399 roles = dynamic_pointer_cast<Dictionary>(object);
401 throw runtime_error("Object is not a dictionary.");
405 Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(identity);
407 MessagePart publications;
408 if (message.GetPublications(&publications)) {
409 DictionaryIterator i;
410 for (i = publications.Begin(); i != publications.End(); i++) {
411 if (trusted || HasMessagePermission(roles, "publications", i->second)) {
412 info->Publications.insert(i->second);
414 endpoint->RegisterPublication(i->second);
419 MessagePart subscriptions;
420 if (message.GetSubscriptions(&subscriptions)) {
421 DictionaryIterator i;
422 for (i = subscriptions.Begin(); i != subscriptions.End(); i++) {
423 if (trusted || HasMessagePermission(roles, "subscriptions", i->second)) {
424 info->Subscriptions.insert(i->second);
426 endpoint->RegisterSubscription(i->second);
431 map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
433 i = m_Components.find(identity);
435 if (i != m_Components.end())
436 m_Components.erase(i);
438 m_Components[identity] = info;
440 SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
442 /* don't send a welcome message for discovery::RegisterComponent messages */
443 if (endpoint && trusted)
444 FinishDiscoverySetup(endpoint);
448 * Processes "discovery::NewComponent" messages.
450 * @param nrea Event arguments for the request.
453 int DiscoveryComponent::NewComponentMessageHandler(const NewRequestEventArgs& nrea)
455 DiscoveryMessage message;
456 nrea.Request.GetParams(&message);
459 if (!message.GetIdentity(&identity))
462 ProcessDiscoveryMessage(identity, message, true);
467 * Processes "discovery::RegisterComponent" messages.
469 * @param nrea Event arguments for the request.
472 int DiscoveryComponent::RegisterComponentMessageHandler(const NewRequestEventArgs& nrea)
474 DiscoveryMessage message;
475 nrea.Request.GetParams(&message);
476 ProcessDiscoveryMessage(nrea.Sender->GetIdentity(), message, false);
482 * Checks whether we have to reconnect to other components and removes stale
483 * components from the registry.
485 * @param tea Event arguments for the timer.
488 int DiscoveryComponent::DiscoveryTimerHandler(const TimerEventArgs& tea)
490 EndpointManager::Ptr endpointManager = GetEndpointManager();
495 /* check whether we have to reconnect to one of our upstream endpoints */
496 ConfigObject::TMap::Range range = ConfigObject::GetObjects("endpoint");
498 for (ConfigObject::TMap::Iterator it = range.first; it != range.second; it++) {
499 ConfigObject::Ptr object = it->second;
501 /* Check if we're already connected to this endpoint. */
502 if (endpointManager->GetEndpointByIdentity(object->GetName()))
505 string node, service;
506 if (object->GetProperty("node", &node) && object->GetProperty("service", &service)) {
507 /* reconnect to this endpoint */
508 endpointManager->AddConnection(node, service);
512 map<string, ComponentDiscoveryInfo::Ptr>::iterator curr, i;
513 for (i = m_Components.begin(); i != m_Components.end(); ) {
514 string identity = i->first;
515 ComponentDiscoveryInfo::Ptr info = i->second;
520 if (info->LastSeen < now - DiscoveryComponent::RegistrationTTL) {
521 /* unregister this component if its registration has expired */
522 m_Components.erase(curr);
526 /* send discovery message to all connected components to
527 refresh their TTL for this component */
528 SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
530 Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(identity);
531 if (endpoint && endpoint->IsConnected()) {
532 /* update LastSeen if we're still connected to this endpoint */
533 info->LastSeen = now;
535 /* TODO: figure out whether we actually want to connect to this component */
536 /* try and reconnect to this component */
537 endpointManager->AddConnection(info->Node, info->Service);
544 EXPORT_COMPONENT(discovery, DiscoveryComponent);