]> granicus.if.org Git - icinga2/blob - icinga/endpointmanager.cpp
Replaced custom event code with Boost.Signals.
[icinga2] / icinga / endpointmanager.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "i2-icinga.h"
21
22 using namespace icinga;
23
24 /**
25  * Sets the identity of the endpoint manager. This identity is used when
26  * connecting to remote peers.
27  *
28  * @param identity The new identity.
29  */
30 void EndpointManager::SetIdentity(string identity)
31 {
32         m_Identity = identity;
33 }
34
35 /**
36  * Retrieves the identity for the endpoint manager.
37  *
38  * @returns The identity.
39  */
40 string EndpointManager::GetIdentity(void) const
41 {
42         return m_Identity;
43 }
44
45 /**
46  * Sets the SSL context that is used for remote connections.
47  *
48  * @param sslContext The new SSL context.
49  */
50 void EndpointManager::SetSSLContext(shared_ptr<SSL_CTX> sslContext)
51 {
52         m_SSLContext = sslContext;
53 }
54
55 /**
56  * Retrieves the SSL context that is used for remote connections.
57  *
58  * @returns The SSL context.
59  */
60 shared_ptr<SSL_CTX> EndpointManager::GetSSLContext(void) const
61 {
62         return m_SSLContext;
63 }
64
65 /**
66  * Creates a new JSON-RPC listener on the specified port.
67  *
68  * @param service The port to listen on.
69  */
70 void EndpointManager::AddListener(string service)
71 {
72         if (!GetSSLContext())
73                 throw logic_error("SSL context is required for AddListener()");
74
75         stringstream s;
76         s << "Adding new listener: port " << service;
77         Application::Log(s.str());
78
79         JsonRpcServer::Ptr server = make_shared<JsonRpcServer>(m_SSLContext);
80         RegisterServer(server);
81
82         server->Bind(service, AF_INET6);
83         server->Listen();
84         server->Start();
85 }
86
87 /**
88  * Creates a new JSON-RPC client and connects to the specified host and port.
89  *
90  * @param node The remote host.
91  * @param service The remote port.
92  */
93 void EndpointManager::AddConnection(string node, string service)
94 {
95         stringstream s;
96         s << "Adding new endpoint: [" << node << "]:" << service;
97         Application::Log(s.str());
98
99         JsonRpcEndpoint::Ptr endpoint = make_shared<JsonRpcEndpoint>();
100         RegisterEndpoint(endpoint);
101         endpoint->Connect(node, service, m_SSLContext);
102 }
103
104 /**
105  * Registers a new JSON-RPC server with this endpoint manager.
106  *
107  * @param server The JSON-RPC server.
108  */
109 void EndpointManager::RegisterServer(JsonRpcServer::Ptr server)
110 {
111         m_Servers.push_back(server);
112         server->OnNewClient.connect(bind(&EndpointManager::NewClientHandler,
113             this, _1));
114 }
115
116 /**
117  * Processes a new client connection.
118  *
119  * @param ncea Event arguments.
120  */
121 int EndpointManager::NewClientHandler(const NewClientEventArgs& ncea)
122 {
123         string address = ncea.Client->GetPeerAddress();
124         Application::Log("Accepted new client from " + address);
125
126         JsonRpcEndpoint::Ptr endpoint = make_shared<JsonRpcEndpoint>();
127         endpoint->SetClient(static_pointer_cast<JsonRpcClient>(ncea.Client));
128         RegisterEndpoint(endpoint);
129
130         return 0;
131 }
132
133 /**
134  * Unregisters a JSON-RPC server.
135  *
136  * @param server The JSON-RPC server.
137  */
138 void EndpointManager::UnregisterServer(JsonRpcServer::Ptr server)
139 {
140         m_Servers.erase(
141             remove(m_Servers.begin(), m_Servers.end(), server),
142             m_Servers.end());
143         // TODO: unbind event
144 }
145
146 /**
147  * Registers a new endpoint with this endpoint manager.
148  *
149  * @param endpoint The new endpoint.
150  */
151 void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint)
152 {
153         if (!endpoint->IsLocal() && endpoint->GetIdentity() != "")
154                 throw invalid_argument("Identity must be empty.");
155
156         endpoint->SetEndpointManager(static_pointer_cast<EndpointManager>(shared_from_this()));
157         m_Endpoints.push_back(endpoint);
158
159         NewEndpointEventArgs neea;
160         neea.Source = shared_from_this();
161         neea.Endpoint = endpoint;
162         OnNewEndpoint(neea);
163 }
164
165 /**
166  * Unregisters an endpoint.
167  *
168  * @param endpoint The endpoint.
169  */
170 void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint)
171 {
172         m_Endpoints.erase(
173             remove(m_Endpoints.begin(), m_Endpoints.end(), endpoint),
174             m_Endpoints.end());
175 }
176
177 /**
178  * Sends a unicast message to the specified recipient.
179  *
180  * @param sender The sender of the message.
181  * @param recipient The recipient of the message.
182  * @param message The request.
183  */
184 void EndpointManager::SendUnicastMessage(Endpoint::Ptr sender,
185     Endpoint::Ptr recipient, const MessagePart& message)
186 {
187         /* don't forward messages between non-local endpoints */
188         if (!sender->IsLocal() && !recipient->IsLocal())
189                 return;
190
191         if (ResponseMessage::IsResponseMessage(message))
192                 recipient->ProcessResponse(sender, message);
193         else
194                 recipient->ProcessRequest(sender, message);
195 }
196
197 /**
198  * Sends a message to exactly one recipient out of all recipients who have a
199  * subscription for the message's topic.
200  *
201  * @param sender The sender of the message.
202  * @param message The message.
203  */
204 void EndpointManager::SendAnycastMessage(Endpoint::Ptr sender,
205     const RequestMessage& message)
206 {
207         string method;
208         if (!message.GetMethod(&method))
209                 throw invalid_argument("Message is missing the 'method' property.");
210
211         vector<Endpoint::Ptr> candidates;
212         for (vector<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++)
213         {
214                 Endpoint::Ptr endpoint = *i;
215                 if (endpoint->HasSubscription(method))
216                         candidates.push_back(endpoint);
217         }
218
219         if (candidates.size() == 0)
220                 return;
221
222         Endpoint::Ptr recipient = candidates[rand() % candidates.size()];
223         SendUnicastMessage(sender, recipient, message);
224 }
225
226 /**
227  * Sends a message to all recipients who have a subscription for the
228  * message's topic.
229  *
230  * @param sender The sender of the message.
231  * @param message The message.
232  */
233 void EndpointManager::SendMulticastMessage(Endpoint::Ptr sender,
234     const RequestMessage& message)
235 {
236         string id;
237         if (message.GetID(&id))
238                 throw invalid_argument("Multicast requests must not have an ID.");
239
240         string method;
241         if (!message.GetMethod(&method))
242                 throw invalid_argument("Message is missing the 'method' property.");
243
244         for (vector<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++)
245         {
246                 Endpoint::Ptr recipient = *i;
247
248                 /* don't forward messages back to the sender */
249                 if (sender == recipient)
250                         continue;
251
252                 if (recipient->HasSubscription(method))
253                         SendUnicastMessage(sender, recipient, message);
254         }
255 }
256
257 /**
258  * Calls the specified callback function for each registered endpoint.
259  *
260  * @param callback The callback function.
261  */
262 void EndpointManager::ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback)
263 {
264         NewEndpointEventArgs neea;
265         neea.Source = shared_from_this();
266
267         vector<Endpoint::Ptr>::iterator prev, i;
268         for (i = m_Endpoints.begin(); i != m_Endpoints.end(); ) {
269                 prev = i;
270                 i++;
271
272                 neea.Endpoint = *prev;
273                 callback(neea);
274         }
275 }
276
277 /**
278  * Retrieves an endpoint that has the specified identity.
279  *
280  * @param identity The identity of the endpoint.
281  */
282 Endpoint::Ptr EndpointManager::GetEndpointByIdentity(string identity) const
283 {
284         vector<Endpoint::Ptr>::const_iterator i;
285         for (i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) {
286                 if ((*i)->GetIdentity() == identity)
287                         return *i;
288         }
289
290         return Endpoint::Ptr();
291 }
292
293 void EndpointManager::SendAPIMessage(Endpoint::Ptr sender,
294     RequestMessage& message,
295     function<int(const NewResponseEventArgs&)> callback, time_t timeout)
296 {
297         m_NextMessageID++;
298
299         stringstream idstream;
300         idstream << m_NextMessageID;
301
302         string id = idstream.str();
303         message.SetID(id);
304
305         PendingRequest pr;
306         pr.Request = message;
307         pr.Callback = callback;
308         pr.Timeout = time(NULL) + timeout;
309
310         m_Requests[id] = pr;
311         RescheduleRequestTimer();
312
313         SendAnycastMessage(sender, message);
314 }
315
316 bool EndpointManager::RequestTimeoutLessComparer(const pair<string, PendingRequest>& a,
317     const pair<string, PendingRequest>& b)
318 {
319         return a.second.Timeout < b.second.Timeout;
320 }
321
322 void EndpointManager::RescheduleRequestTimer(void)
323 {
324         map<string, PendingRequest>::iterator it;
325         it = min_element(m_Requests.begin(), m_Requests.end(),
326             &EndpointManager::RequestTimeoutLessComparer);
327
328         if (!m_RequestTimer) {
329                 m_RequestTimer = make_shared<Timer>();
330                 m_RequestTimer->OnTimerExpired.connect(bind(&EndpointManager::RequestTimerHandler, this, _1));
331         }
332
333         if (it != m_Requests.end()) {
334                 time_t now;
335                 time(&now);
336
337                 time_t next_timeout = (it->second.Timeout < now) ? now : it->second.Timeout;
338                 m_RequestTimer->SetInterval(next_timeout - now);
339                 m_RequestTimer->Start();
340         } else {
341                 m_RequestTimer->Stop();
342         }
343 }
344
345 int EndpointManager::RequestTimerHandler(const TimerEventArgs& ea)
346 {
347         map<string, PendingRequest>::iterator it;
348         for (it = m_Requests.begin(); it != m_Requests.end(); it++) {
349                 if (it->second.HasTimedOut()) {
350                         NewResponseEventArgs nrea;
351                         nrea.Request = it->second.Request;
352                         nrea.Source = shared_from_this();
353                         nrea.TimedOut = true;
354
355                         it->second.Callback(nrea);
356
357                         m_Requests.erase(it);
358
359                         break;
360                 }
361         }
362
363         RescheduleRequestTimer();
364
365         return 0;
366 }
367
368 void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message)
369 {
370         string id;
371         if (!message.GetID(&id))
372                 throw invalid_argument("Response message must have a message ID.");
373
374         map<string, PendingRequest>::iterator it;
375         it = m_Requests.find(id);
376
377         if (it == m_Requests.end())
378                 return;
379
380         NewResponseEventArgs nrea;
381         nrea.Sender = sender;
382         nrea.Request = it->second.Request;
383         nrea.Response = message;
384         nrea.Source = shared_from_this();
385         nrea.TimedOut = false;
386
387         it->second.Callback(nrea);
388
389         m_Requests.erase(it);
390         RescheduleRequestTimer();
391 }