]> granicus.if.org Git - icinga2/blob - components/discovery/discoverycomponent.cpp
b9b50396b0478840f34a38346b4b7f42fdd88dca
[icinga2] / components / discovery / discoverycomponent.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "i2-discovery.h"
21
22 using namespace icinga;
23
24 /**
25  * Returns the name of this component.
26  *
27  * @returns The name.
28  */
29 string DiscoveryComponent::GetName(void) const
30 {
31         return "discoverycomponent";
32 }
33
34 /**
35  * Starts the discovery component.
36  */
37 void DiscoveryComponent::Start(void)
38 {
39         m_DiscoveryEndpoint = make_shared<VirtualEndpoint>();
40
41         m_DiscoveryEndpoint->RegisterPublication("discovery::RegisterComponent");
42         m_DiscoveryEndpoint->RegisterTopicHandler("discovery::RegisterComponent",
43                 bind_weak(&DiscoveryComponent::RegisterComponentMessageHandler, shared_from_this()));
44
45         m_DiscoveryEndpoint->RegisterPublication("discovery::NewComponent");
46         m_DiscoveryEndpoint->RegisterTopicHandler("discovery::NewComponent",
47                 bind_weak(&DiscoveryComponent::NewComponentMessageHandler, shared_from_this()));
48
49         m_DiscoveryEndpoint->RegisterTopicHandler("discovery::Welcome",
50                 bind_weak(&DiscoveryComponent::WelcomeMessageHandler, shared_from_this()));
51
52         GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1));
53         GetEndpointManager()->OnNewEndpoint += bind_weak(&DiscoveryComponent::NewEndpointHandler, shared_from_this());
54
55         GetEndpointManager()->RegisterEndpoint(m_DiscoveryEndpoint);
56
57         /* create the reconnect timer */
58         m_DiscoveryTimer = make_shared<Timer>();
59         m_DiscoveryTimer->SetInterval(30);
60         m_DiscoveryTimer->OnTimerExpired += bind_weak(&DiscoveryComponent::DiscoveryTimerHandler, shared_from_this());
61         m_DiscoveryTimer->Start();
62
63         /* call the timer as soon as possible */
64         m_DiscoveryTimer->Reschedule(0);
65 }
66
67 /**
68  * Stops the discovery component.
69  */
70 void DiscoveryComponent::Stop(void)
71 {
72         EndpointManager::Ptr mgr = GetEndpointManager();
73
74         if (mgr)
75                 mgr->UnregisterEndpoint(m_DiscoveryEndpoint);
76 }
77
78 /**
79  * Checks whether the specified endpoint is already connected
80  * and disconnects older endpoints.
81  *
82  * @param endpoint The endpoint that is to be checked.
83  * @param neea Event arguments for another endpoint.
84  * @returns 0
85  */
86 int DiscoveryComponent::CheckExistingEndpoint(Endpoint::Ptr endpoint, const NewEndpointEventArgs& neea)
87 {
88         if (endpoint == neea.Endpoint)
89                 return 0;
90
91         if (!neea.Endpoint->IsConnected())
92                 return 0;
93
94         if (endpoint->GetIdentity() == neea.Endpoint->GetIdentity()) {
95                 Application::Log("Detected duplicate identity:" + endpoint->GetIdentity() + " - Disconnecting old endpoint.");
96
97                 neea.Endpoint->Stop();
98                 GetEndpointManager()->UnregisterEndpoint(neea.Endpoint);
99         }
100
101         return 0;
102 }
103
104 /**
105  * Registers handlers for new endpoints.
106  *
107  * @param neea Event arguments for the new endpoint.
108  * @returns 0
109  */
110 int DiscoveryComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
111 {
112         neea.Endpoint->OnIdentityChanged += bind_weak(&DiscoveryComponent::NewIdentityHandler, shared_from_this());
113
114         /* accept discovery::RegisterComponent messages from any endpoint */
115         neea.Endpoint->RegisterPublication("discovery::RegisterComponent");
116
117         /* accept discovery::Welcome messages from any endpoint */
118         neea.Endpoint->RegisterPublication("discovery::Welcome");
119
120         return 0;
121 }
122
123 /**
124  * Registers message Subscriptions/sources in the specified component information object.
125  *
126  * @param neea Event arguments for the endpoint.
127  * @param info Component information object.
128  * @return 0
129  */
130 int DiscoveryComponent::DiscoveryEndpointHandler(const NewEndpointEventArgs& neea, ComponentDiscoveryInfo::Ptr info) const
131 {
132         Endpoint::ConstTopicIterator i;
133
134         for (i = neea.Endpoint->BeginSubscriptions(); i != neea.Endpoint->EndSubscriptions(); i++) {
135                 info->Subscriptions.insert(*i);
136         }
137
138         for (i = neea.Endpoint->BeginPublications(); i != neea.Endpoint->EndPublications(); i++) {
139                 info->Publications.insert(*i);
140         }
141
142         return 0;
143 }
144
145 /**
146  * Retrieves the component information object for the specified component.
147  *
148  * @param component The identity of the component.
149  * @param info Pointer to the information object.
150  * @returns true if the info object was successfully retrieved, false otherwise.
151  */
152 bool DiscoveryComponent::GetComponentDiscoveryInfo(string component, ComponentDiscoveryInfo::Ptr *info) const
153 {
154         if (component == GetEndpointManager()->GetIdentity()) {
155                 /* Build fake discovery info for ourselves */
156                 *info = make_shared<ComponentDiscoveryInfo>();
157                 GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _1, *info));
158                 
159                 (*info)->LastSeen = 0;
160                 (*info)->Node = GetIcingaApplication()->GetNode();
161                 (*info)->Service = GetIcingaApplication()->GetService();
162
163                 return true;
164         }
165
166         map<string, ComponentDiscoveryInfo::Ptr>::const_iterator i;
167
168         i = m_Components.find(component);
169
170         if (i == m_Components.end())
171                 return false;
172
173         *info = i->second;
174         return true;
175 }
176
177 /**
178  * Deals with a new endpoint whose identity has just become known.
179  *
180  * @param ea Event arguments for the component.
181  * @returns 0
182  */
183 int DiscoveryComponent::NewIdentityHandler(const EventArgs& ea)
184 {
185         Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(ea.Source);
186         string identity = endpoint->GetIdentity();
187
188         if (identity == GetEndpointManager()->GetIdentity()) {
189                 Application::Log("Detected loop-back connection - Disconnecting endpoint.");
190
191                 endpoint->Stop();
192                 GetEndpointManager()->UnregisterEndpoint(endpoint);
193
194                 return 0;
195         }
196
197         GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1));
198
199         // we assume the other component _always_ wants
200         // discovery::RegisterComponent messages from us
201         endpoint->RegisterSubscription("discovery::RegisterComponent");
202
203         // send a discovery::RegisterComponent message, if the
204         // other component is a broker this makes sure
205         // the broker knows about our message types
206         SendDiscoveryMessage("discovery::RegisterComponent", GetEndpointManager()->GetIdentity(), endpoint);
207
208         map<string, ComponentDiscoveryInfo::Ptr>::iterator ic;
209
210         // we assume the other component _always_ wants
211         // discovery::NewComponent messages from us
212         endpoint->RegisterSubscription("discovery::NewComponent");
213
214         // send discovery::NewComponent message for ourselves
215         SendDiscoveryMessage("discovery::NewComponent", GetEndpointManager()->GetIdentity(), endpoint);
216
217         // send discovery::NewComponent messages for all components
218         // we know about
219         for (ic = m_Components.begin(); ic != m_Components.end(); ic++) {
220                 SendDiscoveryMessage("discovery::NewComponent", ic->first, endpoint);
221         }
222
223         // check if we already know the other component
224         ic = m_Components.find(endpoint->GetIdentity());
225
226         if (ic == m_Components.end()) {
227                 // we don't know the other component yet, so
228                 // wait until we get a discovery::NewComponent message
229                 // from a broker
230                 return 0;
231         }
232
233         // register published/subscribed topics for this endpoint
234         ComponentDiscoveryInfo::Ptr info = ic->second;
235         set<string>::iterator it;
236         for (it = info->Publications.begin(); it != info->Publications.end(); it++)
237                 endpoint->RegisterPublication(*it);
238
239         for (it = info->Subscriptions.begin(); it != info->Subscriptions.end(); it++)
240                 endpoint->RegisterSubscription(*it);
241
242         FinishDiscoverySetup(endpoint);
243
244         return 0;
245 }
246
247 /**
248  * Processes discovery::Welcome messages.
249  *
250  * @param nrea Event arguments for the request.
251  * @returns 0
252  */
253 int DiscoveryComponent::WelcomeMessageHandler(const NewRequestEventArgs& nrea)
254 {
255         Endpoint::Ptr endpoint = nrea.Sender;
256
257         if (endpoint->HasReceivedWelcome())
258                 return 0;
259
260         endpoint->SetReceivedWelcome(true);
261
262         if (endpoint->HasSentWelcome()) {
263                 EventArgs ea;
264                 ea.Source = endpoint;
265                 endpoint->OnSessionEstablished(ea);
266         }
267
268         return 0;
269 }
270
271 /**
272  * Finishes the welcome handshake for a new component
273  * by registering message Subscriptions/sources for the component
274  * and sending a welcome message if necessary.
275  *
276  * @param endpoint The endpoint to set up.
277  */
278 void DiscoveryComponent::FinishDiscoverySetup(Endpoint::Ptr endpoint)
279 {
280         if (endpoint->HasSentWelcome())
281                 return;
282
283         // we assume the other component _always_ wants
284         // discovery::Welcome messages from us
285         endpoint->RegisterSubscription("discovery::Welcome");
286         RequestMessage request;
287         request.SetMethod("discovery::Welcome");
288         GetEndpointManager()->SendUnicastMessage(m_DiscoveryEndpoint, endpoint, request);
289
290         endpoint->SetSentWelcome(true);
291
292         if (endpoint->HasReceivedWelcome()) {
293                 EventArgs ea;
294                 ea.Source = endpoint;
295                 endpoint->OnSessionEstablished(ea);
296         }
297 }
298
299 /**
300  * Sends a discovery message for the specified identity using the
301  * specified message type.
302  *
303  * @param method The method to use for the message ("discovery::NewComponent" or "discovery::RegisterComponent").
304  * @param identity The identity of the component for which a message should be sent.
305  * @param recipient The recipient of the message. A multicast message is sent if this parameter is empty.
306  */
307 void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, Endpoint::Ptr recipient)
308 {
309         RequestMessage request;
310         request.SetMethod(method);
311         
312         DiscoveryMessage params;
313         request.SetParams(params);
314
315         params.SetIdentity(identity);
316
317         ComponentDiscoveryInfo::Ptr info;
318
319         if (!GetComponentDiscoveryInfo(identity, &info))
320                 return;
321
322         if (!info->Node.empty() && !info->Service.empty()) {
323                 params.SetNode(info->Node);
324                 params.SetService(info->Service);
325         }
326
327         set<string>::iterator i;
328         MessagePart subscriptions;
329         for (i = info->Subscriptions.begin(); i != info->Subscriptions.end(); i++)
330                 subscriptions.AddUnnamedProperty(*i);
331
332         params.SetSubscriptions(subscriptions);
333
334         MessagePart publications;
335         for (i = info->Publications.begin(); i != info->Publications.end(); i++)
336                 publications.AddUnnamedProperty(*i);
337
338         params.SetPublications(publications);
339
340         if (recipient)
341                 GetEndpointManager()->SendUnicastMessage(m_DiscoveryEndpoint, recipient, request);
342         else
343                 GetEndpointManager()->SendMulticastMessage(m_DiscoveryEndpoint, request);
344 }
345
346 bool DiscoveryComponent::HasMessagePermission(Dictionary::Ptr roles, string messageType, string message)
347 {
348         if (!roles)
349                 return false;
350
351         ConfigObject::TMap::Range range = ConfigObject::GetObjects("role");
352
353         for (ConfigObject::TMap::Iterator ip = range.first; ip != range.second; ip++) {
354                 ConfigObject::Ptr role = ip->second;
355
356                 Object::Ptr object;
357                 if (!role->GetProperty(messageType, &object))
358                         continue;
359
360                 Dictionary::Ptr permissions = dynamic_pointer_cast<Dictionary>(object);
361                 if (!permissions)
362                         throw runtime_error("Object is not a dictionary.");
363
364                 for (DictionaryIterator is = permissions->Begin(); is != permissions->End(); is++) {
365                         if (Utility::Match(is->second.GetString(), message))
366                                 return true;
367                 }
368         }
369
370         return false;
371 }
372
373 /**
374  * Processes a discovery message by registering the component in the
375  * discovery component registry.
376  *
377  * @param identity The authorative identity of the component.
378  * @param message The discovery message.
379  * @param trusted Whether the message comes from a trusted source (i.e. a broker).
380  */
381 void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessage message, bool trusted)
382 {
383         /* ignore discovery messages that are about ourselves */
384         if (identity == GetEndpointManager()->GetIdentity())
385                 return;
386
387         ComponentDiscoveryInfo::Ptr info = make_shared<ComponentDiscoveryInfo>();
388
389         time(&(info->LastSeen));
390
391         message.GetNode(&info->Node);
392         message.GetService(&info->Service);
393
394         ConfigObject::Ptr endpointConfig = ConfigObject::GetObject("endpoint", identity);
395         Dictionary::Ptr roles;
396         if (endpointConfig) {
397                 Object::Ptr object;
398                 if (endpointConfig->GetProperty("roles", &object)) {
399                         roles = dynamic_pointer_cast<Dictionary>(object);
400                         if (!roles)
401                                 throw runtime_error("Object is not a dictionary.");
402                 }
403         }
404
405         Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(identity);
406
407         MessagePart publications;
408         if (message.GetPublications(&publications)) {
409                 DictionaryIterator i;
410                 for (i = publications.Begin(); i != publications.End(); i++) {
411                         if (trusted || HasMessagePermission(roles, "publications", i->second)) {
412                                 info->Publications.insert(i->second);
413                                 if (endpoint)
414                                         endpoint->RegisterPublication(i->second);
415                         }
416                 }
417         }
418
419         MessagePart subscriptions;
420         if (message.GetSubscriptions(&subscriptions)) {
421                 DictionaryIterator i;
422                 for (i = subscriptions.Begin(); i != subscriptions.End(); i++) {
423                         if (trusted || HasMessagePermission(roles, "subscriptions", i->second)) {
424                                 info->Subscriptions.insert(i->second);
425                                 if (endpoint)
426                                         endpoint->RegisterSubscription(i->second);
427                         }
428                 }
429         }
430
431         map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
432
433         i = m_Components.find(identity);
434
435         if (i != m_Components.end())
436                 m_Components.erase(i);
437
438         m_Components[identity] = info;
439
440         SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
441
442         /* don't send a welcome message for discovery::RegisterComponent messages */
443         if (endpoint && trusted)
444                 FinishDiscoverySetup(endpoint);
445 }
446
447 /**
448  * Processes "discovery::NewComponent" messages.
449  *
450  * @param nrea Event arguments for the request.
451  * @returns 0
452  */
453 int DiscoveryComponent::NewComponentMessageHandler(const NewRequestEventArgs& nrea)
454 {
455         DiscoveryMessage message;
456         nrea.Request.GetParams(&message);
457
458         string identity;
459         if (!message.GetIdentity(&identity))
460                 return 0;
461
462         ProcessDiscoveryMessage(identity, message, true);
463         return 0;
464 }
465
466 /**
467  * Processes "discovery::RegisterComponent" messages.
468  *
469  * @param nrea Event arguments for the request.
470  * @returns 0
471  */
472 int DiscoveryComponent::RegisterComponentMessageHandler(const NewRequestEventArgs& nrea)
473 {
474         DiscoveryMessage message;
475         nrea.Request.GetParams(&message);
476         ProcessDiscoveryMessage(nrea.Sender->GetIdentity(), message, false);
477
478         return 0;
479 }
480
481 /**
482  * Checks whether we have to reconnect to other components and removes stale
483  * components from the registry.
484  *
485  * @param tea Event arguments for the timer.
486  * @returns 0
487  */
488 int DiscoveryComponent::DiscoveryTimerHandler(const TimerEventArgs& tea)
489 {
490         EndpointManager::Ptr endpointManager = GetEndpointManager();
491         
492         time_t now;
493         time(&now);
494
495         /* check whether we have to reconnect to one of our upstream endpoints */
496         ConfigObject::TMap::Range range = ConfigObject::GetObjects("endpoint");
497
498         for (ConfigObject::TMap::Iterator it = range.first; it != range.second; it++) {
499                 ConfigObject::Ptr object = it->second;
500
501                 /* Check if we're already connected to this endpoint. */
502                 if (endpointManager->GetEndpointByIdentity(object->GetName()))
503                         continue;
504
505                 string node, service;
506                 if (object->GetProperty("node", &node) && object->GetProperty("service", &service)) {
507                         /* reconnect to this endpoint */
508                         endpointManager->AddConnection(node, service);
509                 }
510         }
511
512         map<string, ComponentDiscoveryInfo::Ptr>::iterator curr, i;
513         for (i = m_Components.begin(); i != m_Components.end(); ) {
514                 string identity = i->first;
515                 ComponentDiscoveryInfo::Ptr info = i->second;
516
517                 curr = i;
518                 i++;
519
520                 if (info->LastSeen < now - DiscoveryComponent::RegistrationTTL) {
521                         /* unregister this component if its registration has expired */
522                         m_Components.erase(curr);
523                         continue;
524                 }
525
526                 /* send discovery message to all connected components to
527                         refresh their TTL for this component */
528                 SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
529
530                 Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(identity);
531                 if (endpoint && endpoint->IsConnected()) {
532                         /* update LastSeen if we're still connected to this endpoint */
533                         info->LastSeen = now;
534                 } else {
535                         /* TODO: figure out whether we actually want to connect to this component */
536                         /* try and reconnect to this component */
537                         endpointManager->AddConnection(info->Node, info->Service);
538                 }
539         }
540
541         return 0;
542 }
543
544 EXPORT_COMPONENT(discovery, DiscoveryComponent);