1 /******************************************************************************
3 * Copyright (C) 2012-2018 Icinga Development Team (https://www.icinga.com/) *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License *
7 * as published by the Free Software Foundation; either version 2 *
8 * of the License, or (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software Foundation *
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
18 ******************************************************************************/
20 #include "remote/apilistener.hpp"
21 #include "remote/apilistener.tcpp"
22 #include "remote/jsonrpcconnection.hpp"
23 #include "remote/endpoint.hpp"
24 #include "remote/jsonrpc.hpp"
25 #include "remote/apifunction.hpp"
26 #include "base/convert.hpp"
27 #include "base/netstring.hpp"
28 #include "base/json.hpp"
29 #include "base/configtype.hpp"
30 #include "base/logger.hpp"
31 #include "base/objectlock.hpp"
32 #include "base/stdiostream.hpp"
33 #include "base/perfdatavalue.hpp"
34 #include "base/application.hpp"
35 #include "base/context.hpp"
36 #include "base/statsfunction.hpp"
37 #include "base/exception.hpp"
40 using namespace icinga;
42 REGISTER_TYPE(ApiListener);
44 boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
45 ApiListener::Ptr ApiListener::m_Instance;
47 REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc);
49 REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
51 ApiListener::ApiListener(void)
52 : m_SyncQueue(0, 4), m_LogMessageCount(0)
54 m_RelayQueue.SetName("ApiListener, RelayQueue");
55 m_SyncQueue.SetName("ApiListener, SyncQueue");
58 String ApiListener::GetApiDir(void)
60 return Application::GetLocalStateDir() + "/lib/icinga2/api/";
63 String ApiListener::GetCertsDir(void)
65 return Application::GetLocalStateDir() + "/lib/icinga2/certs/";
68 String ApiListener::GetCaDir(void)
70 return Application::GetLocalStateDir() + "/lib/icinga2/ca/";
73 String ApiListener::GetCertificateRequestsDir(void)
75 return Application::GetLocalStateDir() + "/lib/icinga2/certificate-requests/";
78 String ApiListener::GetDefaultCertPath(void)
80 return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".crt";
83 String ApiListener::GetDefaultKeyPath(void)
85 return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".key";
88 String ApiListener::GetDefaultCaPath(void)
90 return GetCertsDir() + "/ca.crt";
93 void ApiListener::CopyCertificateFile(const String& oldCertPath, const String& newCertPath)
97 if (!oldCertPath.IsEmpty() && stat(oldCertPath.CStr(), &st1) >= 0 && (stat(newCertPath.CStr(), &st2) < 0 || st1.st_mtime > st2.st_mtime)) {
98 Log(LogWarning, "ApiListener")
99 << "Copying '" << oldCertPath << "' certificate file to '" << newCertPath << "'";
101 Utility::MkDirP(Utility::DirName(newCertPath), 0700);
102 Utility::CopyFile(oldCertPath, newCertPath);
106 void ApiListener::OnConfigLoaded(void)
109 BOOST_THROW_EXCEPTION(ScriptError("Only one ApiListener object is allowed.", GetDebugInfo()));
113 String defaultCertPath = GetDefaultCertPath();
114 String defaultKeyPath = GetDefaultKeyPath();
115 String defaultCaPath = GetDefaultCaPath();
117 /* Migrate certificate location < 2.8 to the new default path. */
118 String oldCertPath = GetCertPath();
119 String oldKeyPath = GetKeyPath();
120 String oldCaPath = GetCaPath();
122 CopyCertificateFile(oldCertPath, defaultCertPath);
123 CopyCertificateFile(oldKeyPath, defaultKeyPath);
124 CopyCertificateFile(oldCaPath, defaultCaPath);
126 if (!oldCertPath.IsEmpty() && !oldKeyPath.IsEmpty() && !oldCaPath.IsEmpty()) {
127 Log(LogWarning, "ApiListener", "Please read the upgrading documentation for v2.8: https://www.icinga.com/docs/icinga2/latest/doc/16-upgrading-icinga-2/");
130 /* set up SSL context */
131 std::shared_ptr<X509> cert;
133 cert = GetX509Certificate(defaultCertPath);
134 } catch (const std::exception&) {
135 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate from cert path: '"
136 + defaultCertPath + "'.", GetDebugInfo()));
140 SetIdentity(GetCertificateCN(cert));
141 } catch (const std::exception&) {
142 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate common name from cert path: '"
143 + defaultCertPath + "'.", GetDebugInfo()));
146 Log(LogInformation, "ApiListener")
147 << "My API identity: " << GetIdentity();
152 void ApiListener::UpdateSSLContext(void)
154 std::shared_ptr<SSL_CTX> context;
157 context = MakeSSLContext(GetDefaultCertPath(), GetDefaultKeyPath(), GetDefaultCaPath());
158 } catch (const std::exception&) {
159 BOOST_THROW_EXCEPTION(ScriptError("Cannot make SSL context for cert path: '"
160 + GetDefaultCertPath() + "' key path: '" + GetDefaultKeyPath() + "' ca path: '" + GetDefaultCaPath() + "'.", GetDebugInfo()));
163 if (!GetCrlPath().IsEmpty()) {
165 AddCRLToSSLContext(context, GetCrlPath());
166 } catch (const std::exception&) {
167 BOOST_THROW_EXCEPTION(ScriptError("Cannot add certificate revocation list to SSL context for crl path: '"
168 + GetCrlPath() + "'.", GetDebugInfo()));
172 if (!GetCipherList().IsEmpty()) {
174 SetCipherListToSSLContext(context, GetCipherList());
175 } catch (const std::exception&) {
176 BOOST_THROW_EXCEPTION(ScriptError("Cannot set cipher list to SSL context for cipher list: '"
177 + GetCipherList() + "'.", GetDebugInfo()));
181 if (!GetTlsProtocolmin().IsEmpty()){
183 SetTlsProtocolminToSSLContext(context, GetTlsProtocolmin());
184 } catch (const std::exception&) {
185 BOOST_THROW_EXCEPTION(ScriptError("Cannot set minimum TLS protocol version to SSL context with tls_protocolmin: '" + GetTlsProtocolmin() + "'.", GetDebugInfo()));
189 m_SSLContext = context;
191 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
192 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
193 client->Disconnect();
197 for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) {
198 client->Disconnect();
202 void ApiListener::OnAllConfigLoaded(void)
204 m_LocalEndpoint = Endpoint::GetByName(GetIdentity());
206 if (!m_LocalEndpoint)
207 BOOST_THROW_EXCEPTION(ScriptError("Endpoint object for '" + GetIdentity() + "' is missing.", GetDebugInfo()));
211 * Starts the component.
213 void ApiListener::Start(bool runtimeCreated)
215 Log(LogInformation, "ApiListener")
216 << "'" << GetName() << "' started.";
220 ObjectImpl<ApiListener>::Start(runtimeCreated);
223 boost::mutex::scoped_lock(m_LogLock);
228 /* create the primary JSON-RPC listener */
229 if (!AddListener(GetBindHost(), GetBindPort())) {
230 Log(LogCritical, "ApiListener")
231 << "Cannot add listener on host '" << GetBindHost() << "' for port '" << GetBindPort() << "'.";
232 Application::Exit(EXIT_FAILURE);
235 m_Timer = new Timer();
236 m_Timer->OnTimerExpired.connect(std::bind(&ApiListener::ApiTimerHandler, this));
237 m_Timer->SetInterval(5);
239 m_Timer->Reschedule(0);
241 m_ReconnectTimer = new Timer();
242 m_ReconnectTimer->OnTimerExpired.connect(std::bind(&ApiListener::ApiReconnectTimerHandler, this));
243 m_ReconnectTimer->SetInterval(60);
244 m_ReconnectTimer->Start();
245 m_ReconnectTimer->Reschedule(0);
247 m_AuthorityTimer = new Timer();
248 m_AuthorityTimer->OnTimerExpired.connect(std::bind(&ApiListener::UpdateObjectAuthority));
249 m_AuthorityTimer->SetInterval(30);
250 m_AuthorityTimer->Start();
252 m_CleanupCertificateRequestsTimer = new Timer();
253 m_CleanupCertificateRequestsTimer->OnTimerExpired.connect(std::bind(&ApiListener::CleanupCertificateRequestsTimerHandler, this));
254 m_CleanupCertificateRequestsTimer->SetInterval(3600);
255 m_CleanupCertificateRequestsTimer->Start();
256 m_CleanupCertificateRequestsTimer->Reschedule(0);
258 OnMasterChanged(true);
261 void ApiListener::Stop(bool runtimeDeleted)
263 ObjectImpl<ApiListener>::Stop(runtimeDeleted);
265 Log(LogInformation, "ApiListener")
266 << "'" << GetName() << "' stopped.";
268 boost::mutex::scoped_lock lock(m_LogLock);
272 ApiListener::Ptr ApiListener::GetInstance(void)
277 Endpoint::Ptr ApiListener::GetMaster(void) const
279 Zone::Ptr zone = Zone::GetLocalZone();
284 std::vector<String> names;
286 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints())
287 if (endpoint->GetConnected() || endpoint->GetName() == GetIdentity())
288 names.push_back(endpoint->GetName());
290 std::sort(names.begin(), names.end());
292 return Endpoint::GetByName(*names.begin());
295 bool ApiListener::IsMaster(void) const
297 Endpoint::Ptr master = GetMaster();
302 return master == GetLocalEndpoint();
306 * Creates a new JSON-RPC listener on the specified port.
308 * @param node The host the listener should be bound to.
309 * @param service The port to listen on.
311 bool ApiListener::AddListener(const String& node, const String& service)
313 ObjectLock olock(this);
315 std::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
318 Log(LogCritical, "ApiListener", "SSL context is required for AddListener()");
322 Log(LogInformation, "ApiListener")
323 << "Adding new listener on port '" << service << "'";
325 TcpSocket::Ptr server = new TcpSocket();
328 server->Bind(node, service, AF_UNSPEC);
329 } catch (const std::exception&) {
330 Log(LogCritical, "ApiListener")
331 << "Cannot bind TCP socket for host '" << node << "' on port '" << service << "'.";
335 std::thread thread(std::bind(&ApiListener::ListenerThreadProc, this, server));
338 m_Servers.insert(server);
343 void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
345 Utility::SetThreadName("API Listener");
351 Socket::Ptr client = server->Accept();
352 std::thread thread(std::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer));
354 } catch (const std::exception&) {
355 Log(LogCritical, "ApiListener", "Cannot accept new connection.");
361 * Creates a new JSON-RPC client and connects to the specified endpoint.
363 * @param endpoint The endpoint.
365 void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
368 ObjectLock olock(this);
370 std::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
373 Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
378 String host = endpoint->GetHost();
379 String port = endpoint->GetPort();
381 Log(LogInformation, "ApiListener")
382 << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
384 TcpSocket::Ptr client = new TcpSocket();
387 endpoint->SetConnecting(true);
388 client->Connect(host, port);
389 NewClientHandler(client, endpoint->GetName(), RoleClient);
390 endpoint->SetConnecting(false);
391 } catch (const std::exception& ex) {
392 endpoint->SetConnecting(false);
395 std::ostringstream info;
396 info << "Cannot connect to host '" << host << "' on port '" << port << "'";
397 Log(LogCritical, "ApiListener", info.str());
398 Log(LogDebug, "ApiListener")
399 << info.str() << "\n" << DiagnosticInformation(ex);
402 Log(LogInformation, "ApiListener")
403 << "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
406 void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
409 NewClientHandlerInternal(client, hostname, role);
410 } catch (const std::exception& ex) {
411 Log(LogCritical, "ApiListener")
412 << "Exception while handling new API client connection: " << DiagnosticInformation(ex, false);
414 Log(LogDebug, "ApiListener")
415 << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
420 * Processes a new client connection.
422 * @param client The new client.
424 void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
426 CONTEXT("Handling new API client connection");
430 if (role == RoleClient)
435 conninfo += " " + client->GetPeerAddress();
437 TlsStream::Ptr tlsStream;
440 ObjectLock olock(this);
442 tlsStream = new TlsStream(client, hostname, role, m_SSLContext);
443 } catch (const std::exception&) {
444 Log(LogCritical, "ApiListener")
445 << "Cannot create TLS stream from client connection (" << conninfo << ")";
451 tlsStream->Handshake();
452 } catch (const std::exception&) {
453 Log(LogCritical, "ApiListener")
454 << "Client TLS handshake failed (" << conninfo << ")";
458 std::shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
460 Endpoint::Ptr endpoint;
461 bool verify_ok = false;
465 identity = GetCertificateCN(cert);
466 } catch (const std::exception&) {
467 Log(LogCritical, "ApiListener")
468 << "Cannot get certificate common name from cert path: '" << GetDefaultCertPath() << "'.";
472 verify_ok = tlsStream->IsVerifyOK();
473 if (!hostname.IsEmpty()) {
474 if (identity != hostname) {
475 Log(LogWarning, "ApiListener")
476 << "Unexpected certificate common name while connecting to endpoint '"
477 << hostname << "': got '" << identity << "'";
479 } else if (!verify_ok) {
480 Log(LogWarning, "ApiListener")
481 << "Certificate validation failed for endpoint '" << hostname
482 << "': " << tlsStream->GetVerifyError();
487 endpoint = Endpoint::GetByName(identity);
490 Log log(LogInformation, "ApiListener");
492 log << "New client connection for identity '" << identity << "' " << conninfo;
495 log << " (certificate validation failed: " << tlsStream->GetVerifyError() << ")";
497 log << " (no Endpoint object found for identity)";
500 Log(LogInformation, "ApiListener")
501 << "New client connection " << conninfo << " (no client certificate)";
506 if (role == RoleClient) {
507 Dictionary::Ptr message = new Dictionary();
508 message->Set("jsonrpc", "2.0");
509 message->Set("method", "icinga::Hello");
510 message->Set("params", new Dictionary());
511 JsonRpc::SendMessage(tlsStream, message);
512 ctype = ClientJsonRpc;
514 tlsStream->WaitForData(5);
516 if (!tlsStream->IsDataAvailable()) {
517 Log(LogWarning, "ApiListener")
518 << "No data received on new API connection for identity '" << identity << "'. Ensure that the remote endpoints are properly configured in a cluster setup.";
523 tlsStream->Peek(&firstByte, 1, false);
525 if (firstByte >= '0' && firstByte <= '9')
526 ctype = ClientJsonRpc;
531 if (ctype == ClientJsonRpc) {
532 Log(LogNotice, "ApiListener", "New JSON-RPC client");
534 JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role);
538 bool needSync = !endpoint->GetConnected();
540 endpoint->AddClient(aclient);
542 m_SyncQueue.Enqueue(std::bind(&ApiListener::SyncClient, this, aclient, endpoint, needSync));
544 AddAnonymousClient(aclient);
546 Log(LogNotice, "ApiListener", "New HTTP client");
548 HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, tlsStream);
550 AddHttpClient(aclient);
554 void ApiListener::SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint, bool needSync)
556 Zone::Ptr eZone = endpoint->GetZone();
560 ObjectLock olock(endpoint);
562 endpoint->SetSyncing(true);
565 Zone::Ptr myZone = Zone::GetLocalZone();
567 if (myZone->GetParent() == eZone) {
568 Log(LogInformation, "ApiListener")
569 << "Requesting new certificate for this Icinga instance from endpoint '" << endpoint->GetName() << "'.";
571 JsonRpcConnection::SendCertificateRequest(aclient, nullptr, String());
573 if (Utility::PathExists(ApiListener::GetCertificateRequestsDir()))
574 Utility::Glob(ApiListener::GetCertificateRequestsDir() + "/*.json", std::bind(&JsonRpcConnection::SendCertificateRequest, aclient, nullptr, _1), GlobFile);
577 /* Make sure that the config updates are synced
578 * before the logs are replayed.
581 Log(LogInformation, "ApiListener")
582 << "Sending config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
584 /* sync zone file config */
585 SendConfigUpdate(aclient);
587 Log(LogInformation, "ApiListener")
588 << "Finished sending config file updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
590 /* sync runtime config */
591 SendRuntimeConfigObjects(aclient);
593 Log(LogInformation, "ApiListener")
594 << "Finished sending runtime config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
597 ObjectLock olock2(endpoint);
598 endpoint->SetSyncing(false);
602 Log(LogInformation, "ApiListener")
603 << "Sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
607 if (eZone == Zone::GetLocalZone())
608 UpdateObjectAuthority();
610 Log(LogInformation, "ApiListener")
611 << "Finished sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
612 } catch (const std::exception& ex) {
614 ObjectLock olock2(endpoint);
615 endpoint->SetSyncing(false);
618 Log(LogCritical, "ApiListener")
619 << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
621 Log(LogDebug, "ApiListener")
622 << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
625 Log(LogInformation, "ApiListener")
626 << "Finished syncing endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
629 void ApiListener::ApiTimerHandler(void)
631 double now = Utility::GetTime();
633 std::vector<int> files;
634 Utility::Glob(GetApiDir() + "log/*", std::bind(&ApiListener::LogGlobHandler, std::ref(files), _1), GlobFile);
635 std::sort(files.begin(), files.end());
637 for (int ts : files) {
640 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
641 if (endpoint == GetLocalEndpoint())
644 if (endpoint->GetLogDuration() >= 0 && ts < now - endpoint->GetLogDuration())
647 if (ts > endpoint->GetLocalLogPosition()) {
654 String path = GetApiDir() + "log/" + Convert::ToString(ts);
655 Log(LogNotice, "ApiListener")
656 << "Removing old log file: " << path;
657 (void)unlink(path.CStr());
661 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
662 if (!endpoint->GetConnected())
665 double ts = endpoint->GetRemoteLogPosition();
670 Dictionary::Ptr lparams = new Dictionary();
671 lparams->Set("log_position", ts);
673 Dictionary::Ptr lmessage = new Dictionary();
674 lmessage->Set("jsonrpc", "2.0");
675 lmessage->Set("method", "log::SetLogPosition");
676 lmessage->Set("params", lparams);
680 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
681 if (client->GetTimestamp() > maxTs)
682 maxTs = client->GetTimestamp();
685 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
686 if (client->GetTimestamp() != maxTs)
687 client->Disconnect();
689 client->SendMessage(lmessage);
692 Log(LogNotice, "ApiListener")
693 << "Setting log position for identity '" << endpoint->GetName() << "': "
694 << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
698 void ApiListener::ApiReconnectTimerHandler(void)
700 Zone::Ptr my_zone = Zone::GetLocalZone();
702 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
703 /* don't connect to global zones */
704 if (zone->GetGlobal())
707 /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
708 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
709 Log(LogDebug, "ApiListener")
710 << "Not connecting to Zone '" << zone->GetName()
711 << "' because it's not in the same zone, a parent or a child zone.";
715 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
716 /* don't connect to ourselves */
717 if (endpoint == GetLocalEndpoint()) {
718 Log(LogDebug, "ApiListener")
719 << "Not connecting to Endpoint '" << endpoint->GetName() << "' because that's us.";
723 /* don't try to connect to endpoints which don't have a host and port */
724 if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) {
725 Log(LogDebug, "ApiListener")
726 << "Not connecting to Endpoint '" << endpoint->GetName()
727 << "' because the host/port attributes are missing.";
731 /* don't try to connect if there's already a connection attempt */
732 if (endpoint->GetConnecting()) {
733 Log(LogDebug, "ApiListener")
734 << "Not connecting to Endpoint '" << endpoint->GetName()
735 << "' because we're already trying to connect to it.";
739 /* don't try to connect if we're already connected */
740 if (endpoint->GetConnected()) {
741 Log(LogDebug, "ApiListener")
742 << "Not connecting to Endpoint '" << endpoint->GetName()
743 << "' because we're already connected to it.";
747 std::thread thread(std::bind(&ApiListener::AddConnection, this, endpoint));
752 Endpoint::Ptr master = GetMaster();
755 Log(LogNotice, "ApiListener")
756 << "Current zone master: " << master->GetName();
758 std::vector<String> names;
759 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>())
760 if (endpoint->GetConnected())
761 names.emplace_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
763 Log(LogNotice, "ApiListener")
764 << "Connected endpoints: " << Utility::NaturalJoin(names);
767 static void CleanupCertificateRequest(const String& path, double expiryTime)
771 if (lstat(path.CStr(), &statbuf) < 0)
774 struct _stat statbuf;
775 if (_stat(path.CStr(), &statbuf) < 0)
779 if (statbuf.st_mtime < expiryTime)
780 (void) unlink(path.CStr());
783 void ApiListener::CleanupCertificateRequestsTimerHandler(void)
785 String requestsDir = GetCertificateRequestsDir();
787 if (Utility::PathExists(requestsDir)) {
788 /* remove certificate requests that are older than a week */
789 double expiryTime = Utility::GetTime() - 7 * 24 * 60 * 60;
790 Utility::Glob(requestsDir + "/*.json", std::bind(&CleanupCertificateRequest, _1, expiryTime), GlobFile);
794 void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin,
795 const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
800 m_RelayQueue.Enqueue(std::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), PriorityNormal, true);
803 void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)
805 double ts = message->Get("ts");
809 Dictionary::Ptr pmessage = new Dictionary();
810 pmessage->Set("timestamp", ts);
812 pmessage->Set("message", JsonEncode(message));
815 Dictionary::Ptr secname = new Dictionary();
816 secname->Set("type", secobj->GetReflectionType()->GetName());
817 secname->Set("name", secobj->GetName());
818 pmessage->Set("secobj", secname);
821 boost::mutex::scoped_lock lock(m_LogLock);
823 NetString::WriteStringToStream(m_LogFile, JsonEncode(pmessage));
825 SetLogMessageTimestamp(ts);
827 if (m_LogMessageCount > 50000) {
835 void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
837 ObjectLock olock(endpoint);
839 if (!endpoint->GetSyncing()) {
840 Log(LogNotice, "ApiListener")
841 << "Sending message '" << message->Get("method") << "' to '" << endpoint->GetName() << "'";
845 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
846 if (client->GetTimestamp() > maxTs)
847 maxTs = client->GetTimestamp();
850 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
851 if (client->GetTimestamp() != maxTs)
854 client->SendMessage(message);
859 bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message, const Endpoint::Ptr& currentMaster)
863 Zone::Ptr myZone = Zone::GetLocalZone();
865 /* only relay the message to a) the same zone, b) the parent zone and c) direct child zones. Exception is a global zone. */
866 if (!targetZone->GetGlobal() &&
867 targetZone != myZone &&
868 targetZone != myZone->GetParent() &&
869 targetZone->GetParent() != myZone) {
873 Endpoint::Ptr myEndpoint = GetLocalEndpoint();
875 std::vector<Endpoint::Ptr> skippedEndpoints;
877 bool relayed = false, log_needed = false, log_done = false;
879 std::set<Endpoint::Ptr> targetEndpoints;
881 if (targetZone->GetGlobal()) {
882 targetEndpoints = myZone->GetEndpoints();
884 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
885 /* Fetch immediate child zone members */
886 if (zone->GetParent() == myZone) {
887 std::set<Endpoint::Ptr> endpoints = zone->GetEndpoints();
888 targetEndpoints.insert(endpoints.begin(), endpoints.end());
892 targetEndpoints = targetZone->GetEndpoints();
895 for (const Endpoint::Ptr& endpoint : targetEndpoints) {
896 /* don't relay messages to ourselves */
897 if (endpoint == GetLocalEndpoint())
902 /* don't relay messages to disconnected endpoints */
903 if (!endpoint->GetConnected()) {
904 if (targetZone == myZone)
912 /* don't relay the message to the zone through more than one endpoint unless this is our own zone */
913 if (relayed && targetZone != myZone) {
914 skippedEndpoints.push_back(endpoint);
918 /* don't relay messages back to the endpoint which we got the message from */
919 if (origin && origin->FromClient && endpoint == origin->FromClient->GetEndpoint()) {
920 skippedEndpoints.push_back(endpoint);
924 /* don't relay messages back to the zone which we got the message from */
925 if (origin && origin->FromZone && targetZone == origin->FromZone) {
926 skippedEndpoints.push_back(endpoint);
930 /* only relay message to the master if we're not currently the master */
931 if (currentMaster != myEndpoint && currentMaster != endpoint) {
932 skippedEndpoints.push_back(endpoint);
938 SyncSendMessage(endpoint, message);
941 if (!skippedEndpoints.empty()) {
942 double ts = message->Get("ts");
944 for (const Endpoint::Ptr& endpoint : skippedEndpoints)
945 endpoint->SetLocalLogPosition(ts);
948 return !log_needed || log_done;
951 void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
952 const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
954 double ts = Utility::GetTime();
955 message->Set("ts", ts);
957 Log(LogNotice, "ApiListener")
958 << "Relaying '" << message->Get("method") << "' message";
960 if (origin && origin->FromZone)
961 message->Set("originZone", origin->FromZone->GetName());
963 Zone::Ptr target_zone;
966 if (secobj->GetReflectionType() == Zone::TypeInstance)
967 target_zone = static_pointer_cast<Zone>(secobj);
969 target_zone = static_pointer_cast<Zone>(secobj->GetZone());
973 target_zone = Zone::GetLocalZone();
975 Endpoint::Ptr master = GetMaster();
977 bool need_log = !RelayMessageOne(target_zone, origin, message, master);
979 for (const Zone::Ptr& zone : target_zone->GetAllParents()) {
980 if (!RelayMessageOne(zone, origin, message, master))
985 PersistMessage(message, secobj);
988 /* must hold m_LogLock */
989 void ApiListener::OpenLogFile(void)
991 String path = GetApiDir() + "log/current";
993 Utility::MkDirP(Utility::DirName(path), 0750);
995 std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
998 Log(LogWarning, "ApiListener")
999 << "Could not open spool file: " << path;
1003 m_LogFile = new StdioStream(fp, true);
1004 m_LogMessageCount = 0;
1005 SetLogMessageTimestamp(Utility::GetTime());
1008 /* must hold m_LogLock */
1009 void ApiListener::CloseLogFile(void)
1018 /* must hold m_LogLock */
1019 void ApiListener::RotateLogFile(void)
1021 double ts = GetLogMessageTimestamp();
1024 ts = Utility::GetTime();
1026 String oldpath = GetApiDir() + "log/current";
1027 String newpath = GetApiDir() + "log/" + Convert::ToString(static_cast<int>(ts)+1);
1028 (void) rename(oldpath.CStr(), newpath.CStr());
1031 void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
1033 String name = Utility::BaseName(file);
1035 if (name == "current")
1041 ts = Convert::ToLong(name);
1042 } catch (const std::exception&) {
1046 files.push_back(ts);
1049 void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
1051 Endpoint::Ptr endpoint = client->GetEndpoint();
1053 if (endpoint->GetLogDuration() == 0) {
1054 ObjectLock olock2(endpoint);
1055 endpoint->SetSyncing(false);
1059 CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'");
1062 double peer_ts = endpoint->GetLocalLogPosition();
1063 double logpos_ts = peer_ts;
1064 bool last_sync = false;
1066 Endpoint::Ptr target_endpoint = client->GetEndpoint();
1067 ASSERT(target_endpoint);
1069 Zone::Ptr target_zone = target_endpoint->GetZone();
1072 ObjectLock olock2(endpoint);
1073 endpoint->SetSyncing(false);
1078 boost::mutex::scoped_lock lock(m_LogLock);
1083 if (count == -1 || count > 50000) {
1092 std::vector<int> files;
1093 Utility::Glob(GetApiDir() + "log/*", std::bind(&ApiListener::LogGlobHandler, std::ref(files), _1), GlobFile);
1094 std::sort(files.begin(), files.end());
1096 for (int ts : files) {
1097 String path = GetApiDir() + "log/" + Convert::ToString(ts);
1102 Log(LogNotice, "ApiListener")
1103 << "Replaying log: " << path;
1105 std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in | std::fstream::binary);
1106 StdioStream::Ptr logStream = new StdioStream(fp, true);
1109 StreamReadContext src;
1111 Dictionary::Ptr pmessage;
1114 StreamReadStatus srs = NetString::ReadStringFromStream(logStream, &message, src);
1116 if (srs == StatusEof)
1119 if (srs != StatusNewItem)
1122 pmessage = JsonDecode(message);
1123 } catch (const std::exception&) {
1124 Log(LogWarning, "ApiListener")
1125 << "Unexpected end-of-file for cluster log: " << path;
1127 /* Log files may be incomplete or corrupted. This is perfectly OK. */
1131 if (pmessage->Get("timestamp") <= peer_ts)
1134 Dictionary::Ptr secname = pmessage->Get("secobj");
1137 ConfigObject::Ptr secobj = ConfigObject::GetObject(secname->Get("type"), secname->Get("name"));
1142 if (!target_zone->CanAccessObject(secobj))
1147 size_t bytesSent = NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
1148 endpoint->AddMessageSent(bytesSent);
1150 } catch (const std::exception& ex) {
1151 Log(LogWarning, "ApiListener")
1152 << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
1154 Log(LogDebug, "ApiListener")
1155 << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
1160 peer_ts = pmessage->Get("timestamp");
1162 if (ts > logpos_ts + 10) {
1165 Dictionary::Ptr lparams = new Dictionary();
1166 lparams->Set("log_position", logpos_ts);
1168 Dictionary::Ptr lmessage = new Dictionary();
1169 lmessage->Set("jsonrpc", "2.0");
1170 lmessage->Set("method", "log::SetLogPosition");
1171 lmessage->Set("params", lparams);
1173 size_t bytesSent = JsonRpc::SendMessage(client->GetStream(), lmessage);
1174 endpoint->AddMessageSent(bytesSent);
1182 Log(LogInformation, "ApiListener")
1183 << "Replayed " << count << " messages.";
1186 Log(LogNotice, "ApiListener")
1187 << "Replayed " << count << " messages.";
1191 ObjectLock olock2(endpoint);
1192 endpoint->SetSyncing(false);
1202 void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
1204 std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
1206 ApiListener::Ptr listener = ApiListener::GetInstance();
1211 stats = listener->GetStatus();
1213 ObjectLock olock(stats.second);
1214 for (const Dictionary::Pair& kv : stats.second)
1215 perfdata->Add(new PerfdataValue("api_" + kv.first, kv.second));
1217 status->Set("api", stats.first);
1220 std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus(void)
1222 Dictionary::Ptr status = new Dictionary();
1223 Dictionary::Ptr perfdata = new Dictionary();
1226 status->Set("identity", GetIdentity());
1228 double allEndpoints = 0;
1229 Array::Ptr allNotConnectedEndpoints = new Array();
1230 Array::Ptr allConnectedEndpoints = new Array();
1232 Zone::Ptr my_zone = Zone::GetLocalZone();
1234 Dictionary::Ptr connectedZones = new Dictionary();
1236 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
1237 /* only check endpoints in a) the same zone b) our parent zone c) immediate child zones */
1238 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
1239 Log(LogDebug, "ApiListener")
1240 << "Not checking connection to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone.";
1244 bool zoneConnected = false;
1245 int countZoneEndpoints = 0;
1248 Array::Ptr zoneEndpoints = new Array();
1250 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
1251 zoneEndpoints->Add(endpoint->GetName());
1253 if (endpoint->GetName() == GetIdentity())
1256 double eplag = CalculateZoneLag(endpoint);
1258 if (eplag > 0 && eplag > zoneLag)
1262 countZoneEndpoints++;
1264 if (!endpoint->GetConnected()) {
1265 allNotConnectedEndpoints->Add(endpoint->GetName());
1267 allConnectedEndpoints->Add(endpoint->GetName());
1268 zoneConnected = true;
1272 /* if there's only one endpoint inside the zone, we're not connected - that's us, fake it */
1273 if (zone->GetEndpoints().size() == 1 && countZoneEndpoints == 0)
1274 zoneConnected = true;
1276 Dictionary::Ptr zoneStats = new Dictionary();
1277 zoneStats->Set("connected", zoneConnected);
1278 zoneStats->Set("client_log_lag", zoneLag);
1279 zoneStats->Set("endpoints", zoneEndpoints);
1281 String parentZoneName;
1282 Zone::Ptr parentZone = zone->GetParent();
1284 parentZoneName = parentZone->GetName();
1286 zoneStats->Set("parent_zone", parentZoneName);
1288 connectedZones->Set(zone->GetName(), zoneStats);
1291 status->Set("num_endpoints", allEndpoints);
1292 status->Set("num_conn_endpoints", allConnectedEndpoints->GetLength());
1293 status->Set("num_not_conn_endpoints", allNotConnectedEndpoints->GetLength());
1294 status->Set("conn_endpoints", allConnectedEndpoints);
1295 status->Set("not_conn_endpoints", allNotConnectedEndpoints);
1297 status->Set("zones", connectedZones);
1299 /* connection stats */
1300 size_t jsonRpcClients = GetAnonymousClients().size();
1301 size_t httpClients = GetHttpClients().size();
1302 size_t workQueueItems = JsonRpcConnection::GetWorkQueueLength();
1303 size_t workQueueCount = JsonRpcConnection::GetWorkQueueCount();
1304 size_t syncQueueItems = m_SyncQueue.GetLength();
1305 size_t relayQueueItems = m_RelayQueue.GetLength();
1306 double workQueueItemRate = JsonRpcConnection::GetWorkQueueRate();
1307 double syncQueueItemRate = m_SyncQueue.GetTaskCount(60) / 60.0;
1308 double relayQueueItemRate = m_RelayQueue.GetTaskCount(60) / 60.0;
1310 Dictionary::Ptr jsonRpc = new Dictionary();
1311 jsonRpc->Set("clients", jsonRpcClients);
1312 jsonRpc->Set("work_queue_items", workQueueItems);
1313 jsonRpc->Set("work_queue_count", workQueueCount);
1314 jsonRpc->Set("sync_queue_items", syncQueueItems);
1315 jsonRpc->Set("relay_queue_items", relayQueueItems);
1317 jsonRpc->Set("work_queue_item_rate", workQueueItemRate);
1318 jsonRpc->Set("sync_queue_item_rate", syncQueueItemRate);
1319 jsonRpc->Set("relay_queue_item_rate", relayQueueItemRate);
1321 Dictionary::Ptr http = new Dictionary();
1322 http->Set("clients", httpClients);
1324 status->Set("json_rpc", jsonRpc);
1325 status->Set("http", http);
1327 /* performance data */
1328 perfdata->Set("num_endpoints", allEndpoints);
1329 perfdata->Set("num_conn_endpoints", Convert::ToDouble(allConnectedEndpoints->GetLength()));
1330 perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(allNotConnectedEndpoints->GetLength()));
1332 perfdata->Set("num_json_rpc_clients", jsonRpcClients);
1333 perfdata->Set("num_http_clients", httpClients);
1334 perfdata->Set("num_json_rpc_work_queue_items", workQueueItems);
1335 perfdata->Set("num_json_rpc_work_queue_count", workQueueCount);
1336 perfdata->Set("num_json_rpc_sync_queue_items", syncQueueItems);
1337 perfdata->Set("num_json_rpc_relay_queue_items", relayQueueItems);
1339 perfdata->Set("num_json_rpc_work_queue_item_rate", workQueueItemRate);
1340 perfdata->Set("num_json_rpc_sync_queue_item_rate", syncQueueItemRate);
1341 perfdata->Set("num_json_rpc_relay_queue_item_rate", relayQueueItemRate);
1343 return std::make_pair(status, perfdata);
1346 double ApiListener::CalculateZoneLag(const Endpoint::Ptr& endpoint)
1348 double remoteLogPosition = endpoint->GetRemoteLogPosition();
1349 double eplag = Utility::GetTime() - remoteLogPosition;
1351 if ((endpoint->GetSyncing() || !endpoint->GetConnected()) && remoteLogPosition != 0)
1357 void ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1359 boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1360 m_AnonymousClients.insert(aclient);
1363 void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1365 boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1366 m_AnonymousClients.erase(aclient);
1369 std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients(void) const
1371 boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1372 return m_AnonymousClients;
1375 void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
1377 boost::mutex::scoped_lock lock(m_HttpClientsLock);
1378 m_HttpClients.insert(aclient);
1381 void ApiListener::RemoveHttpClient(const HttpServerConnection::Ptr& aclient)
1383 boost::mutex::scoped_lock lock(m_HttpClientsLock);
1384 m_HttpClients.erase(aclient);
1387 std::set<HttpServerConnection::Ptr> ApiListener::GetHttpClients(void) const
1389 boost::mutex::scoped_lock lock(m_HttpClientsLock);
1390 return m_HttpClients;
1393 Value ApiListener::HelloAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
1398 Endpoint::Ptr ApiListener::GetLocalEndpoint(void) const
1400 return m_LocalEndpoint;
1403 void ApiListener::ValidateTlsProtocolmin(const String& value, const ValidationUtils& utils)
1405 ObjectImpl<ApiListener>::ValidateTlsProtocolmin(value, utils);
1407 if (value != SSL_TXT_TLSV1
1408 #ifdef SSL_TXT_TLSV1_1
1409 && value != SSL_TXT_TLSV1_1 &&
1410 value != SSL_TXT_TLSV1_2
1411 #endif /* SSL_TXT_TLSV1_1 */
1413 String message = "Invalid TLS version. Must be one of '" SSL_TXT_TLSV1 "'";
1414 #ifdef SSL_TXT_TLSV1_1
1415 message += ", '" SSL_TXT_TLSV1_1 "' or '" SSL_TXT_TLSV1_2 "'";
1416 #endif /* SSL_TXT_TLSV1_1 */
1418 BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_protocolmin" }, message));
1422 bool ApiListener::IsHACluster(void)
1424 Zone::Ptr zone = Zone::GetLocalZone();
1429 return zone->IsSingleInstance();
1432 /* Provide a helper function for zone origin name. */
1433 String ApiListener::GetFromZoneName(const Zone::Ptr& fromZone)
1435 String fromZoneName;
1438 fromZoneName = fromZone->GetName();
1440 Zone::Ptr lzone = Zone::GetLocalZone();
1443 fromZoneName = lzone->GetName();
1446 return fromZoneName;