1 /******************************************************************************
3 * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License *
7 * as published by the Free Software Foundation; either version 2 *
8 * of the License, or (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software Foundation *
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
18 ******************************************************************************/
20 #include "i2-icinga.h"
22 using namespace icinga;
25 * Sets the identity of the endpoint manager. This identity is used when
26 * connecting to remote peers.
28 * @param identity The new identity.
30 void EndpointManager::SetIdentity(string identity)
32 m_Identity = identity;
36 * Retrieves the identity for the endpoint manager.
38 * @returns The identity.
40 string EndpointManager::GetIdentity(void) const
46 * Sets the SSL context that is used for remote connections.
48 * @param sslContext The new SSL context.
50 void EndpointManager::SetSSLContext(shared_ptr<SSL_CTX> sslContext)
52 m_SSLContext = sslContext;
56 * Retrieves the SSL context that is used for remote connections.
58 * @returns The SSL context.
60 shared_ptr<SSL_CTX> EndpointManager::GetSSLContext(void) const
66 * Creates a new JSON-RPC listener on the specified port.
68 * @param service The port to listen on.
70 void EndpointManager::AddListener(string service)
73 throw logic_error("SSL context is required for AddListener()");
76 s << "Adding new listener: port " << service;
77 Application::Log(s.str());
79 JsonRpcServer::Ptr server = make_shared<JsonRpcServer>(m_SSLContext);
80 RegisterServer(server);
82 server->Bind(service, AF_INET6);
88 * Creates a new JSON-RPC client and connects to the specified host and port.
90 * @param node The remote host.
91 * @param service The remote port.
93 void EndpointManager::AddConnection(string node, string service)
96 s << "Adding new endpoint: [" << node << "]:" << service;
97 Application::Log(s.str());
99 JsonRpcEndpoint::Ptr endpoint = make_shared<JsonRpcEndpoint>();
100 RegisterEndpoint(endpoint);
101 endpoint->Connect(node, service, m_SSLContext);
105 * Registers a new JSON-RPC server with this endpoint manager.
107 * @param server The JSON-RPC server.
109 void EndpointManager::RegisterServer(JsonRpcServer::Ptr server)
111 m_Servers.push_back(server);
112 server->OnNewClient += bind_weak(&EndpointManager::NewClientHandler,
117 * Processes a new client connection.
119 * @param ncea Event arguments.
121 int EndpointManager::NewClientHandler(const NewClientEventArgs& ncea)
123 string address = ncea.Client->GetPeerAddress();
124 Application::Log("Accepted new client from " + address);
126 JsonRpcEndpoint::Ptr endpoint = make_shared<JsonRpcEndpoint>();
127 endpoint->SetClient(static_pointer_cast<JsonRpcClient>(ncea.Client));
128 RegisterEndpoint(endpoint);
134 * Unregisters a JSON-RPC server.
136 * @param server The JSON-RPC server.
138 void EndpointManager::UnregisterServer(JsonRpcServer::Ptr server)
141 remove(m_Servers.begin(), m_Servers.end(), server),
143 // TODO: unbind event
147 * Registers a new endpoint with this endpoint manager.
149 * @param endpoint The new endpoint.
151 void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint)
153 if (!endpoint->IsLocal() && endpoint->GetIdentity() != "")
154 throw invalid_argument("Identity must be empty.");
156 endpoint->SetEndpointManager(static_pointer_cast<EndpointManager>(shared_from_this()));
157 m_Endpoints.push_back(endpoint);
159 NewEndpointEventArgs neea;
160 neea.Source = shared_from_this();
161 neea.Endpoint = endpoint;
166 * Unregisters an endpoint.
168 * @param endpoint The endpoint.
170 void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint)
173 remove(m_Endpoints.begin(), m_Endpoints.end(), endpoint),
178 * Sends a unicast message to the specified recipient.
180 * @param sender The sender of the message.
181 * @param recipient The recipient of the message.
182 * @param message The request.
184 void EndpointManager::SendUnicastMessage(Endpoint::Ptr sender,
185 Endpoint::Ptr recipient, const MessagePart& message)
187 /* don't forward messages between non-local endpoints */
188 if (!sender->IsLocal() && !recipient->IsLocal())
191 if (ResponseMessage::IsResponseMessage(message))
192 recipient->ProcessResponse(sender, message);
194 recipient->ProcessRequest(sender, message);
198 * Sends a message to exactly one recipient out of all recipients who have a
199 * subscription for the message's topic.
201 * @param sender The sender of the message.
202 * @param message The message.
204 void EndpointManager::SendAnycastMessage(Endpoint::Ptr sender,
205 const RequestMessage& message)
208 if (!message.GetMethod(&method))
209 throw invalid_argument("Message is missing the 'method' property.");
211 vector<Endpoint::Ptr> candidates;
212 for (vector<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++)
214 Endpoint::Ptr endpoint = *i;
215 if (endpoint->HasSubscription(method))
216 candidates.push_back(endpoint);
219 if (candidates.size() == 0)
222 Endpoint::Ptr recipient = candidates[rand() % candidates.size()];
223 SendUnicastMessage(sender, recipient, message);
227 * Sends a message to all recipients who have a subscription for the
230 * @param sender The sender of the message.
231 * @param message The message.
233 void EndpointManager::SendMulticastMessage(Endpoint::Ptr sender,
234 const RequestMessage& message)
237 if (message.GetID(&id))
238 throw invalid_argument("Multicast requests must not have an ID.");
241 if (!message.GetMethod(&method))
242 throw invalid_argument("Message is missing the 'method' property.");
244 for (vector<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++)
246 Endpoint::Ptr recipient = *i;
248 /* don't forward messages back to the sender */
249 if (sender == recipient)
252 if (recipient->HasSubscription(method))
253 SendUnicastMessage(sender, recipient, message);
258 * Calls the specified callback function for each registered endpoint.
260 * @param callback The callback function.
262 void EndpointManager::ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback)
264 NewEndpointEventArgs neea;
265 neea.Source = shared_from_this();
267 vector<Endpoint::Ptr>::iterator prev, i;
268 for (i = m_Endpoints.begin(); i != m_Endpoints.end(); ) {
272 neea.Endpoint = *prev;
278 * Retrieves an endpoint that has the specified identity.
280 * @param identity The identity of the endpoint.
282 Endpoint::Ptr EndpointManager::GetEndpointByIdentity(string identity) const
284 vector<Endpoint::Ptr>::const_iterator i;
285 for (i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) {
286 if ((*i)->GetIdentity() == identity)
290 return Endpoint::Ptr();
293 void EndpointManager::SendAPIMessage(Endpoint::Ptr sender,
294 RequestMessage& message,
295 function<int(const NewResponseEventArgs&)> callback, time_t timeout)
299 stringstream idstream;
300 idstream << m_NextMessageID;
302 string id = idstream.str();
306 pr.Request = message;
307 pr.Callback = callback;
308 pr.Timeout = time(NULL) + timeout;
311 RescheduleRequestTimer();
313 SendAnycastMessage(sender, message);
316 bool EndpointManager::RequestTimeoutLessComparer(const pair<string, PendingRequest>& a,
317 const pair<string, PendingRequest>& b)
319 return a.second.Timeout < b.second.Timeout;
322 void EndpointManager::RescheduleRequestTimer(void)
324 map<string, PendingRequest>::iterator it;
325 it = min_element(m_Requests.begin(), m_Requests.end(),
326 &EndpointManager::RequestTimeoutLessComparer);
328 if (!m_RequestTimer) {
329 m_RequestTimer = make_shared<Timer>();
330 m_RequestTimer->OnTimerExpired += bind_weak(&EndpointManager::RequestTimerHandler, shared_from_this());
333 if (it != m_Requests.end()) {
337 time_t next_timeout = (it->second.Timeout < now) ? now : it->second.Timeout;
338 m_RequestTimer->SetInterval(next_timeout - now);
339 m_RequestTimer->Start();
341 m_RequestTimer->Stop();
345 int EndpointManager::RequestTimerHandler(const TimerEventArgs& ea)
347 map<string, PendingRequest>::iterator it;
348 for (it = m_Requests.begin(); it != m_Requests.end(); it++) {
349 if (it->second.HasTimedOut()) {
350 NewResponseEventArgs nrea;
351 nrea.Request = it->second.Request;
352 nrea.Source = shared_from_this();
353 nrea.TimedOut = true;
355 it->second.Callback(nrea);
357 m_Requests.erase(it);
363 RescheduleRequestTimer();
368 void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message)
371 if (!message.GetID(&id))
372 throw invalid_argument("Response message must have a message ID.");
374 map<string, PendingRequest>::iterator it;
375 it = m_Requests.find(id);
377 if (it == m_Requests.end())
380 NewResponseEventArgs nrea;
381 nrea.Sender = sender;
382 nrea.Request = it->second.Request;
383 nrea.Response = message;
384 nrea.Source = shared_from_this();
385 nrea.TimedOut = false;
387 it->second.Callback(nrea);
389 m_Requests.erase(it);
390 RescheduleRequestTimer();