1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
3 #include "remote/apilistener.hpp"
4 #include "remote/apilistener-ti.cpp"
5 #include "remote/jsonrpcconnection.hpp"
6 #include "remote/endpoint.hpp"
7 #include "remote/jsonrpc.hpp"
8 #include "remote/apifunction.hpp"
9 #include "base/convert.hpp"
10 #include "base/io-engine.hpp"
11 #include "base/netstring.hpp"
12 #include "base/json.hpp"
13 #include "base/configtype.hpp"
14 #include "base/logger.hpp"
15 #include "base/objectlock.hpp"
16 #include "base/stdiostream.hpp"
17 #include "base/perfdatavalue.hpp"
18 #include "base/application.hpp"
19 #include "base/context.hpp"
20 #include "base/statsfunction.hpp"
21 #include "base/exception.hpp"
22 #include <boost/asio/ip/tcp.hpp>
23 #include <boost/asio/ip/v6_only.hpp>
24 #include <boost/asio/spawn.hpp>
25 #include <boost/asio/ssl/context.hpp>
30 using namespace icinga;
32 REGISTER_TYPE(ApiListener);
34 boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
35 ApiListener::Ptr ApiListener::m_Instance;
37 REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc);
39 REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
41 ApiListener::ApiListener()
43 m_RelayQueue.SetName("ApiListener, RelayQueue");
44 m_SyncQueue.SetName("ApiListener, SyncQueue");
47 String ApiListener::GetApiDir()
49 return Configuration::DataDir + "/api/";
52 String ApiListener::GetCertsDir()
54 return Configuration::DataDir + "/certs/";
57 String ApiListener::GetCaDir()
59 return Configuration::DataDir + "/ca/";
62 String ApiListener::GetCertificateRequestsDir()
64 return Configuration::DataDir + "/certificate-requests/";
67 String ApiListener::GetDefaultCertPath()
69 return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".crt";
72 String ApiListener::GetDefaultKeyPath()
74 return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".key";
77 String ApiListener::GetDefaultCaPath()
79 return GetCertsDir() + "/ca.crt";
82 double ApiListener::GetTlsHandshakeTimeout() const
84 return Configuration::TlsHandshakeTimeout;
87 void ApiListener::SetTlsHandshakeTimeout(double value, bool suppress_events, const Value& cookie)
89 Configuration::TlsHandshakeTimeout = value;
/**
 * Copies a certificate file to a new location if the source path is non-empty and can be
 * stat()ed, and the destination either cannot be stat()ed or has an older st_mtime.
 *
 * NOTE(review): the declaration of st1/st2 and the closing braces are not visible in this listing.
 *
 * @param oldCertPath Source path.
 * @param newCertPath Destination path.
 */
92 void ApiListener::CopyCertificateFile(const String& oldCertPath, const String& newCertPath)
96 if (!oldCertPath.IsEmpty() && stat(oldCertPath.CStr(), &st1) >= 0 && (stat(newCertPath.CStr(), &st2) < 0 || st1.st_mtime > st2.st_mtime)) {
97 Log(LogWarning, "ApiListener")
98 << "Copying '" << oldCertPath << "' certificate file to '" << newCertPath << "'";
/* Create the destination directory (mode 0700) before copying. */
100 Utility::MkDirP(Utility::DirName(newCertPath), 0700);
101 Utility::CopyFile(oldCertPath, newCertPath);
106 * Returns the API thread pool.
108 * @returns The API thread pool.
/* Accessor for the API thread pool, which is held in a function-local static.
 * NOTE(review): the return statement is not visible in this listing. */
110 ThreadPool& ApiListener::GetTP()
112 static ThreadPool tp;
116 void ApiListener::EnqueueAsyncCallback(const std::function<void ()>& callback, SchedulerPolicy policy)
118 GetTP().Post(callback, policy);
/**
 * Config-load hook. The visible statements migrate certificate files from the configured
 * (pre-2.8) paths to the default paths, load the certificate from the default path and
 * set the API identity to the certificate's common name.
 *
 * Failures are reported as ScriptError carrying this object's debug info.
 *
 * NOTE(review): several original lines (guard conditions, try blocks, braces) are not
 * visible in this listing; the comments describe only the statements shown.
 */
121 void ApiListener::OnConfigLoaded()
/* NOTE(review): the condition guarding this throw is not visible here. */
124 BOOST_THROW_EXCEPTION(ScriptError("Only one ApiListener object is allowed.", GetDebugInfo()));
128 String defaultCertPath = GetDefaultCertPath();
129 String defaultKeyPath = GetDefaultKeyPath();
130 String defaultCaPath = GetDefaultCaPath();
132 /* Migrate certificate location < 2.8 to the new default path. */
133 String oldCertPath = GetCertPath();
134 String oldKeyPath = GetKeyPath();
135 String oldCaPath = GetCaPath();
137 CopyCertificateFile(oldCertPath, defaultCertPath);
138 CopyCertificateFile(oldKeyPath, defaultKeyPath);
139 CopyCertificateFile(oldCaPath, defaultCaPath);
/* Point at the upgrade notes only when all three legacy paths are configured. */
141 if (!oldCertPath.IsEmpty() && !oldKeyPath.IsEmpty() && !oldCaPath.IsEmpty()) {
142 Log(LogWarning, "ApiListener", "Please read the upgrading documentation for v2.8: https://icinga.com/docs/icinga2/latest/doc/16-upgrading-icinga-2/");
145 /* set up SSL context */
146 std::shared_ptr<X509> cert;
148 cert = GetX509Certificate(defaultCertPath);
149 } catch (const std::exception&) {
150 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate from cert path: '"
151 + defaultCertPath + "'.", GetDebugInfo()));
/* The identity of this listener is the CN of its own certificate. */
155 SetIdentity(GetCertificateCN(cert));
156 } catch (const std::exception&) {
157 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate common name from cert path: '"
158 + defaultCertPath + "'.", GetDebugInfo()));
161 Log(LogInformation, "ApiListener")
162 << "My API identity: " << GetIdentity();
/**
 * Builds a new SSL context from the default certificate, key and CA paths, applies the
 * optional CRL, cipher list and minimum TLS version, stores it in m_SSLContext and then
 * disconnects all endpoint clients and all anonymous clients.
 *
 * Each configuration step that fails with a std::exception is rethrown as a ScriptError
 * naming the offending setting.
 *
 * NOTE(review): the `try {` lines and closing braces are not visible in this listing.
 */
167 void ApiListener::UpdateSSLContext()
169 namespace ssl = boost::asio::ssl;
171 std::shared_ptr<ssl::context> context;
174 context = MakeAsioSslContext(GetDefaultCertPath(), GetDefaultKeyPath(), GetDefaultCaPath());
175 } catch (const std::exception&) {
176 BOOST_THROW_EXCEPTION(ScriptError("Cannot make SSL context for cert path: '"
177 + GetDefaultCertPath() + "' key path: '" + GetDefaultKeyPath() + "' ca path: '" + GetDefaultCaPath() + "'.", GetDebugInfo()));
/* Optional settings are applied only when the corresponding attribute is non-empty. */
180 if (!GetCrlPath().IsEmpty()) {
182 AddCRLToSSLContext(context, GetCrlPath());
183 } catch (const std::exception&) {
184 BOOST_THROW_EXCEPTION(ScriptError("Cannot add certificate revocation list to SSL context for crl path: '"
185 + GetCrlPath() + "'.", GetDebugInfo()));
189 if (!GetCipherList().IsEmpty()) {
191 SetCipherListToSSLContext(context, GetCipherList());
192 } catch (const std::exception&) {
193 BOOST_THROW_EXCEPTION(ScriptError("Cannot set cipher list to SSL context for cipher list: '"
194 + GetCipherList() + "'.", GetDebugInfo()));
198 if (!GetTlsProtocolmin().IsEmpty()){
200 SetTlsProtocolminToSSLContext(context, GetTlsProtocolmin());
201 } catch (const std::exception&) {
202 BOOST_THROW_EXCEPTION(ScriptError("Cannot set minimum TLS protocol version to SSL context with tls_protocolmin: '" + GetTlsProtocolmin() + "'.", GetDebugInfo()));
/* The context is published only after every step above has completed. */
206 m_SSLContext = context;
/* Drop existing connections (endpoint clients first, then anonymous clients). */
208 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
209 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
210 client->Disconnect();
214 for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) {
215 client->Disconnect();
219 void ApiListener::OnAllConfigLoaded()
221 m_LocalEndpoint = Endpoint::GetByName(GetIdentity());
223 if (!m_LocalEndpoint)
224 BOOST_THROW_EXCEPTION(ScriptError("Endpoint object for '" + GetIdentity() + "' is missing.", GetDebugInfo()));
228 * Starts the component.
/**
 * Starts the listener. Visible steps: log the start, call the base class Start(), take
 * m_LogLock, add the primary JSON-RPC listener on bind_host/bind_port (the process exits
 * with EXIT_FAILURE if that fails), then create the periodic timers:
 *   - m_Timer: ApiTimerHandler, interval 5
 *   - m_ReconnectTimer: ApiReconnectTimerHandler, interval 10
 *   - m_AuthorityTimer: UpdateObjectAuthority, interval 10
 *   - m_CleanupCertificateRequestsTimer: CleanupCertificateRequestsTimerHandler, interval 3600
 * Finally OnMasterChanged(true) is signalled.
 *
 * NOTE(review): several original lines (including the statements executed while holding
 * m_LogLock) are not visible in this listing.
 *
 * @param runtimeCreated Forwarded to ObjectImpl<ApiListener>::Start().
 */
230 void ApiListener::Start(bool runtimeCreated)
232 Log(LogInformation, "ApiListener")
233 << "'" << GetName() << "' started.";
237 ObjectImpl<ApiListener>::Start(runtimeCreated);
240 boost::mutex::scoped_lock lock(m_LogLock);
245 /* create the primary JSON-RPC listener */
246 if (!AddListener(GetBindHost(), GetBindPort())) {
247 Log(LogCritical, "ApiListener")
248 << "Cannot add listener on host '" << GetBindHost() << "' for port '" << GetBindPort() << "'.";
249 Application::Exit(EXIT_FAILURE);
252 m_Timer = new Timer();
253 m_Timer->OnTimerExpired.connect(std::bind(&ApiListener::ApiTimerHandler, this));
254 m_Timer->SetInterval(5);
256 m_Timer->Reschedule(0);
258 m_ReconnectTimer = new Timer();
259 m_ReconnectTimer->OnTimerExpired.connect(std::bind(&ApiListener::ApiReconnectTimerHandler, this));
260 m_ReconnectTimer->SetInterval(10);
261 m_ReconnectTimer->Start();
262 m_ReconnectTimer->Reschedule(0);
264 /* Keep this in relative sync with the cold startup in UpdateObjectAuthority() and the reconnect interval above.
265 * Previous: 60s reconnect, 30s OA, 60s cold startup.
266 * Now: 10s reconnect, 10s OA, 30s cold startup.
268 m_AuthorityTimer = new Timer();
269 m_AuthorityTimer->OnTimerExpired.connect(std::bind(&ApiListener::UpdateObjectAuthority));
270 m_AuthorityTimer->SetInterval(10);
271 m_AuthorityTimer->Start();
273 m_CleanupCertificateRequestsTimer = new Timer();
274 m_CleanupCertificateRequestsTimer->OnTimerExpired.connect(std::bind(&ApiListener::CleanupCertificateRequestsTimerHandler, this));
275 m_CleanupCertificateRequestsTimer->SetInterval(3600);
276 m_CleanupCertificateRequestsTimer->Start();
277 m_CleanupCertificateRequestsTimer->Reschedule(0);
279 OnMasterChanged(true);
/**
 * Stops the listener: calls the base class Stop(), logs the stop and takes m_LogLock.
 *
 * NOTE(review): the statements executed while holding m_LogLock are not visible in this listing.
 *
 * @param runtimeDeleted Forwarded to ObjectImpl<ApiListener>::Stop().
 */
282 void ApiListener::Stop(bool runtimeDeleted)
284 ObjectImpl<ApiListener>::Stop(runtimeDeleted);
286 Log(LogInformation, "ApiListener")
287 << "'" << GetName() << "' stopped.";
290 boost::mutex::scoped_lock lock(m_LogLock);
/* Accessor for the ApiListener instance.
 * NOTE(review): the body is not visible in this listing; presumably it returns m_Instance - confirm. */
297 ApiListener::Ptr ApiListener::GetInstance()
/**
 * Determines the master endpoint of the local zone: the names of all zone endpoints that
 * are connected, or whose name equals this listener's identity, are collected and sorted;
 * the endpoint with the first name in sorted order is returned.
 *
 * NOTE(review): some original lines following the zone lookup are not visible in this
 * listing. As shown, `*names.begin()` requires `names` to be non-empty - confirm that a
 * guard exists in the full source.
 */
302 Endpoint::Ptr ApiListener::GetMaster() const
304 Zone::Ptr zone = Zone::GetLocalZone();
309 std::vector<String> names;
311 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints())
312 if (endpoint->GetConnected() || endpoint->GetName() == GetIdentity())
313 names.push_back(endpoint->GetName());
/* Sorting makes the choice deterministic for a given set of names. */
315 std::sort(names.begin(), names.end());
317 return Endpoint::GetByName(*names.begin());
/**
 * Returns whether the endpoint reported by GetMaster() is the local endpoint.
 *
 * NOTE(review): lines between the lookup and the comparison are not visible in this listing.
 */
320 bool ApiListener::IsMaster() const
322 Endpoint::Ptr master = GetMaster();
327 return master == GetLocalEndpoint();
331 * Creates a new JSON-RPC listener on the specified port.
333 * @param node The host the listener should be bound to.
334 * @param service The port to listen on.
/**
 * Creates a TCP acceptor for the given host and port, starts listening, spawns a
 * coroutine on the I/O service and updates the status file with the local endpoint.
 *
 * NOTE(review): several original lines (the SSL context check, `try {`, the coroutine
 * body, the return statements) are not visible in this listing.
 */
336 bool ApiListener::AddListener(const String& node, const String& service)
338 namespace asio = boost::asio;
339 namespace ip = asio::ip;
342 ObjectLock olock(this);
/* Local copy of the shared SSL context pointer. */
344 auto sslContext (m_SSLContext);
347 Log(LogCritical, "ApiListener", "SSL context is required for AddListener()");
351 auto& io (IoEngine::Get().GetIoService());
352 auto acceptor (std::make_shared<tcp::acceptor>(io));
/* Resolve with the passive flag and use the first result as the bind address. */
355 tcp::resolver resolver (io);
356 tcp::resolver::query query (node, service, tcp::resolver::query::passive);
357 auto endpoint (resolver.resolve(query)->endpoint());
359 acceptor->open(endpoint.protocol());
/* v6_only(false) is set before binding. */
360 acceptor->set_option(ip::v6_only(false));
361 acceptor->set_option(tcp::acceptor::reuse_address(true));
362 acceptor->bind(endpoint);
363 } catch (const std::exception&) {
364 Log(LogCritical, "ApiListener")
365 << "Cannot bind TCP socket for host '" << node << "' on port '" << service << "'.";
/* INT_MAX is passed as the backlog argument. */
369 acceptor->listen(INT_MAX);
371 auto localEndpoint (acceptor->local_endpoint());
373 Log(LogInformation, "ApiListener")
374 << "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'";
/* The lambda captures the acceptor shared_ptr by value. */
376 asio::spawn(io, [acceptor](asio::yield_context yc) {
380 UpdateStatusFile(localEndpoint);
386 * Creates a new JSON-RPC client and connects to the specified endpoint.
388 * @param endpoint The endpoint.
/**
 * Connects to the given endpoint's host/port with a new TcpSocket and hands the socket to
 * NewClientHandler() in the client role. The endpoint's "connecting" flag is cleared both
 * on success and when a std::exception is caught.
 *
 * NOTE(review): some original lines (the SSL context check, `try {`, braces) are not
 * visible in this listing.
 *
 * @param endpoint The endpoint to connect to.
 */
390 void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
393 ObjectLock olock(this);
395 auto sslContext (m_SSLContext);
398 Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
403 String host = endpoint->GetHost();
404 String port = endpoint->GetPort();
406 Log(LogInformation, "ApiListener")
407 << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
409 TcpSocket::Ptr client = new TcpSocket();
412 client->Connect(host, port);
/* The endpoint name is passed as the expected hostname. */
414 NewClientHandler(client, endpoint->GetName(), RoleClient);
416 endpoint->SetConnecting(false);
417 Log(LogInformation, "ApiListener")
418 << "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
419 } catch (const std::exception& ex) {
420 endpoint->SetConnecting(false);
/* Short message at critical level; the same message plus diagnostics at debug level. */
423 std::ostringstream info;
424 info << "Cannot connect to host '" << host << "' on port '" << port << "'";
425 Log(LogCritical, "ApiListener", info.str());
426 Log(LogDebug, "ApiListener")
427 << info.str() << "\n" << DiagnosticInformation(ex);
/**
 * Wrapper around NewClientHandlerInternal() that logs a caught std::exception at critical
 * level (brief diagnostics) and at debug level (full diagnostics).
 *
 * NOTE(review): the `try {` line and braces are not visible in this listing.
 */
431 void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
434 NewClientHandlerInternal(client, hostname, role);
435 } catch (const std::exception& ex) {
436 Log(LogCritical, "ApiListener")
437 << "Exception while handling new API client connection: " << DiagnosticInformation(ex, false);
439 Log(LogDebug, "ApiListener")
440 << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
445 * Processes a new client connection.
447 * @param client The new client.
/**
 * Handles a new connection. Visible steps: wrap the socket in a TlsStream, perform the
 * handshake, read the peer certificate and its common name, look up the matching Endpoint,
 * determine the client type (JSON-RPC or HTTP) and register the resulting connection
 * object.
 *
 * NOTE(review): many original lines (declarations such as conninfo/identity/ctype/firstByte,
 * `try {`, else branches, returns, braces) are not visible in this listing; the comments
 * describe only the statements shown.
 *
 * @param client The new client socket.
 * @param hostname Expected peer name; compared against the certificate CN when non-empty.
 * @param role The role of this side of the connection.
 */
449 void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
451 CONTEXT("Handling new API client connection");
455 if (role == RoleClient)
460 conninfo += " " + client->GetPeerAddress();
462 TlsStream::Ptr tlsStream;
/* The server name is the hostname, with ":<environment>" appended when an environment is set. */
464 String environmentName = Application::GetAppEnvironment();
466 String serverName = hostname;
468 if (!environmentName.IsEmpty())
469 serverName += ":" + environmentName;
472 ObjectLock olock(this);
474 tlsStream = new TlsStream(client, serverName, role, m_SSLContext);
475 } catch (const std::exception&) {
476 Log(LogCritical, "ApiListener")
477 << "Cannot create TLS stream from client connection (" << conninfo << ")";
483 tlsStream->Handshake();
484 } catch (const std::exception& ex) {
485 Log(LogCritical, "ApiListener")
486 << "Client TLS handshake failed (" << conninfo << "): " << DiagnosticInformation(ex, false);
491 std::shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
493 Endpoint::Ptr endpoint;
494 bool verify_ok = false;
498 identity = GetCertificateCN(cert);
499 } catch (const std::exception&) {
500 Log(LogCritical, "ApiListener")
501 << "Cannot get certificate common name from cert path: '" << GetDefaultCertPath() << "'.";
506 verify_ok = tlsStream->IsVerifyOK();
/* With an expected hostname: warn on a CN mismatch, otherwise warn when verification failed. */
507 if (!hostname.IsEmpty()) {
508 if (identity != hostname) {
509 Log(LogWarning, "ApiListener")
510 << "Unexpected certificate common name while connecting to endpoint '"
511 << hostname << "': got '" << identity << "'";
514 } else if (!verify_ok) {
515 Log(LogWarning, "ApiListener")
516 << "Certificate validation failed for endpoint '" << hostname
517 << "': " << tlsStream->GetVerifyError();
522 endpoint = Endpoint::GetByName(identity);
525 Log log(LogInformation, "ApiListener");
527 log << "New client connection for identity '" << identity << "' " << conninfo;
530 log << " (certificate validation failed: " << tlsStream->GetVerifyError() << ")";
532 log << " (no Endpoint object found for identity)";
535 Log(LogInformation, "ApiListener")
536 << "New client connection " << conninfo << " (no client certificate)";
/* In the client role an icinga::Hello message is sent and the connection is treated as JSON-RPC. */
541 if (role == RoleClient) {
542 Dictionary::Ptr message = new Dictionary({
543 { "jsonrpc", "2.0" },
544 { "method", "icinga::Hello" },
545 { "params", new Dictionary() }
548 JsonRpc::SendMessage(tlsStream, message);
549 ctype = ClientJsonRpc;
/* Otherwise wait for data from the peer (argument 10) before inspecting it. */
551 tlsStream->WaitForData(10);
553 if (!tlsStream->IsDataAvailable()) {
554 if (identity.IsEmpty())
555 Log(LogInformation, "ApiListener")
556 << "No data received on new API connection. "
557 << "Ensure that the remote endpoints are properly configured in a cluster setup.";
559 Log(LogWarning, "ApiListener")
560 << "No data received on new API connection for identity '" << identity << "'. "
561 << "Ensure that the remote endpoints are properly configured in a cluster setup.";
/* Peek at the first byte: a decimal digit selects JSON-RPC. */
567 tlsStream->Peek(&firstByte, 1, false);
569 if (firstByte >= '0' && firstByte <= '9')
570 ctype = ClientJsonRpc;
575 if (ctype == ClientJsonRpc) {
576 Log(LogNotice, "ApiListener", "New JSON-RPC client");
578 JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role);
/* needSync is computed before the client is added to the endpoint. */
582 bool needSync = !endpoint->GetConnected();
584 endpoint->AddClient(aclient);
586 m_SyncQueue.Enqueue(std::bind(&ApiListener::SyncClient, this, aclient, endpoint, needSync));
/* Anonymous clients are disconnected when AddAnonymousClient() refuses them. */
588 if (!AddAnonymousClient(aclient)) {
589 Log(LogNotice, "ApiListener")
590 << "Ignoring anonymous JSON-RPC connection " << conninfo
591 << ". Max connections (" << GetMaxAnonymousClients() << ") exceeded.";
592 aclient->Disconnect();
596 Log(LogNotice, "ApiListener", "New HTTP client");
598 HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, tlsStream);
600 AddHttpClient(aclient);
/**
 * Synchronizes a newly connected endpoint. Visible steps: mark the endpoint as syncing,
 * request a certificate when the endpoint's zone is the parent of the local zone, send
 * zone file config and runtime config objects, clear the syncing flag, log the replay-log
 * phase and update object authority when the endpoint is in the local zone.
 *
 * A std::exception clears the syncing flag and is logged at critical level (brief) and
 * debug level (full diagnostics).
 *
 * NOTE(review): several original lines (`try {`, the replay-log call, braces, and how
 * needSync is used) are not visible in this listing.
 */
604 void ApiListener::SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint, bool needSync)
606 Zone::Ptr eZone = endpoint->GetZone();
610 ObjectLock olock(endpoint);
612 endpoint->SetSyncing(true);
615 Zone::Ptr myZone = Zone::GetLocalZone();
617 if (myZone->GetParent() == eZone) {
618 Log(LogInformation, "ApiListener")
619 << "Requesting new certificate for this Icinga instance from endpoint '" << endpoint->GetName() << "'.";
621 JsonRpcConnection::SendCertificateRequest(aclient, nullptr, String());
623 if (Utility::PathExists(ApiListener::GetCertificateRequestsDir()))
624 Utility::Glob(ApiListener::GetCertificateRequestsDir() + "/*.json", std::bind(&JsonRpcConnection::SendCertificateRequest, aclient, nullptr, _1), GlobFile);
627 /* Make sure that the config updates are synced
628 * before the logs are replayed.
631 Log(LogInformation, "ApiListener")
632 << "Sending config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
634 /* sync zone file config */
635 SendConfigUpdate(aclient);
637 Log(LogInformation, "ApiListener")
638 << "Finished sending config file updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
640 /* sync runtime config */
641 SendRuntimeConfigObjects(aclient);
643 Log(LogInformation, "ApiListener")
644 << "Finished sending runtime config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
647 ObjectLock olock2(endpoint);
648 endpoint->SetSyncing(false);
652 Log(LogInformation, "ApiListener")
653 << "Sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
657 if (eZone == Zone::GetLocalZone())
658 UpdateObjectAuthority();
660 Log(LogInformation, "ApiListener")
661 << "Finished sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
662 } catch (const std::exception& ex) {
/* Make sure the syncing flag does not stay set after a failure. */
664 ObjectLock olock2(endpoint);
665 endpoint->SetSyncing(false);
668 Log(LogCritical, "ApiListener")
669 << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
671 Log(LogDebug, "ApiListener")
672 << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
675 Log(LogInformation, "ApiListener")
676 << "Finished syncing endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
/**
 * Periodic maintenance handler with two visible phases.
 *
 * Phase 1: the numeric file names under "<api dir>/log/" are collected through
 * LogGlobHandler and sorted; for each timestamp every endpoint other than the local one is
 * examined (log duration versus the age of the file, and the local log position), and old
 * files are removed with unlink().
 *
 * Phase 2: for every connected endpoint a "log::SetLogPosition" message carrying the
 * endpoint's remote log position is built; clients whose timestamp differs from the
 * highest client timestamp are disconnected and the message is sent with SendMessage().
 *
 * NOTE(review): several original lines (the decision whether a file is still needed,
 * `continue` statements, the declaration of maxTs, else branches, braces) are not visible
 * in this listing, so the exact conditions cannot be confirmed from here.
 */
679 void ApiListener::ApiTimerHandler()
681 double now = Utility::GetTime();
683 std::vector<int> files;
684 Utility::Glob(GetApiDir() + "log/*", std::bind(&ApiListener::LogGlobHandler, std::ref(files), _1), GlobFile);
685 std::sort(files.begin(), files.end());
687 for (int ts : files) {
690 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
691 if (endpoint == GetLocalEndpoint())
694 if (endpoint->GetLogDuration() >= 0 && ts < now - endpoint->GetLogDuration())
697 if (ts > endpoint->GetLocalLogPosition()) {
704 String path = GetApiDir() + "log/" + Convert::ToString(ts);
705 Log(LogNotice, "ApiListener")
706 << "Removing old log file: " << path;
707 (void)unlink(path.CStr());
711 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
712 if (!endpoint->GetConnected())
715 double ts = endpoint->GetRemoteLogPosition();
720 Dictionary::Ptr lmessage = new Dictionary({
721 { "jsonrpc", "2.0" },
722 { "method", "log::SetLogPosition" },
723 { "params", new Dictionary({
724 { "log_position", ts }
730 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
731 if (client->GetTimestamp() > maxTs)
732 maxTs = client->GetTimestamp();
735 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
736 if (client->GetTimestamp() != maxTs)
737 client->Disconnect();
739 client->SendMessage(lmessage);
742 Log(LogNotice, "ApiListener")
743 << "Setting log position for identity '" << endpoint->GetName() << "': "
744 << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
/**
 * Periodic reconnect handler: walks all zones and their endpoints, skips the cases listed
 * in the inline comments, and queues AddConnection() on the thread pool for every remaining
 * endpoint. Afterwards the current zone master and the connected endpoints are logged.
 *
 * NOTE(review): `continue` statements, braces and the guard around the master log line
 * are not visible in this listing.
 */
748 void ApiListener::ApiReconnectTimerHandler()
750 Zone::Ptr my_zone = Zone::GetLocalZone();
752 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
753 /* don't connect to global zones */
754 if (zone->GetGlobal())
757 /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
758 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
759 Log(LogDebug, "ApiListener")
760 << "Not connecting to Zone '" << zone->GetName()
761 << "' because it's not in the same zone, a parent or a child zone.";
765 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
766 /* don't connect to ourselves */
767 if (endpoint == GetLocalEndpoint()) {
768 Log(LogDebug, "ApiListener")
769 << "Not connecting to Endpoint '" << endpoint->GetName() << "' because that's us.";
773 /* don't try to connect to endpoints which don't have a host and port */
774 if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) {
775 Log(LogDebug, "ApiListener")
776 << "Not connecting to Endpoint '" << endpoint->GetName()
777 << "' because the host/port attributes are missing.";
781 /* don't try to connect if there's already a connection attempt */
782 if (endpoint->GetConnecting()) {
783 Log(LogDebug, "ApiListener")
784 << "Not connecting to Endpoint '" << endpoint->GetName()
785 << "' because we're already trying to connect to it.";
789 /* don't try to connect if we're already connected */
790 if (endpoint->GetConnected()) {
791 Log(LogDebug, "ApiListener")
792 << "Not connecting to Endpoint '" << endpoint->GetName()
793 << "' because we're already connected to it.";
797 /* Set connecting state to prevent duplicated queue inserts later. */
798 endpoint->SetConnecting(true);
800 /* Use dynamic thread pool with additional on demand resources with fast throughput. */
801 EnqueueAsyncCallback(std::bind(&ApiListener::AddConnection, this, endpoint), LowLatencyScheduler);
805 Endpoint::Ptr master = GetMaster();
808 Log(LogNotice, "ApiListener")
809 << "Current zone master: " << master->GetName();
/* Each entry is formatted as "<endpoint name> (<number of clients>)". */
811 std::vector<String> names;
812 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>())
813 if (endpoint->GetConnected())
814 names.emplace_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
816 Log(LogNotice, "ApiListener")
817 << "Connected endpoints: " << Utility::NaturalJoin(names);
/**
 * Removes the file at `path` when its st_mtime is older than `expiryTime`.
 *
 * NOTE(review): two stat variants (lstat and _stat) appear below; the preprocessor
 * conditionals that select between them, the other statbuf declaration and the early
 * returns are not visible in this listing.
 *
 * @param path File to examine.
 * @param expiryTime Files modified before this time are unlinked.
 */
820 static void CleanupCertificateRequest(const String& path, double expiryTime)
824 if (lstat(path.CStr(), &statbuf) < 0)
827 struct _stat statbuf;
828 if (_stat(path.CStr(), &statbuf) < 0)
832 if (statbuf.st_mtime < expiryTime)
/* The result of unlink() is deliberately discarded. */
833 (void) unlink(path.CStr());
836 void ApiListener::CleanupCertificateRequestsTimerHandler()
838 String requestsDir = GetCertificateRequestsDir();
840 if (Utility::PathExists(requestsDir)) {
841 /* remove certificate requests that are older than a week */
842 double expiryTime = Utility::GetTime() - 7 * 24 * 60 * 60;
843 Utility::Glob(requestsDir + "/*.json", std::bind(&CleanupCertificateRequest, _1, expiryTime), GlobFile);
/**
 * Queues a message for relaying: SyncRelayMessage() is bound with the given arguments and
 * enqueued on m_RelayQueue with PriorityNormal.
 *
 * NOTE(review): lines preceding the enqueue are not visible in this listing.
 */
847 void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin,
848 const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
853 m_RelayQueue.Enqueue(std::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), PriorityNormal, true);
/**
 * Appends a message to the log file. The stored record is a dictionary with the keys
 * "timestamp" (the message's "ts"), "message" (the JSON-encoded message) and "secobj"
 * (type and name of the security object), written as a netstring while holding m_LogLock.
 *
 * NOTE(review): some original lines (guards, the counter update, the body of the final
 * `if`) are not visible in this listing.
 */
856 void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)
858 double ts = message->Get("ts");
862 Dictionary::Ptr pmessage = new Dictionary();
863 pmessage->Set("timestamp", ts);
865 pmessage->Set("message", JsonEncode(message));
/* Only the type name and object name of secobj are stored. */
868 Dictionary::Ptr secname = new Dictionary();
869 secname->Set("type", secobj->GetReflectionType()->GetName());
870 secname->Set("name", secobj->GetName());
871 pmessage->Set("secobj", secname);
874 boost::mutex::scoped_lock lock(m_LogLock);
876 NetString::WriteStringToStream(m_LogFile, JsonEncode(pmessage));
878 SetLogMessageTimestamp(ts);
/* NOTE(review): the action taken above 50000 messages is not visible here - presumably a log rotation; confirm. */
880 if (m_LogMessageCount > 50000) {
/**
 * Sends a message to an endpoint while holding the endpoint's object lock. When the
 * endpoint is not syncing, the highest client timestamp is determined and the clients are
 * iterated again, comparing each timestamp against that maximum before SendMessage().
 *
 * NOTE(review): the declaration of maxTs and the statement following the `!=` comparison
 * are not visible in this listing.
 */
888 void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
890 ObjectLock olock(endpoint);
892 if (!endpoint->GetSyncing()) {
893 Log(LogNotice, "ApiListener")
894 << "Sending message '" << message->Get("method") << "' to '" << endpoint->GetName() << "'";
/* First pass: find the highest client timestamp. */
898 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
899 if (client->GetTimestamp() > maxTs)
900 maxTs = client->GetTimestamp();
/* Second pass: send, subject to the timestamp comparison. */
903 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
904 if (client->GetTimestamp() != maxTs)
907 client->SendMessage(message);
/**
 * Relays a message to the endpoints of one target zone, applying the filters listed in
 * the inline comments. Endpoints that are skipped get their local log position set to the
 * message's "ts". Returns `!log_needed || log_done`.
 *
 * NOTE(review): several original lines (returns, `continue`, else branches, the updates
 * of relayed/log_needed/log_done, braces) are not visible in this listing.
 *
 * @param targetZone Zone whose endpoints should receive the message.
 * @param origin Origin of the message; may be empty.
 * @param message The message to relay.
 * @param currentMaster The current master endpoint.
 */
912 bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message, const Endpoint::Ptr& currentMaster)
916 Zone::Ptr myZone = Zone::GetLocalZone();
918 /* only relay the message to a) the same zone, b) the parent zone and c) direct child zones. Exception is a global zone. */
919 if (!targetZone->GetGlobal() &&
920 targetZone != myZone &&
921 targetZone != myZone->GetParent() &&
922 targetZone->GetParent() != myZone) {
926 Endpoint::Ptr myEndpoint = GetLocalEndpoint();
928 std::vector<Endpoint::Ptr> skippedEndpoints;
930 bool relayed = false, log_needed = false, log_done = false;
932 std::set<Endpoint::Ptr> targetEndpoints;
/* For a global zone the candidates are our own zone's endpoints plus those of direct child zones. */
934 if (targetZone->GetGlobal()) {
935 targetEndpoints = myZone->GetEndpoints();
937 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
938 /* Fetch immediate child zone members */
939 if (zone->GetParent() == myZone) {
940 std::set<Endpoint::Ptr> endpoints = zone->GetEndpoints();
941 targetEndpoints.insert(endpoints.begin(), endpoints.end());
945 targetEndpoints = targetZone->GetEndpoints();
948 for (const Endpoint::Ptr& endpoint : targetEndpoints) {
949 /* don't relay messages to ourselves */
950 if (endpoint == GetLocalEndpoint())
955 /* don't relay messages to disconnected endpoints */
956 if (!endpoint->GetConnected()) {
957 if (targetZone == myZone)
965 /* don't relay the message to the zone through more than one endpoint unless this is our own zone */
966 if (relayed && targetZone != myZone) {
967 skippedEndpoints.push_back(endpoint);
971 /* don't relay messages back to the endpoint which we got the message from */
972 if (origin && origin->FromClient && endpoint == origin->FromClient->GetEndpoint()) {
973 skippedEndpoints.push_back(endpoint);
977 /* don't relay messages back to the zone which we got the message from */
978 if (origin && origin->FromZone && targetZone == origin->FromZone) {
979 skippedEndpoints.push_back(endpoint);
983 /* only relay message to the master if we're not currently the master */
984 if (currentMaster != myEndpoint && currentMaster != endpoint) {
985 skippedEndpoints.push_back(endpoint);
991 SyncSendMessage(endpoint, message);
/* Advance the local log position of every skipped endpoint to this message's timestamp. */
994 if (!skippedEndpoints.empty()) {
995 double ts = message->Get("ts");
997 for (const Endpoint::Ptr& endpoint : skippedEndpoints)
998 endpoint->SetLocalLogPosition(ts);
1001 return !log_needed || log_done;
/**
 * Relays a message synchronously: stamps it with the current time ("ts"), records the
 * origin zone name when known, picks the target zone, relays to that zone and to each
 * zone returned by GetAllParentsRaw(), and persists the message when `log` is set and a
 * relay reported that logging is needed.
 *
 * NOTE(review): the conditions that choose between the three target_zone assignments and
 * the body of the parent-zone `if` are not visible in this listing.
 */
1004 void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
1005 const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
/* The message dictionary is modified in place. */
1007 double ts = Utility::GetTime();
1008 message->Set("ts", ts);
1010 Log(LogNotice, "ApiListener")
1011 << "Relaying '" << message->Get("method") << "' message";
1013 if (origin && origin->FromZone)
1014 message->Set("originZone", origin->FromZone->GetName());
1016 Zone::Ptr target_zone;
/* A Zone object is its own target; otherwise the security object's zone is used. */
1019 if (secobj->GetReflectionType() == Zone::TypeInstance)
1020 target_zone = static_pointer_cast<Zone>(secobj);
1022 target_zone = static_pointer_cast<Zone>(secobj->GetZone());
1026 target_zone = Zone::GetLocalZone();
1028 Endpoint::Ptr master = GetMaster();
1030 bool need_log = !RelayMessageOne(target_zone, origin, message, master);
1032 for (const Zone::Ptr& zone : target_zone->GetAllParentsRaw()) {
1033 if (!RelayMessageOne(zone, origin, message, master))
1037 if (log && need_log)
1038 PersistMessage(message, secobj);
1041 /* must hold m_LogLock */
/**
 * Opens "<api dir>/log/current" in append mode (creating the directory with mode 0750),
 * wraps it in a StdioStream stored in m_LogFile, resets m_LogMessageCount to 0 and sets
 * the log message timestamp to the current time.
 *
 * NOTE(review): the check that leads to the "Could not open spool file" warning is not
 * visible in this listing.
 */
1042 void ApiListener::OpenLogFile()
1044 String path = GetApiDir() + "log/current";
1046 Utility::MkDirP(Utility::DirName(path), 0750);
1048 auto *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
1051 Log(LogWarning, "ApiListener")
1052 << "Could not open spool file: " << path;
/* NOTE(review): the `true` argument presumably transfers ownership of fp to the stream - confirm. */
1056 m_LogFile = new StdioStream(fp, true);
1057 m_LogMessageCount = 0;
1058 SetLogMessageTimestamp(Utility::GetTime());
1061 /* must hold m_LogLock */
/* NOTE(review): the body is not visible in this listing; presumably it closes m_LogFile - confirm. */
1062 void ApiListener::CloseLogFile()
1071 /* must hold m_LogLock */
/**
 * Renames "<api dir>/log/current" to "<api dir>/log/<N>", where N is the log message
 * timestamp truncated to int, plus one.
 *
 * NOTE(review): the condition for falling back to the current time and the preprocessor
 * guard around _unlink() are not visible in this listing.
 */
1072 void ApiListener::RotateLogFile()
1074 double ts = GetLogMessageTimestamp();
1077 ts = Utility::GetTime();
1079 String oldpath = GetApiDir() + "log/current";
1080 String newpath = GetApiDir() + "log/" + Convert::ToString(static_cast<int>(ts)+1);
1084 _unlink(newpath.CStr());
/* The result of rename() is deliberately discarded. */
1088 (void) rename(oldpath.CStr(), newpath.CStr());
/**
 * Glob callback that collects log file timestamps: the base name of `file` is converted
 * with Convert::ToLong() and appended to `files`. The name "current" is special-cased.
 *
 * NOTE(review): the declaration of ts, the `try {` line and the statements following the
 * "current" check and the catch are not visible in this listing.
 *
 * @param files Output vector receiving the parsed timestamps.
 * @param file Path of a file found by the glob.
 */
1091 void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
1093 String name = Utility::BaseName(file);
1095 if (name == "current")
1101 ts = Convert::ToLong(name);
1102 } catch (const std::exception&) {
1106 files.push_back(ts);
/**
 * Replays persisted log messages to a client. Visible steps: when the endpoint's log
 * duration is 0 the syncing flag is cleared; otherwise the numeric log files under
 * "<api dir>/log/" are read in sorted order, each netstring record is JSON-decoded,
 * records not newer than the peer's position or not accessible to the target zone are
 * filtered, and the stored "message" is written to the client's stream. A
 * "log::SetLogPosition" message is sent when the file timestamp is more than 10 ahead of
 * the last announced position.
 *
 * NOTE(review): many original lines (loops, returns, `continue`/`break`, the declarations
 * of count and message, braces) are not visible in this listing, so the control flow
 * between the statements below cannot be confirmed from here.
 *
 * @param client The connection to replay to.
 */
1109 void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
1111 Endpoint::Ptr endpoint = client->GetEndpoint();
1113 if (endpoint->GetLogDuration() == 0) {
1114 ObjectLock olock2(endpoint);
1115 endpoint->SetSyncing(false);
1119 CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'");
/* Start from the position recorded for this endpoint. */
1122 double peer_ts = endpoint->GetLocalLogPosition();
1123 double logpos_ts = peer_ts;
1124 bool last_sync = false;
1126 Endpoint::Ptr target_endpoint = client->GetEndpoint();
1127 ASSERT(target_endpoint);
1129 Zone::Ptr target_zone = target_endpoint->GetZone();
1132 ObjectLock olock2(endpoint);
1133 endpoint->SetSyncing(false);
1138 boost::mutex::scoped_lock lock(m_LogLock);
1143 if (count == -1 || count > 50000) {
1152 std::vector<int> files;
1153 Utility::Glob(GetApiDir() + "log/*", std::bind(&ApiListener::LogGlobHandler, std::ref(files), _1), GlobFile);
1154 std::sort(files.begin(), files.end());
1156 for (int ts : files) {
1157 String path = GetApiDir() + "log/" + Convert::ToString(ts);
1162 Log(LogNotice, "ApiListener")
1163 << "Replaying log: " << path;
1165 auto *fp = new std::fstream(path.CStr(), std::fstream::in | std::fstream::binary);
1166 StdioStream::Ptr logStream = new StdioStream(fp, true);
1169 StreamReadContext src;
1171 Dictionary::Ptr pmessage;
1174 StreamReadStatus srs = NetString::ReadStringFromStream(logStream, &message, src);
1176 if (srs == StatusEof)
1179 if (srs != StatusNewItem)
1182 pmessage = JsonDecode(message);
1183 } catch (const std::exception&) {
1184 Log(LogWarning, "ApiListener")
1185 << "Unexpected end-of-file for cluster log: " << path;
1187 /* Log files may be incomplete or corrupted. This is perfectly OK. */
/* Compare the record's timestamp against the peer's position. */
1191 if (pmessage->Get("timestamp") <= peer_ts)
1194 Dictionary::Ptr secname = pmessage->Get("secobj");
1197 ConfigObject::Ptr secobj = ConfigObject::GetObject(secname->Get("type"), secname->Get("name"));
1202 if (!target_zone->CanAccessObject(secobj))
/* The stored message string is written as-is and the byte count is accounted on the endpoint. */
1207 size_t bytesSent = NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
1208 endpoint->AddMessageSent(bytesSent);
1210 } catch (const std::exception& ex) {
1211 Log(LogWarning, "ApiListener")
1212 << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
1214 Log(LogDebug, "ApiListener")
1215 << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
1220 peer_ts = pmessage->Get("timestamp");
1222 if (ts > logpos_ts + 10) {
1225 Dictionary::Ptr lmessage = new Dictionary({
1226 { "jsonrpc", "2.0" },
1227 { "method", "log::SetLogPosition" },
1228 { "params", new Dictionary({
1229 { "log_position", logpos_ts }
1233 size_t bytesSent = JsonRpc::SendMessage(client->GetStream(), lmessage);
1234 endpoint->AddMessageSent(bytesSent);
/* The same summary is logged at information and at notice level in different branches (conditions not visible here). */
1242 Log(LogInformation, "ApiListener")
1243 << "Replayed " << count << " messages.";
1246 Log(LogNotice, "ApiListener")
1247 << "Replayed " << count << " messages.";
1252 ObjectLock olock2(endpoint);
1253 endpoint->SetSyncing(false);
/**
 * Stats function registered for ApiListener: fetches the listener's status pair, adds one
 * PerfdataValue named "api_<key>" per entry of the second dictionary to `perfdata`, and
 * stores the first dictionary under the key "api" in `status`.
 *
 * NOTE(review): lines following the instance lookup are not visible in this listing.
 *
 * @param status Dictionary receiving the "api" status entry.
 * @param perfdata Array receiving the performance data values.
 */
1263 void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
1265 std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
1267 ApiListener::Ptr listener = ApiListener::GetInstance();
1272 stats = listener->GetStatus();
/* The dictionary is locked while it is iterated. */
1274 ObjectLock olock(stats.second);
1275 for (const Dictionary::Pair& kv : stats.second)
1276 perfdata->Add(new PerfdataValue("api_" + kv.first, kv.second));
1278 status->Set("api", stats.first);
/**
 * Collects the API listener's status information and matching perfdata.
 *
 * The status dictionary contains the local identity, endpoint counters and
 * name lists, per-zone statistics ("zones") and connection figures for
 * JSON-RPC ("json_rpc") and HTTP ("http"). The perfdata dictionary carries
 * the numeric counters under "num_*" keys.
 *
 * @returns A pair whose first element is the status dictionary and whose
 * second element is the perfdata dictionary.
 */
1281 std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
1283 Dictionary::Ptr perfdata = new Dictionary();
/* Totals accumulated across all zones that are inspected below. */
1287 double allEndpoints = 0;
1288 Array::Ptr allNotConnectedEndpoints = new Array();
1289 Array::Ptr allConnectedEndpoints = new Array();
1291 Zone::Ptr my_zone = Zone::GetLocalZone();
1293 Dictionary::Ptr connectedZones = new Dictionary();
1295 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
1296 /* only check endpoints in a) the same zone b) our parent zone c) immediate child zones */
1297 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
1298 Log(LogDebug, "ApiListener")
1299 << "Not checking connection to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone.";
/* Per-zone state, reported in this zone's "zones" entry further down. */
1303 bool zoneConnected = false;
1304 int countZoneEndpoints = 0;
1307 ArrayData zoneEndpoints;
1309 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
1310 zoneEndpoints.emplace_back(endpoint->GetName());
/* Comparison against our own identity; presumably the local endpoint is excluded from the checks below -- confirm. */
1312 if (endpoint->GetName() == GetIdentity())
1315 double eplag = CalculateZoneLag(endpoint);
/* NOTE(review): looks like zoneLag tracks the largest positive endpoint lag of the zone -- confirm. */
1317 if (eplag > 0 && eplag > zoneLag)
1321 countZoneEndpoints++;
/* Record the endpoint's name in the list matching its connection state. */
1323 if (!endpoint->GetConnected()) {
1324 allNotConnectedEndpoints->Add(endpoint->GetName());
1326 allConnectedEndpoints->Add(endpoint->GetName());
1327 zoneConnected = true;
1331 /* if there's only one endpoint inside the zone, we're not connected - that's us, fake it */
1332 if (zone->GetEndpoints().size() == 1 && countZoneEndpoints == 0)
1333 zoneConnected = true;
1335 String parentZoneName;
1336 Zone::Ptr parentZone = zone->GetParent();
1338 parentZoneName = parentZone->GetName();
/* Statistics for this zone, stored in connectedZones under the zone's name. */
1340 Dictionary::Ptr zoneStats = new Dictionary({
1341 { "connected", zoneConnected },
1342 { "client_log_lag", zoneLag },
1343 { "endpoints", new Array(std::move(zoneEndpoints)) },
1344 { "parent_zone", parentZoneName }
1347 connectedZones->Set(zone->GetName(), zoneStats);
1350 /* connection stats */
1351 size_t jsonRpcAnonymousClients = GetAnonymousClients().size();
1352 size_t httpClients = GetHttpClients().size();
1353 size_t workQueueItems = JsonRpcConnection::GetWorkQueueLength();
1354 size_t workQueueCount = JsonRpcConnection::GetWorkQueueCount();
1355 size_t syncQueueItems = m_SyncQueue.GetLength();
1356 size_t relayQueueItems = m_RelayQueue.GetLength();
1357 double workQueueItemRate = JsonRpcConnection::GetWorkQueueRate();
/* GetTaskCount(60) / 60.0: presumably the tasks of the last 60 seconds averaged per second -- confirm. */
1358 double syncQueueItemRate = m_SyncQueue.GetTaskCount(60) / 60.0;
1359 double relayQueueItemRate = m_RelayQueue.GetTaskCount(60) / 60.0;
1361 Dictionary::Ptr status = new Dictionary({
1362 { "identity", GetIdentity() },
1363 { "num_endpoints", allEndpoints },
1364 { "num_conn_endpoints", allConnectedEndpoints->GetLength() },
1365 { "num_not_conn_endpoints", allNotConnectedEndpoints->GetLength() },
1366 { "conn_endpoints", allConnectedEndpoints },
1367 { "not_conn_endpoints", allNotConnectedEndpoints },
1369 { "zones", connectedZones },
1371 { "json_rpc", new Dictionary({
1372 { "anonymous_clients", jsonRpcAnonymousClients },
1373 { "work_queue_items", workQueueItems },
1374 { "work_queue_count", workQueueCount },
1375 { "sync_queue_items", syncQueueItems },
1376 { "relay_queue_items", relayQueueItems },
1377 { "work_queue_item_rate", workQueueItemRate },
1378 { "sync_queue_item_rate", syncQueueItemRate },
1379 { "relay_queue_item_rate", relayQueueItemRate }
1382 { "http", new Dictionary({
1383 { "clients", httpClients }
1387 /* performance data */
1388 perfdata->Set("num_endpoints", allEndpoints);
1389 perfdata->Set("num_conn_endpoints", Convert::ToDouble(allConnectedEndpoints->GetLength()));
1390 perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(allNotConnectedEndpoints->GetLength()));
1392 perfdata->Set("num_json_rpc_anonymous_clients", jsonRpcAnonymousClients);
1393 perfdata->Set("num_http_clients", httpClients);
1394 perfdata->Set("num_json_rpc_work_queue_items", workQueueItems);
1395 perfdata->Set("num_json_rpc_work_queue_count", workQueueCount);
1396 perfdata->Set("num_json_rpc_sync_queue_items", syncQueueItems);
1397 perfdata->Set("num_json_rpc_relay_queue_items", relayQueueItems);
1399 perfdata->Set("num_json_rpc_work_queue_item_rate", workQueueItemRate);
1400 perfdata->Set("num_json_rpc_sync_queue_item_rate", syncQueueItemRate);
1401 perfdata->Set("num_json_rpc_relay_queue_item_rate", relayQueueItemRate);
1403 return std::make_pair(status, perfdata);
/**
 * Computes the log lag of an endpoint as the current time
 * (Utility::GetTime()) minus the endpoint's remote log position.
 *
 * NOTE(review): presumably the lag is only reported when the condition below
 * holds (endpoint syncing or not connected, and a non-zero remote log
 * position) and 0 is returned otherwise -- confirm.
 *
 * @param endpoint The endpoint to inspect.
 */
1406 double ApiListener::CalculateZoneLag(const Endpoint::Ptr& endpoint)
1408 double remoteLogPosition = endpoint->GetRemoteLogPosition();
1409 double eplag = Utility::GetTime() - remoteLogPosition;
/* A remote log position of 0 is excluded by this condition. */
1411 if ((endpoint->GetSyncing() || !endpoint->GetConnected()) && remoteLogPosition != 0)
/**
 * Registers an anonymous JSON-RPC connection in m_AnonymousClients while
 * holding m_AnonymousClientsLock.
 *
 * When GetMaxAnonymousClients() is non-negative, the current set size plus
 * one is compared against that limit before the insert.
 *
 * NOTE(review): presumably returns false when the limit would be exceeded
 * and true after a successful insert -- confirm.
 *
 * @param aclient The connection to register.
 */
1417 bool ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1419 boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
/* A negative maximum skips the limit comparison entirely. */
1421 if (GetMaxAnonymousClients() >= 0 && (long)m_AnonymousClients.size() + 1 > (long)GetMaxAnonymousClients())
1424 m_AnonymousClients.insert(aclient);
/**
 * Removes an anonymous JSON-RPC connection from m_AnonymousClients while
 * holding m_AnonymousClientsLock.
 *
 * @param aclient The connection to remove.
 */
1428 void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1430 boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1431 m_AnonymousClients.erase(aclient);
/**
 * Returns the set of anonymous JSON-RPC connections.
 *
 * The set is returned by value, so the caller receives a copy that was taken
 * while m_AnonymousClientsLock was held.
 */
1434 std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients() const
1436 boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1437 return m_AnonymousClients;
/**
 * Registers an HTTP server connection in m_HttpClients while holding
 * m_HttpClientsLock.
 *
 * @param aclient The connection to register.
 */
1440 void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
1442 boost::mutex::scoped_lock lock(m_HttpClientsLock);
1443 m_HttpClients.insert(aclient);
/**
 * Removes an HTTP server connection from m_HttpClients while holding
 * m_HttpClientsLock.
 *
 * @param aclient The connection to remove.
 */
1446 void ApiListener::RemoveHttpClient(const HttpServerConnection::Ptr& aclient)
1448 boost::mutex::scoped_lock lock(m_HttpClientsLock);
1449 m_HttpClients.erase(aclient);
/**
 * Returns the set of HTTP server connections.
 *
 * The set is returned by value, so the caller receives a copy that was taken
 * while m_HttpClientsLock was held.
 */
1452 std::set<HttpServerConnection::Ptr> ApiListener::GetHttpClients() const
1454 boost::mutex::scoped_lock lock(m_HttpClientsLock);
1455 return m_HttpClients;
/**
 * Handler for the icinga::Hello API function (registered through
 * REGISTER_APIFUNCTION near the top of this file).
 */
1458 Value ApiListener::HelloAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
/**
 * Returns the endpoint stored in m_LocalEndpoint.
 */
1463 Endpoint::Ptr ApiListener::GetLocalEndpoint() const
1465 return m_LocalEndpoint;
/**
 * Validates the "tls_protocolmin" attribute.
 *
 * Runs the generated ObjectImpl validator first, then compares the value
 * against SSL_TXT_TLSV1 and, when SSL_TXT_TLSV1_1 is defined at compile
 * time, also against SSL_TXT_TLSV1_1 and SSL_TXT_TLSV1_2.
 *
 * @param lvalue Lazily evaluated attribute value.
 * @param utils Validation helpers, forwarded to the base validator.
 * @throws ValidationError if the value matches none of the accepted strings.
 */
1468 void ApiListener::ValidateTlsProtocolmin(const Lazy<String>& lvalue, const ValidationUtils& utils)
1470 ObjectImpl<ApiListener>::ValidateTlsProtocolmin(lvalue, utils);
/* The accepted set depends on whether SSL_TXT_TLSV1_1 is defined. */
1472 if (lvalue() != SSL_TXT_TLSV1
1473 #ifdef SSL_TXT_TLSV1_1
1474 && lvalue() != SSL_TXT_TLSV1_1 &&
1475 lvalue() != SSL_TXT_TLSV1_2
1476 #endif /* SSL_TXT_TLSV1_1 */
/* Build the error message from the same compile-time set of versions. */
1478 String message = "Invalid TLS version. Must be one of '" SSL_TXT_TLSV1 "'";
1479 #ifdef SSL_TXT_TLSV1_1
1480 message += ", '" SSL_TXT_TLSV1_1 "' or '" SSL_TXT_TLSV1_2 "'";
1481 #endif /* SSL_TXT_TLSV1_1 */
1483 BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_protocolmin" }, message));
/**
 * Validates the "tls_handshake_timeout" attribute.
 *
 * Runs the generated ObjectImpl validator first and raises a ValidationError
 * with the message "Value must be greater than 0." for rejected values.
 *
 * NOTE(review): presumably the rejection is guarded by a check that the
 * value is not greater than 0 -- confirm.
 *
 * @param lvalue Lazily evaluated attribute value.
 * @param utils Validation helpers, forwarded to the base validator.
 */
1487 void ApiListener::ValidateTlsHandshakeTimeout(const Lazy<double>& lvalue, const ValidationUtils& utils)
1489 ObjectImpl<ApiListener>::ValidateTlsHandshakeTimeout(lvalue, utils);
1492 BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_handshake_timeout" }, "Value must be greater than 0."));
/**
 * Reports the result of IsSingleInstance() for the local zone.
 *
 * NOTE(review): going by the names alone, "single instance" reads as the
 * opposite of "HA cluster"; verify against Zone::IsSingleInstance() whether
 * the result should be negated.
 */
1495 bool ApiListener::IsHACluster()
1497 Zone::Ptr zone = Zone::GetLocalZone();
1502 return zone->IsSingleInstance();
1505 /* Provide a helper function for zone origin name. */
/**
 * Determines the zone name to report for a message origin.
 *
 * The name is taken from fromZone; the local zone's name is the alternative
 * source. If neither assignment runs, the default-constructed String is
 * returned.
 *
 * NOTE(review): presumably the local zone is only consulted when fromZone is
 * not set, and each access is guarded by a null check -- confirm.
 *
 * @param fromZone The origin zone of the message.
 * @returns The selected zone name.
 */
1506 String ApiListener::GetFromZoneName(const Zone::Ptr& fromZone)
1508 String fromZoneName;
1511 fromZoneName = fromZone->GetName();
1513 Zone::Ptr lzone = Zone::GetLocalZone();
1516 fromZoneName = lzone->GetName();
1519 return fromZoneName;
/**
 * Writes the listener's local address and port to the API state file.
 *
 * The file is "<CacheDir>/api-state.json"; it is written through
 * Utility::SaveJsonFile with mode 0644 and contains the keys "host" and
 * "port".
 *
 * @param localEndpoint The TCP endpoint whose address and port are stored.
 */
1522 void ApiListener::UpdateStatusFile(boost::asio::ip::tcp::endpoint localEndpoint)
1524 String path = Configuration::CacheDir + "/api-state.json";
1526 Utility::SaveJsonFile(path, 0644, new Dictionary({
1527 {"host", String(localEndpoint.address().to_string())},
1528 {"port", localEndpoint.port()}
/**
 * Deletes the API state file "<CacheDir>/api-state.json" if it exists.
 *
 * @throws posix_error if unlink() fails with an errno other than ENOENT; the
 * exception carries the API function name, errno and the file name.
 */
1532 void ApiListener::RemoveStatusFile()
1534 String path = Configuration::CacheDir + "/api-state.json";
1536 if (Utility::PathExists(path)) {
/* ENOENT is tolerated: the file may vanish between the existence check and unlink(). */
1537 if (unlink(path.CStr()) < 0 && errno != ENOENT) {
1538 BOOST_THROW_EXCEPTION(posix_error()
1539 << boost::errinfo_api_function("unlink")
1540 << boost::errinfo_errno(errno)
1541 << boost::errinfo_file_name(path));