1 /******************************************************************************
3 * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org) *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License *
7 * as published by the Free Software Foundation; either version 2 *
8 * of the License, or (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software Foundation *
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
18 ******************************************************************************/
20 #include "remote/apilistener.hpp"
21 #include "remote/apiclient.hpp"
22 #include "remote/endpoint.hpp"
23 #include "base/convert.hpp"
24 #include "base/netstring.hpp"
25 #include "base/dynamictype.hpp"
26 #include "base/logger_fwd.hpp"
27 #include "base/objectlock.hpp"
28 #include "base/stdiostream.hpp"
29 #include "base/application.hpp"
30 #include "base/context.hpp"
31 #include "base/statsfunction.hpp"
34 using namespace icinga;
36 REGISTER_TYPE(ApiListener);
38 boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
40 REGISTER_STATSFUNCTION(ApiListenerStats, &ApiListener::StatsFunc);
42 void ApiListener::OnConfigLoaded(void)
44 /* set up SSL context */
45 shared_ptr<X509> cert = make_shared<X509>();
47 cert = GetX509Certificate(GetCertPath());
48 } catch (std::exception&) {
49 Log(LogCritical, "ApiListener", "Cannot get certificate from cert path: '" + GetCertPath() + "'.");
54 SetIdentity(GetCertificateCN(cert));
55 } catch (std::exception&) {
56 Log(LogCritical, "ApiListener", "Cannot get certificate common name from cert path: '" + GetCertPath() + "'.");
60 Log(LogInformation, "ApiListener", "My API identity: " + GetIdentity());
63 m_SSLContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
64 } catch (std::exception&) {
65 Log(LogCritical, "ApiListener", "Cannot make SSL context for cert path: '" + GetCertPath() + "' key path: '" + GetKeyPath() + "' ca path: '" + GetCaPath() + "'.");
69 if (!GetCrlPath().IsEmpty()) {
71 AddCRLToSSLContext(m_SSLContext, GetCrlPath());
72 } catch (std::exception&) {
73 Log(LogCritical, "ApiListener", "Cannot add certificate revocation list to SSL context for crl path: '" + GetCrlPath() + "'.");
78 if (!Endpoint::GetByName(GetIdentity())) {
79 Log(LogCritical, "ApiListener", "Endpoint object for '" + GetIdentity() + "' is missing.");
87 * Starts the component.
89 void ApiListener::Start(void)
91 if (std::distance(DynamicType::GetObjects<ApiListener>().first, DynamicType::GetObjects<ApiListener>().second) > 1) {
92 Log(LogCritical, "ApiListener", "Only one ApiListener object is allowed.");
96 DynamicObject::Start();
99 boost::mutex::scoped_lock(m_LogLock);
104 /* create the primary JSON-RPC listener */
105 if (!AddListener(GetBindPort())) {
106 Log(LogCritical, "ApiListener", "Cannot add listener for port '" + Convert::ToString(GetBindPort()) + "'.");
107 Application::Exit(EXIT_FAILURE);
110 m_Timer = make_shared<Timer>();
111 m_Timer->OnTimerExpired.connect(boost::bind(&ApiListener::ApiTimerHandler, this));
112 m_Timer->SetInterval(5);
114 m_Timer->Reschedule(0);
116 OnMasterChanged(true);
119 ApiListener::Ptr ApiListener::GetInstance(void)
121 BOOST_FOREACH(const ApiListener::Ptr& listener, DynamicType::GetObjects<ApiListener>())
124 return ApiListener::Ptr();
127 shared_ptr<SSL_CTX> ApiListener::GetSSLContext(void) const
132 Endpoint::Ptr ApiListener::GetMaster(void) const
134 Zone::Ptr zone = Zone::GetLocalZone();
137 return Endpoint::Ptr();
139 std::vector<String> names;
141 BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints())
142 if (endpoint->IsConnected() || endpoint->GetName() == GetIdentity())
143 names.push_back(endpoint->GetName());
145 std::sort(names.begin(), names.end());
147 return Endpoint::GetByName(*names.begin());
150 bool ApiListener::IsMaster(void) const
152 Endpoint::Ptr master = GetMaster();
157 return master->GetName() == GetIdentity();
161 * Creates a new JSON-RPC listener on the specified port.
163 * @param service The port to listen on.
165 bool ApiListener::AddListener(const String& service)
167 ObjectLock olock(this);
169 shared_ptr<SSL_CTX> sslContext = m_SSLContext;
172 Log(LogCritical, "ApiListener", "SSL context is required for AddListener()");
176 std::ostringstream s;
177 s << "Adding new listener: port " << service;
178 Log(LogInformation, "ApiListener", s.str());
180 TcpSocket::Ptr server = make_shared<TcpSocket>();
183 server->Bind(service, AF_UNSPEC);
184 } catch(std::exception&) {
185 Log(LogCritical, "ApiListener", "Cannot bind tcp socket on '" + service + "'.");
189 boost::thread thread(boost::bind(&ApiListener::ListenerThreadProc, this, server));
192 m_Servers.insert(server);
197 void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
199 Utility::SetThreadName("API Listener");
205 Socket::Ptr client = server->Accept();
206 Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleServer));
207 } catch (std::exception&) {
208 Log(LogCritical, "ApiListener", "Cannot accept new connection.");
214 * Creates a new JSON-RPC client and connects to the specified endpoint.
216 * @param endpoint The endpoint.
218 void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
221 ObjectLock olock(this);
223 shared_ptr<SSL_CTX> sslContext = m_SSLContext;
226 Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
231 String host = endpoint->GetHost();
232 String port = endpoint->GetPort();
234 TcpSocket::Ptr client = make_shared<TcpSocket>();
237 endpoint->SetConnecting(true);
238 client->Connect(host, port);
239 NewClientHandler(client, RoleClient);
240 endpoint->SetConnecting(false);
241 } catch (const std::exception& ex) {
242 endpoint->SetConnecting(false);
245 std::ostringstream info, debug;
246 info << "Cannot connect to host '" << host << "' on port '" << port << "'";
247 debug << info.str() << std::endl << DiagnosticInformation(ex);
248 Log(LogCritical, "ApiListener", info.str());
249 Log(LogDebug, "ApiListener", debug.str());
254 * Processes a new client connection.
256 * @param client The new client.
258 void ApiListener::NewClientHandler(const Socket::Ptr& client, ConnectionRole role)
260 CONTEXT("Handling new API client connection");
262 TlsStream::Ptr tlsStream;
265 ObjectLock olock(this);
267 tlsStream = make_shared<TlsStream>(client, role, m_SSLContext);
268 } catch (std::exception&) {
269 Log(LogCritical, "ApiListener", "Cannot create tls stream from client connection.");
275 tlsStream->Handshake();
276 } catch (std::exception) {
277 Log(LogCritical, "ApiListener", "Client TLS handshake failed.");
281 shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
285 identity = GetCertificateCN(cert);
286 } catch (std::exception&) {
287 Log(LogCritical, "ApiListener", "Cannot get certificate common name from cert path: '" + GetCertPath() + "'.");
291 Log(LogInformation, "ApiListener", "New client connection for identity '" + identity + "'");
293 Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
295 bool need_sync = false;
298 need_sync = !endpoint->IsConnected();
300 ApiClient::Ptr aclient = make_shared<ApiClient>(identity, tlsStream, role);
306 ObjectLock olock(endpoint);
308 endpoint->SetSyncing(true);
314 SendConfigUpdate(aclient);
316 endpoint->AddClient(aclient);
318 AddAnonymousClient(aclient);
321 void ApiListener::ApiTimerHandler(void)
323 double now = Utility::GetTime();
325 std::vector<int> files;
326 Utility::Glob(GetApiDir() + "log/*", boost::bind(&ApiListener::LogGlobHandler, boost::ref(files), _1), GlobFile);
327 std::sort(files.begin(), files.end());
329 BOOST_FOREACH(int ts, files) {
332 BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
333 if (endpoint->GetName() == GetIdentity())
336 if (endpoint->GetLogDuration() >= 0 && ts < now - endpoint->GetLogDuration())
339 if (ts > endpoint->GetLocalLogPosition()) {
346 String path = GetApiDir() + "log/" + Convert::ToString(ts);
347 Log(LogNotice, "ApiListener", "Removing old log file: " + path);
348 (void)unlink(path.CStr());
353 Zone::Ptr my_zone = Zone::GetLocalZone();
355 BOOST_FOREACH(const Zone::Ptr& zone, DynamicType::GetObjects<Zone>()) {
356 /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
357 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent())
360 bool connected = false;
362 BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
363 if (endpoint->IsConnected()) {
369 /* don't connect to an endpoint if we already have a connection to the zone */
373 BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
374 /* don't connect to ourselves */
375 if (endpoint->GetName() == GetIdentity())
378 /* don't try to connect to endpoints which don't have a host and port */
379 if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty())
382 /* don't try to connect if there's already a connection attempt */
383 if (endpoint->GetConnecting())
386 Utility::QueueAsyncCallback(boost::bind(&ApiListener::AddConnection, this, endpoint));
391 BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
392 if (!endpoint->IsConnected())
395 double ts = endpoint->GetRemoteLogPosition();
400 Dictionary::Ptr lparams = make_shared<Dictionary>();
401 lparams->Set("log_position", ts);
403 Dictionary::Ptr lmessage = make_shared<Dictionary>();
404 lmessage->Set("jsonrpc", "2.0");
405 lmessage->Set("method", "log::SetLogPosition");
406 lmessage->Set("params", lparams);
408 BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients())
409 client->SendMessage(lmessage);
411 Log(LogNotice, "ApiListener", "Setting log position for identity '" + endpoint->GetName() + "': " +
412 Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts));
415 Endpoint::Ptr master = GetMaster();
418 Log(LogNotice, "ApiListener", "Current zone master: " + master->GetName());
420 std::vector<String> names;
421 BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>())
422 if (endpoint->IsConnected())
423 names.push_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
425 Log(LogNotice, "ApiListener", "Connected endpoints: " + Utility::NaturalJoin(names));
428 void ApiListener::RelayMessage(const MessageOrigin& origin, const DynamicObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
430 m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log));
433 void ApiListener::PersistMessage(const Dictionary::Ptr& message)
435 double ts = message->Get("ts");
439 Dictionary::Ptr pmessage = make_shared<Dictionary>();
440 pmessage->Set("timestamp", ts);
442 pmessage->Set("message", JsonSerialize(message));
444 boost::mutex::scoped_lock lock(m_LogLock);
446 NetString::WriteStringToStream(m_LogFile, JsonSerialize(pmessage));
448 SetLogMessageTimestamp(ts);
450 if (m_LogMessageCount > 50000) {
458 void ApiListener::SyncRelayMessage(const MessageOrigin& origin, const DynamicObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
460 double ts = Utility::GetTime();
461 message->Set("ts", ts);
463 Log(LogNotice, "ApiListener", "Relaying '" + message->Get("method") + "' message");
466 m_LogQueue.Enqueue(boost::bind(&ApiListener::PersistMessage, this, message));
469 message->Set("originZone", origin.FromZone->GetName());
471 bool is_master = IsMaster();
472 Endpoint::Ptr master = GetMaster();
473 Zone::Ptr my_zone = Zone::GetLocalZone();
475 std::vector<Endpoint::Ptr> skippedEndpoints;
476 std::set<Zone::Ptr> finishedZones;
478 BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
479 /* don't relay messages to ourselves or disconnected endpoints */
480 if (endpoint->GetName() == GetIdentity() || !endpoint->IsConnected())
483 Zone::Ptr target_zone = endpoint->GetZone();
485 /* don't relay the message to the zone through more than one endpoint */
486 if (finishedZones.find(target_zone) != finishedZones.end()) {
487 skippedEndpoints.push_back(endpoint);
491 /* don't relay messages back to the endpoint which we got the message from */
492 if (origin.FromClient && endpoint == origin.FromClient->GetEndpoint()) {
493 skippedEndpoints.push_back(endpoint);
497 /* don't relay messages back to the zone which we got the message from */
498 if (origin.FromZone && target_zone == origin.FromZone) {
499 skippedEndpoints.push_back(endpoint);
503 /* only relay message to the master if we're not currently the master */
504 if (!is_master && master != endpoint) {
505 skippedEndpoints.push_back(endpoint);
509 /* only relay the message to a) the same zone, b) the parent zone and c) direct child zones */
510 if (target_zone != my_zone && target_zone != my_zone->GetParent() &&
511 secobj->GetZone() != target_zone->GetName()) {
512 skippedEndpoints.push_back(endpoint);
516 /* only relay messages to zones which have access to the object */
517 if (!target_zone->CanAccessObject(secobj))
520 finishedZones.insert(target_zone);
523 ObjectLock olock(endpoint);
525 if (!endpoint->GetSyncing()) {
526 Log(LogNotice, "ApiListener", "Sending message to '" + endpoint->GetName() + "'");
528 BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients())
529 client->SendMessage(message);
534 BOOST_FOREACH(const Endpoint::Ptr& endpoint, skippedEndpoints)
535 endpoint->SetLocalLogPosition(ts);
538 String ApiListener::GetApiDir(void)
540 return Application::GetLocalStateDir() + "/lib/icinga2/api/";
543 /* must hold m_LogLock */
544 void ApiListener::OpenLogFile(void)
546 String path = GetApiDir() + "log/current";
548 std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
551 Log(LogWarning, "ApiListener", "Could not open spool file: " + path);
555 m_LogFile = make_shared<StdioStream>(fp, true);
556 m_LogMessageCount = 0;
557 SetLogMessageTimestamp(Utility::GetTime());
560 /* must hold m_LogLock */
561 void ApiListener::CloseLogFile(void)
570 /* must hold m_LogLock */
571 void ApiListener::RotateLogFile(void)
573 double ts = GetLogMessageTimestamp();
576 ts = Utility::GetTime();
578 String oldpath = GetApiDir() + "log/current";
579 String newpath = GetApiDir() + "log/" + Convert::ToString(static_cast<int>(ts)+1);
580 (void) rename(oldpath.CStr(), newpath.CStr());
583 void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
585 String name = Utility::BaseName(file);
590 ts = Convert::ToLong(name);
592 catch (const std::exception&) {
599 void ApiListener::ReplayLog(const ApiClient::Ptr& client)
601 Endpoint::Ptr endpoint = client->GetEndpoint();
603 CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'");
606 double peer_ts = endpoint->GetLocalLogPosition();
607 bool last_sync = false;
610 boost::mutex::scoped_lock lock(m_LogLock);
615 if (count == -1 || count > 50000) {
624 std::vector<int> files;
625 Utility::Glob(GetApiDir() + "log/*", boost::bind(&ApiListener::LogGlobHandler, boost::ref(files), _1), GlobFile);
626 std::sort(files.begin(), files.end());
628 BOOST_FOREACH(int ts, files) {
629 String path = GetApiDir() + "log/" + Convert::ToString(ts);
634 Log(LogNotice, "ApiListener", "Replaying log: " + path);
636 std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
637 StdioStream::Ptr logStream = make_shared<StdioStream>(fp, true);
641 Dictionary::Ptr pmessage;
644 if (!NetString::ReadStringFromStream(logStream, &message))
647 pmessage = JsonDeserialize(message);
648 } catch (const std::exception&) {
649 Log(LogWarning, "ApiListener", "Unexpected end-of-file for cluster log: " + path);
651 /* Log files may be incomplete or corrupted. This is perfectly OK. */
655 if (pmessage->Get("timestamp") <= peer_ts)
658 NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
661 peer_ts = pmessage->Get("timestamp");
667 Log(LogNotice, "ApiListener", "Replayed " + Convert::ToString(count) + " messages.");
671 ObjectLock olock2(endpoint);
672 endpoint->SetSyncing(false);
682 Value ApiListener::StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& perfdata)
684 Dictionary::Ptr nodes = make_shared<Dictionary>();
685 std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
687 ApiListener::Ptr listener = ApiListener::GetInstance();
692 stats = listener->GetStatus();
694 BOOST_FOREACH(Dictionary::Pair const& kv, stats.second)
695 perfdata->Set("api_" + kv.first, kv.second);
697 status->Set("api", stats.first);
702 std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus(void)
704 Dictionary::Ptr status = make_shared<Dictionary>();
705 Dictionary::Ptr perfdata = make_shared<Dictionary>();
708 status->Set("identity", GetIdentity());
710 double count_endpoints = 0;
711 Array::Ptr not_connected_endpoints = make_shared<Array>();
712 Array::Ptr connected_endpoints = make_shared<Array>();
714 BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
715 if (endpoint->GetName() == GetIdentity())
720 if (!endpoint->IsConnected())
721 not_connected_endpoints->Add(endpoint->GetName());
723 connected_endpoints->Add(endpoint->GetName());
726 status->Set("num_endpoints", count_endpoints);
727 status->Set("num_conn_endpoints", connected_endpoints->GetLength());
728 status->Set("num_not_conn_endpoints", not_connected_endpoints->GetLength());
729 status->Set("conn_endpoints", connected_endpoints);
730 status->Set("not_conn_endpoints", not_connected_endpoints);
732 perfdata->Set("num_endpoints", count_endpoints);
733 perfdata->Set("num_conn_endpoints", Convert::ToDouble(connected_endpoints->GetLength()));
734 perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(not_connected_endpoints->GetLength()));
736 return std::make_pair(status, perfdata);
739 void ApiListener::AddAnonymousClient(const ApiClient::Ptr& aclient)
741 ObjectLock olock(this);
742 m_AnonymousClients.insert(aclient);
745 void ApiListener::RemoveAnonymousClient(const ApiClient::Ptr& aclient)
747 ObjectLock olock(this);
748 m_AnonymousClients.erase(aclient);
751 std::set<ApiClient::Ptr> ApiListener::GetAnonymousClients(void) const
753 ObjectLock olock(this);
754 return m_AnonymousClients;