1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
3 #include "remote/apilistener.hpp"
4 #include "remote/apilistener-ti.cpp"
5 #include "remote/jsonrpcconnection.hpp"
6 #include "remote/endpoint.hpp"
7 #include "remote/jsonrpc.hpp"
8 #include "remote/apifunction.hpp"
9 #include "remote/configpackageutility.hpp"
10 #include "remote/configobjectutility.hpp"
11 #include "base/convert.hpp"
12 #include "base/defer.hpp"
13 #include "base/io-engine.hpp"
14 #include "base/netstring.hpp"
15 #include "base/json.hpp"
16 #include "base/configtype.hpp"
17 #include "base/logger.hpp"
18 #include "base/objectlock.hpp"
19 #include "base/stdiostream.hpp"
20 #include "base/perfdatavalue.hpp"
21 #include "base/application.hpp"
22 #include "base/context.hpp"
23 #include "base/statsfunction.hpp"
24 #include "base/exception.hpp"
25 #include "base/tcpsocket.hpp"
26 #include <boost/asio/buffer.hpp>
27 #include <boost/asio/ip/tcp.hpp>
28 #include <boost/asio/spawn.hpp>
29 #include <boost/asio/ssl/context.hpp>
30 #include <boost/system/error_code.hpp>
34 #include <openssl/ssl.h>
35 #include <openssl/tls1.h>
36 #include <openssl/x509.h>
40 using namespace icinga;
// Registers the ApiListener type (REGISTER_TYPE is a project macro defined elsewhere).
42 REGISTER_TYPE(ApiListener);
// Definitions of ApiListener's static members: the OnMasterChanged signal, whose
// handlers take a single bool, and the m_Instance pointer.
44 boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
45 ApiListener::Ptr ApiListener::m_Instance;
// Registers ApiListener::StatsFunc under the name ApiListener.
47 REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc);
// Registers ApiListener::HelloAPIHandler as API function "Hello" in the "icinga" namespace.
// NOTE(review): presumably this handles the "icinga::Hello" message sent by
// NewClientHandlerInternal() - confirm against the macro definition.
49 REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
// Constructor: assigns names to the m_RelayQueue and m_SyncQueue members.
51 ApiListener::ApiListener()
53 m_RelayQueue.SetName("ApiListener, RelayQueue");
54 m_SyncQueue.SetName("ApiListener, SyncQueue");
// Returns the API data directory: Configuration::DataDir followed by "/api/"
// (the result ends with a slash).
57 String ApiListener::GetApiDir()
59 return Configuration::DataDir + "/api/";
// Returns the "zones/" subdirectory of GetApiDir() (the result ends with a slash).
62 String ApiListener::GetApiZonesDir()
64 return GetApiDir() + "zones/";
// Returns the "zones-stage/" subdirectory of GetApiDir() (the result ends with a slash).
67 String ApiListener::GetApiZonesStageDir()
69 return GetApiDir() + "zones-stage/";
// Returns the certificate directory: Configuration::DataDir followed by "/certs/"
// (the result ends with a slash).
72 String ApiListener::GetCertsDir()
74 return Configuration::DataDir + "/certs/";
// Returns the CA directory: Configuration::DataDir followed by "/ca/"
// (the result ends with a slash).
77 String ApiListener::GetCaDir()
79 return Configuration::DataDir + "/ca/";
// Returns the certificate-request directory: Configuration::DataDir followed by
// "/certificate-requests/" (the result ends with a slash).
82 String ApiListener::GetCertificateRequestsDir()
84 return Configuration::DataDir + "/certificate-requests/";
// Returns the default certificate path: "<certs dir>/<NodeName>.crt", where NodeName
// is read from the script globals.
// NOTE(review): GetCertsDir() already ends with '/', so the extra "/" here yields a
// double slash in the resulting path.
87 String ApiListener::GetDefaultCertPath()
89 return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".crt";
// Returns the default private key path: "<certs dir>/<NodeName>.key", where NodeName
// is read from the script globals.
// NOTE(review): GetCertsDir() already ends with '/', so the extra "/" here yields a
// double slash in the resulting path.
92 String ApiListener::GetDefaultKeyPath()
94 return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".key";
// Returns the default CA certificate path: "<certs dir>/ca.crt".
// NOTE(review): GetCertsDir() already ends with '/', so the result contains a double slash.
97 String ApiListener::GetDefaultCaPath()
99 return GetCertsDir() + "/ca.crt";
// Returns the TLS handshake timeout. The value is read from the global
// Configuration::TlsHandshakeTimeout rather than from a per-object member.
102 double ApiListener::GetTlsHandshakeTimeout() const
104 return Configuration::TlsHandshakeTimeout;
// Stores the TLS handshake timeout in the global Configuration::TlsHandshakeTimeout.
// @param value New timeout value.
// suppress_events and cookie are not used by the visible statement.
107 void ApiListener::SetTlsHandshakeTimeout(double value, bool suppress_events, const Value& cookie)
109 Configuration::TlsHandshakeTimeout = value;
// Copies a certificate-related file to a new location when that is needed.
// @param oldCertPath Source path; nothing happens if it is empty or cannot be stat()ed.
// @param newCertPath Destination path.
112 void ApiListener::CopyCertificateFile(const String& oldCertPath, const String& newCertPath)
114 struct stat st1, st2;
// Copy only if the source exists and the destination is either missing or has an
// older modification time than the source.
116 if (!oldCertPath.IsEmpty() && stat(oldCertPath.CStr(), &st1) >= 0 && (stat(newCertPath.CStr(), &st2) < 0 || st1.st_mtime > st2.st_mtime)) {
117 Log(LogWarning, "ApiListener")
118 << "Copying '" << oldCertPath << "' certificate file to '" << newCertPath << "'";
// Make sure the destination directory exists (created with mode 0700) before copying.
120 Utility::MkDirP(Utility::DirName(newCertPath), 0700);
121 Utility::CopyFile(oldCertPath, newCertPath);
// Config-load hook: migrates certificate files to the default locations, prepares the
// API storage and package cache, and derives this node's identity from its certificate.
// Throws ScriptError when the certificate or its common name cannot be read.
125 void ApiListener::OnConfigLoaded()
// Rejects the configuration with a ScriptError; per its message only one ApiListener
// object may exist.
128 BOOST_THROW_EXCEPTION(ScriptError("Only one ApiListener object is allowed.", GetDebugInfo()));
132 String defaultCertPath = GetDefaultCertPath();
133 String defaultKeyPath = GetDefaultKeyPath();
134 String defaultCaPath = GetDefaultCaPath();
136 /* Migrate certificate location < 2.8 to the new default path. */
137 String oldCertPath = GetCertPath();
138 String oldKeyPath = GetKeyPath();
139 String oldCaPath = GetCaPath();
// Each call is a no-op when the old path is empty or the default copy is already
// at least as new (see CopyCertificateFile()).
141 CopyCertificateFile(oldCertPath, defaultCertPath);
142 CopyCertificateFile(oldKeyPath, defaultKeyPath);
143 CopyCertificateFile(oldCaPath, defaultCaPath);
// All three legacy attributes are still set: point the user at the upgrade notes.
145 if (!oldCertPath.IsEmpty() && !oldKeyPath.IsEmpty() && !oldCaPath.IsEmpty()) {
146 Log(LogWarning, "ApiListener", "Please read the upgrading documentation for v2.8: https://icinga.com/docs/icinga2/latest/doc/16-upgrading-icinga-2/");
149 /* Create the internal API object storage. */
150 ConfigObjectUtility::CreateStorage();
152 /* Cache API packages and their active stage name. */
153 UpdateActivePackageStagesCache();
155 /* set up SSL context */
156 std::shared_ptr<X509> cert;
// Load the certificate from the default path; any std::exception is converted into
// a ScriptError naming that path.
158 cert = GetX509Certificate(defaultCertPath);
159 } catch (const std::exception&) {
160 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate from cert path: '"
161 + defaultCertPath + "'.", GetDebugInfo()));
// The identity of this node is the common name (CN) of its certificate.
165 SetIdentity(GetCertificateCN(cert));
166 } catch (const std::exception&) {
167 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate common name from cert path: '"
168 + defaultCertPath + "'.", GetDebugInfo()));
171 Log(LogInformation, "ApiListener")
172 << "My API identity: " << GetIdentity();
// Builds a new SSL context from the default certificate, key and CA paths, applies the
// optional CRL, cipher list and minimum TLS version, installs it as m_SSLContext and
// disconnects all existing clients.
// Throws ScriptError if any of the configuration steps fails.
177 void ApiListener::UpdateSSLContext()
179 namespace ssl = boost::asio::ssl;
181 Shared<ssl::context>::Ptr context;
184 context = MakeAsioSslContext(GetDefaultCertPath(), GetDefaultKeyPath(), GetDefaultCaPath());
185 } catch (const std::exception&) {
186 BOOST_THROW_EXCEPTION(ScriptError("Cannot make SSL context for cert path: '"
187 + GetDefaultCertPath() + "' key path: '" + GetDefaultKeyPath() + "' ca path: '" + GetDefaultCaPath() + "'.", GetDebugInfo()));
// Optional: certificate revocation list.
190 if (!GetCrlPath().IsEmpty()) {
192 AddCRLToSSLContext(context, GetCrlPath());
193 } catch (const std::exception&) {
194 BOOST_THROW_EXCEPTION(ScriptError("Cannot add certificate revocation list to SSL context for crl path: '"
195 + GetCrlPath() + "'.", GetDebugInfo()));
// Optional: restrict the cipher list.
199 if (!GetCipherList().IsEmpty()) {
201 SetCipherListToSSLContext(context, GetCipherList());
202 } catch (const std::exception&) {
203 BOOST_THROW_EXCEPTION(ScriptError("Cannot set cipher list to SSL context for cipher list: '"
204 + GetCipherList() + "'.", GetDebugInfo()));
// Optional: minimum TLS protocol version.
208 if (!GetTlsProtocolmin().IsEmpty()){
210 SetTlsProtocolminToSSLContext(context, GetTlsProtocolmin());
211 } catch (const std::exception&) {
212 BOOST_THROW_EXCEPTION(ScriptError("Cannot set minimum TLS protocol version to SSL context with tls_protocolmin: '" + GetTlsProtocolmin() + "'.", GetDebugInfo()));
// Publish the fully configured context only after every step above succeeded.
216 m_SSLContext = context;
// Drop every existing connection, both per-endpoint and anonymous.
// NOTE(review): presumably so that peers reconnect using the new context - confirm.
218 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
219 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
220 client->Disconnect();
224 for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) {
225 client->Disconnect();
// Resolves the local Endpoint object by looking up the endpoint named after this
// node's identity; throws ScriptError if no such Endpoint is configured.
229 void ApiListener::OnAllConfigLoaded()
231 m_LocalEndpoint = Endpoint::GetByName(GetIdentity());
233 if (!m_LocalEndpoint)
234 BOOST_THROW_EXCEPTION(ScriptError("Endpoint object for '" + GetIdentity() + "' is missing.", GetDebugInfo()));
238 * Starts the component.
// Starts the listener: runs the base-class Start(), opens the primary JSON-RPC
// listener (exiting the application on failure) and sets up the periodic timers.
// @param runtimeCreated Forwarded unchanged to ObjectImpl<ApiListener>::Start().
240 void ApiListener::Start(bool runtimeCreated)
242 Log(LogInformation, "ApiListener")
243 << "'" << GetName() << "' started.";
247 ObjectImpl<ApiListener>::Start(runtimeCreated);
// m_LogLock is the lock that OpenLogFile(), CloseLogFile() and RotateLogFile()
// require their callers to hold.
250 boost::mutex::scoped_lock lock(m_LogLock);
254 /* create the primary JSON-RPC listener */
255 if (!AddListener(GetBindHost(), GetBindPort())) {
256 Log(LogCritical, "ApiListener")
257 << "Cannot add listener on host '" << GetBindHost() << "' for port '" << GetBindPort() << "'.";
258 Application::Exit(EXIT_FAILURE);
// Timer driving ApiTimerHandler() with an interval of 5.
261 m_Timer = new Timer();
262 m_Timer->OnTimerExpired.connect(std::bind(&ApiListener::ApiTimerHandler, this));
263 m_Timer->SetInterval(5);
265 m_Timer->Reschedule(0);
// Timer driving ApiReconnectTimerHandler() with an interval of 10.
267 m_ReconnectTimer = new Timer();
268 m_ReconnectTimer->OnTimerExpired.connect(std::bind(&ApiListener::ApiReconnectTimerHandler, this));
269 m_ReconnectTimer->SetInterval(10);
270 m_ReconnectTimer->Start();
271 m_ReconnectTimer->Reschedule(0);
273 /* Keep this in relative sync with the cold startup in UpdateObjectAuthority() and the reconnect interval above.
274 * Previous: 60s reconnect, 30s OA, 60s cold startup.
275 * Now: 10s reconnect, 10s OA, 30s cold startup.
277 m_AuthorityTimer = new Timer();
278 m_AuthorityTimer->OnTimerExpired.connect(std::bind(&ApiListener::UpdateObjectAuthority));
279 m_AuthorityTimer->SetInterval(10);
280 m_AuthorityTimer->Start();
// Timer driving CleanupCertificateRequestsTimerHandler() with an interval of 3600.
282 m_CleanupCertificateRequestsTimer = new Timer();
283 m_CleanupCertificateRequestsTimer->OnTimerExpired.connect(std::bind(&ApiListener::CleanupCertificateRequestsTimerHandler, this));
284 m_CleanupCertificateRequestsTimer->SetInterval(3600);
285 m_CleanupCertificateRequestsTimer->Start();
286 m_CleanupCertificateRequestsTimer->Reschedule(0);
// Timer driving CheckApiPackageIntegrity() with an interval of 300.
288 m_ApiPackageIntegrityTimer = new Timer();
289 m_ApiPackageIntegrityTimer->OnTimerExpired.connect(std::bind(&ApiListener::CheckApiPackageIntegrity, this));
290 m_ApiPackageIntegrityTimer->SetInterval(300);
291 m_ApiPackageIntegrityTimer->Start();
// Fire the OnMasterChanged signal with the argument true.
293 OnMasterChanged(true);
// Stops the listener: runs the base-class Stop(), logs the event and takes m_LogLock.
// @param runtimeDeleted Forwarded unchanged to ObjectImpl<ApiListener>::Stop().
296 void ApiListener::Stop(bool runtimeDeleted)
298 ObjectImpl<ApiListener>::Stop(runtimeDeleted);
300 Log(LogInformation, "ApiListener")
301 << "'" << GetName() << "' stopped.";
// m_LogLock is the lock the log-file helpers require their callers to hold.
304 boost::mutex::scoped_lock lock(m_LogLock);
// Accessor for the ApiListener instance.
// NOTE(review): presumably returns the static m_Instance pointer - confirm against the body.
312 ApiListener::Ptr ApiListener::GetInstance()
// Determines the master endpoint of the local zone: among the zone's endpoints that
// are connected, plus this node itself, the one whose name sorts first.
// @return The Endpoint object with the first name in sorted order.
317 Endpoint::Ptr ApiListener::GetMaster() const
319 Zone::Ptr zone = Zone::GetLocalZone();
324 std::vector<String> names;
// Candidates: connected endpoints and the endpoint named after our own identity.
326 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints())
327 if (endpoint->GetConnected() || endpoint->GetName() == GetIdentity())
328 names.push_back(endpoint->GetName());
330 std::sort(names.begin(), names.end());
// NOTE(review): dereferences names.begin(); relies on at least one candidate existing.
332 return Endpoint::GetByName(*names.begin());
// Reports whether this node is the current zone master, i.e. whether GetMaster()
// returns the local endpoint.
335 bool ApiListener::IsMaster() const
337 Endpoint::Ptr master = GetMaster();
342 return master == GetLocalEndpoint();
346 * Creates a new JSON-RPC listener on the specified port.
348 * @param node The host the listener should be bound to.
349 * @param service The port to listen on.
// Resolves node/service, binds a TCP acceptor to one of the resolved endpoints, starts
// listening and spawns the accept coroutine. Requires m_SSLContext to be set.
// @param node The host the listener should be bound to.
// @param service The port to listen on.
351 bool ApiListener::AddListener(const String& node, const String& service)
353 namespace asio = boost::asio;
354 namespace ip = asio::ip;
357 ObjectLock olock(this);
// Take a copy of the current SSL context pointer for use by the accept coroutine.
359 auto sslContext (m_SSLContext);
362 Log(LogCritical, "ApiListener", "SSL context is required for AddListener()");
366 auto& io (IoEngine::Get().GetIoContext());
367 auto acceptor (Shared<tcp::acceptor>::Make(io));
// Resolve in passive mode, i.e. for addresses suitable for bind().
370 tcp::resolver resolver (io);
371 tcp::resolver::query query (node, service, tcp::resolver::query::passive);
373 auto result (resolver.resolve(query));
374 auto current (result.begin());
378 acceptor->open(current->endpoint().protocol());
381 auto fd (acceptor->native_handle());
// Clear IPV6_V6ONLY so an IPv6 socket can also accept IPv4 connections; the return
// values of the setsockopt() calls are not checked.
383 const int optFalse = 0;
384 setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char *>(&optFalse), sizeof(optFalse));
// Allow address (and port) reuse.
386 const int optTrue = 1;
387 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char *>(&optTrue), sizeof(optTrue));
389 setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, reinterpret_cast<const char *>(&optTrue), sizeof(optTrue));
393 acceptor->bind(current->endpoint());
// On failure advance to the next resolved endpoint; the check against result.end()
// detects that no candidates are left.
396 } catch (const std::exception&) {
397 if (++current == result.end()) {
401 if (acceptor->is_open()) {
406 } catch (const std::exception& ex) {
407 Log(LogCritical, "ApiListener")
408 << "Cannot bind TCP socket for host '" << node << "' on port '" << service << "': " << ex.what();
// INT_MAX requests the largest possible listen backlog.
412 acceptor->listen(INT_MAX);
414 auto localEndpoint (acceptor->local_endpoint());
416 Log(LogInformation, "ApiListener")
417 << "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'";
// Accept connections in a coroutine; acceptor and sslContext are captured by value.
419 IoEngine::SpawnCoroutine(io, [this, acceptor, sslContext](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor, sslContext); });
421 UpdateStatusFile(localEndpoint);
// Accept-loop body run inside a coroutine: accepts an incoming TCP connection into a
// fresh TLS stream and hands it to NewClientHandler() in its own coroutine.
// @param yc Coroutine yield context used for the asynchronous accept.
// @param server The listening acceptor.
// @param sslContext SSL context used to create the per-connection TLS stream.
426 void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Shared<boost::asio::ip::tcp::acceptor>::Ptr& server, const Shared<boost::asio::ssl::context>::Ptr& sslContext)
428 namespace asio = boost::asio;
430 auto& io (IoEngine::Get().GetIoContext());
434 auto sslConn (Shared<AsioTlsStream>::Make(io, *sslContext));
// Suspends this coroutine until a peer connects.
436 server->async_accept(sslConn->lowest_layer(), yc);
// Incoming connections have no expected hostname (empty String) and use RoleServer.
438 IoEngine::SpawnCoroutine(io, [this, sslConn](asio::yield_context yc) { NewClientHandler(yc, sslConn, String(), RoleServer); });
// A failed accept is logged.
439 } catch (const std::exception& ex) {
440 Log(LogCritical, "ApiListener")
441 << "Cannot accept new connection: " << ex.what();
447 * Creates a new JSON-RPC client and connects to the specified endpoint.
449 * @param endpoint The endpoint.
// Spawns a coroutine that connects to the given endpoint's host/port over TLS and runs
// the client handshake via NewClientHandler(). Requires m_SSLContext to be set.
// The endpoint's "connecting" flag is cleared on both success and failure.
// @param endpoint The endpoint to connect to.
451 void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
453 namespace asio = boost::asio;
456 auto sslContext (m_SSLContext);
459 Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
463 auto& io (IoEngine::Get().GetIoContext());
// io is captured by reference; it is obtained from the IoEngine::Get() accessor.
465 IoEngine::SpawnCoroutine(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
466 String host = endpoint->GetHost();
467 String port = endpoint->GetPort();
469 Log(LogInformation, "ApiListener")
470 << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
// The endpoint name is passed to the TLS stream in addition to the context.
473 auto sslConn (Shared<AsioTlsStream>::Make(io, *sslContext, endpoint->GetName()));
475 Connect(sslConn->lowest_layer(), host, port, yc);
// The expected peer identity is the endpoint name; we act as RoleClient.
477 NewClientHandler(yc, sslConn, endpoint->GetName(), RoleClient);
479 endpoint->SetConnecting(false);
480 Log(LogInformation, "ApiListener")
481 << "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
482 } catch (const std::exception& ex) {
483 endpoint->SetConnecting(false);
485 Log(LogCritical, "ApiListener")
486 << "Cannot connect to host '" << host << "' on port '" << port << "': " << ex.what();
// Exception-safe wrapper around NewClientHandlerInternal(): any std::exception is
// logged (critical with short diagnostics, debug with full diagnostics) instead of
// propagating to the caller.
// @param yc Coroutine yield context.
// @param client The TLS stream of the new connection.
// @param hostname Expected peer identity; empty for incoming connections.
// @param role RoleClient or RoleServer.
491 void ApiListener::NewClientHandler(boost::asio::yield_context yc, const Shared<AsioTlsStream>::Ptr& client, const String& hostname, ConnectionRole role)
494 NewClientHandlerInternal(yc, client, hostname, role);
495 } catch (const std::exception& ex) {
496 Log(LogCritical, "ApiListener")
497 << "Exception while handling new API client connection: " << DiagnosticInformation(ex, false);
499 Log(LogDebug, "ApiListener")
500 << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
505 * Processes a new client connection.
507 * @param client The new client.
// Handles a freshly established TCP connection: performs the TLS handshake, inspects
// the peer certificate, decides whether the peer speaks JSON-RPC or HTTP, and creates
// the matching connection object.
// @param yc Coroutine yield context used for all asynchronous I/O.
// @param client The TLS stream of the new connection.
// @param hostname Expected peer identity (certificate CN); empty if none is expected.
// @param role RoleClient when we initiated the connection, RoleServer otherwise.
509 void ApiListener::NewClientHandlerInternal(boost::asio::yield_context yc, const Shared<AsioTlsStream>::Ptr& client, const String& hostname, ConnectionRole role)
511 namespace asio = boost::asio;
512 namespace ssl = asio::ssl;
// Build a description of the peer ("[address]:port") for use in log messages.
517 std::ostringstream conninfo_;
519 if (role == RoleClient) {
525 auto endpoint (client->lowest_layer().remote_endpoint());
527 conninfo_ << " [" << endpoint.address() << "]:" << endpoint.port();
529 conninfo = conninfo_.str();
532 auto& sslConn (client->next_layer());
534 boost::system::error_code ec;
// TLS handshake in client or server mode depending on role; yc[ec] stores a failure
// in ec instead of throwing.
536 sslConn.async_handshake(role == RoleClient ? sslConn.client : sslConn.server, yc[ec]);
539 // https://github.com/boostorg/beast/issues/915
540 // Google Chrome 73+ seems not close the connection properly, https://stackoverflow.com/questions/56272906/how-to-fix-certificate-unknown-error-from-chrome-v73
541 if (ec == asio::ssl::error::stream_truncated) {
542 Log(LogNotice, "ApiListener")
543 << "TLS stream was truncated, ignoring connection from " << conninfo;
547 Log(LogCritical, "ApiListener")
548 << "Client TLS handshake failed (" << conninfo << "): " << ec.message();
// Scope guard: shuts the TLS stream down on exit unless willBeShutDown has been set,
// which happens below once a JSON-RPC or HTTP connection object has been created.
552 bool willBeShutDown = false;
554 Defer shutDownIfNeeded ([&sslConn, &willBeShutDown, &yc]() {
555 if (!willBeShutDown) {
556 // Ignore the error, but do not throw an exception being swallowed at all cost.
557 // https://github.com/Icinga/icinga2/issues/7351
558 boost::system::error_code ec;
559 sslConn.async_shutdown(yc[ec]);
// Inspect the peer certificate and its verification result.
563 std::shared_ptr<X509> cert (sslConn.GetPeerCertificate());
564 bool verify_ok = false;
566 Endpoint::Ptr endpoint;
569 verify_ok = sslConn.IsVerifyOK();
571 String verifyError = sslConn.GetVerifyError();
// The peer's identity is the common name (CN) of its certificate.
574 identity = GetCertificateCN(cert);
575 } catch (const std::exception&) {
576 Log(LogCritical, "ApiListener")
577 << "Cannot get certificate common name from cert path: '" << GetDefaultCertPath() << "'.";
// When a specific peer was expected, warn if the CN differs or verification failed.
581 if (!hostname.IsEmpty()) {
582 if (identity != hostname) {
583 Log(LogWarning, "ApiListener")
584 << "Unexpected certificate common name while connecting to endpoint '"
585 << hostname << "': got '" << identity << "'";
587 } else if (!verify_ok) {
588 Log(LogWarning, "ApiListener")
589 << "Certificate validation failed for endpoint '" << hostname
590 << "': " << verifyError;
// Map the identity to a configured Endpoint object, if one exists.
595 endpoint = Endpoint::GetByName(identity);
598 Log log(LogInformation, "ApiListener");
600 log << "New client connection for identity '" << identity << "' " << conninfo;
603 log << " (certificate validation failed: " << verifyError << ")";
604 } else if (!endpoint) {
605 log << " (no Endpoint object found for identity)";
608 Log(LogInformation, "ApiListener")
609 << "New client connection " << conninfo << " (no client certificate)";
// As the connecting side we send icinga::Hello first and treat the peer as JSON-RPC.
614 if (role == RoleClient) {
615 JsonRpc::SendMessage(client, new Dictionary({
616 { "jsonrpc", "2.0" },
617 { "method", "icinga::Hello" },
618 { "params", new Dictionary() }
621 client->async_flush(yc);
623 ctype = ClientJsonRpc;
// Otherwise wait for the peer to send data; a result of 0 from async_fill() is
// reported as "no data received".
626 boost::system::error_code ec;
628 if (client->async_fill(yc[ec]) == 0u) {
629 if (identity.IsEmpty()) {
630 Log(LogInformation, "ApiListener")
631 << "No data received on new API connection " << conninfo << ". "
632 << "Ensure that the remote endpoints are properly configured in a cluster setup.";
634 Log(LogWarning, "ApiListener")
635 << "No data received on new API connection " << conninfo << " for identity '" << identity << "'. "
636 << "Ensure that the remote endpoints are properly configured in a cluster setup.";
// Peek at the first byte without consuming it: a leading decimal digit selects JSON-RPC.
// NOTE(review): presumably because JSON-RPC messages are netstring-framed and begin
// with a decimal length - confirm.
646 asio::mutable_buffer firstByteBuf (&firstByte, 1);
647 client->peek(firstByteBuf);
650 if (firstByte >= '0' && firstByte <= '9') {
651 ctype = ClientJsonRpc;
657 if (ctype == ClientJsonRpc) {
658 Log(LogNotice, "ApiListener", "New JSON-RPC client");
660 JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, client, role);
// Known endpoint: a sync is needed only if the endpoint had no connection before
// this client was added.
663 bool needSync = !endpoint->GetConnected();
665 endpoint->AddClient(aclient);
// Run SyncClient() in its own coroutine while holding a CpuBoundWork object.
667 IoEngine::SpawnCoroutine(IoEngine::Get().GetIoContext(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
668 CpuBoundWork syncClient (yc);
670 SyncClient(aclient, endpoint, needSync);
// No matching endpoint: register as anonymous client, subject to the configured maximum.
673 } else if (!AddAnonymousClient(aclient)) {
674 Log(LogNotice, "ApiListener")
675 << "Ignoring anonymous JSON-RPC connection " << conninfo
676 << ". Max connections (" << GetMaxAnonymousClients() << ") exceeded.";
684 willBeShutDown = true;
// Anything else is handled as an HTTP client.
687 Log(LogNotice, "ApiListener", "New HTTP client");
689 HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, client);
690 AddHttpClient(aclient);
693 willBeShutDown = true;
// Brings a newly connected endpoint up to date: optionally requests certificates from
// the parent zone, sends config file and runtime object updates, then handles the
// replay log. The endpoint's "syncing" flag is set at the start and cleared afterwards,
// including on error.
// @param aclient The JSON-RPC connection to the endpoint.
// @param endpoint The endpoint being synced.
// @param needSync Value computed by the caller as "endpoint was not connected before".
697 void ApiListener::SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint, bool needSync)
699 Zone::Ptr eZone = endpoint->GetZone();
703 ObjectLock olock(endpoint);
705 endpoint->SetSyncing(true);
708 Zone::Ptr myZone = Zone::GetLocalZone();
// The endpoint is in our parent zone: ask it for a certificate for this instance and
// forward any locally stored certificate requests (*.json files).
710 if (myZone->GetParent() == eZone) {
711 Log(LogInformation, "ApiListener")
712 << "Requesting new certificate for this Icinga instance from endpoint '" << endpoint->GetName() << "'.";
714 JsonRpcConnection::SendCertificateRequest(aclient, nullptr, String());
716 if (Utility::PathExists(ApiListener::GetCertificateRequestsDir()))
717 Utility::Glob(ApiListener::GetCertificateRequestsDir() + "/*.json", std::bind(&JsonRpcConnection::SendCertificateRequest, aclient, nullptr, _1), GlobFile);
720 /* Make sure that the config updates are synced
721 * before the logs are replayed.
724 Log(LogInformation, "ApiListener")
725 << "Sending config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
727 /* sync zone file config */
728 SendConfigUpdate(aclient);
730 Log(LogInformation, "ApiListener")
731 << "Finished sending config file updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
733 /* sync runtime config */
734 SendRuntimeConfigObjects(aclient);
736 Log(LogInformation, "ApiListener")
737 << "Finished sending runtime config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
// Clear the syncing flag under the endpoint's object lock.
740 ObjectLock olock2(endpoint);
741 endpoint->SetSyncing(false);
745 Log(LogInformation, "ApiListener")
746 << "Sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
// An endpoint of our own zone: refresh object authority.
750 if (eZone == Zone::GetLocalZone())
751 UpdateObjectAuthority();
753 Log(LogInformation, "ApiListener")
754 << "Finished sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
// On any error make sure the syncing flag does not stay set, then log the failure.
755 } catch (const std::exception& ex) {
757 ObjectLock olock2(endpoint);
758 endpoint->SetSyncing(false);
761 Log(LogCritical, "ApiListener")
762 << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
764 Log(LogDebug, "ApiListener")
765 << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
768 Log(LogInformation, "ApiListener")
769 << "Finished syncing endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
// Periodic maintenance (registered on m_Timer in Start()):
//  1. removes old replay log files from "<api dir>/log/";
//  2. sends each connected endpoint a "log::SetLogPosition" message carrying that
//     endpoint's remote log position.
772 void ApiListener::ApiTimerHandler()
774 double now = Utility::GetTime();
// Collect the rotated log files; LogGlobHandler() turns each file name into an
// integer timestamp and skips "current".
776 std::vector<int> files;
777 Utility::Glob(GetApiDir() + "log/*", std::bind(&ApiListener::LogGlobHandler, std::ref(files), _1), GlobFile);
778 std::sort(files.begin(), files.end());
780 for (int ts : files) {
782 auto localZone (GetLocalEndpoint()->GetZone());
// Check the file against every other relevant endpoint.
784 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
785 if (endpoint == GetLocalEndpoint())
788 auto zone (endpoint->GetZone());
790 /* only care for endpoints in a) the same zone b) our parent zone c) immediate child zones */
791 if (!(zone == localZone || zone == localZone->GetParent() || zone->GetParent() == localZone)) {
// The file is older than this endpoint's log duration (when that is non-negative).
795 if (endpoint->GetLogDuration() >= 0 && ts < now - endpoint->GetLogDuration())
// The file is newer than this endpoint's local log position.
798 if (ts > endpoint->GetLocalLogPosition()) {
805 String path = GetApiDir() + "log/" + Convert::ToString(ts);
806 Log(LogNotice, "ApiListener")
807 << "Removing old log file: " << path;
// Best effort: the result of unlink() is deliberately ignored.
808 (void)unlink(path.CStr());
// Second part: report log positions to connected endpoints.
812 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
813 if (!endpoint->GetConnected())
816 double ts = endpoint->GetRemoteLogPosition();
821 Dictionary::Ptr lmessage = new Dictionary({
822 { "jsonrpc", "2.0" },
823 { "method", "log::SetLogPosition" },
824 { "params", new Dictionary({
825 { "log_position", ts }
// Find the newest client timestamp of this endpoint ...
831 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
832 if (client->GetTimestamp() > maxTs)
833 maxTs = client->GetTimestamp();
// ... and send the message via the client(s) carrying that timestamp.
// NOTE(review): the Disconnect() below appears to apply to the remaining (older)
// clients - confirm against the full source.
836 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
837 if (client->GetTimestamp() == maxTs) {
838 client->SendMessage(lmessage);
840 client->Disconnect();
844 Log(LogNotice, "ApiListener")
845 << "Setting log position for identity '" << endpoint->GetName() << "': "
846 << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
// Periodic reconnect logic (registered on m_ReconnectTimer in Start()): walks all
// zones and starts a connection attempt to every eligible endpoint, then logs the
// current zone master and the list of connected endpoints.
850 void ApiListener::ApiReconnectTimerHandler()
852 Zone::Ptr my_zone = Zone::GetLocalZone();
854 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
855 /* don't connect to global zones */
856 if (zone->GetGlobal())
859 /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
860 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
861 Log(LogDebug, "ApiListener")
862 << "Not connecting to Zone '" << zone->GetName()
863 << "' because it's not in the same zone, a parent or a child zone.";
867 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
868 /* don't connect to ourselves */
869 if (endpoint == GetLocalEndpoint()) {
870 Log(LogDebug, "ApiListener")
871 << "Not connecting to Endpoint '" << endpoint->GetName() << "' because that's us.";
875 /* don't try to connect to endpoints which don't have a host and port */
876 if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) {
877 Log(LogDebug, "ApiListener")
878 << "Not connecting to Endpoint '" << endpoint->GetName()
879 << "' because the host/port attributes are missing.";
883 /* don't try to connect if there's already a connection attempt */
884 if (endpoint->GetConnecting()) {
885 Log(LogDebug, "ApiListener")
886 << "Not connecting to Endpoint '" << endpoint->GetName()
887 << "' because we're already trying to connect to it.";
891 /* don't try to connect if we're already connected */
892 if (endpoint->GetConnected()) {
893 Log(LogDebug, "ApiListener")
894 << "Not connecting to Endpoint '" << endpoint->GetName()
895 << "' because we're already connected to it.";
899 /* Set connecting state to prevent duplicated queue inserts later. */
900 endpoint->SetConnecting(true);
// AddConnection() clears the connecting flag again when its attempt finishes or fails.
902 AddConnection(endpoint);
// Diagnostics: current zone master and connected endpoints.
906 Endpoint::Ptr master = GetMaster();
909 Log(LogNotice, "ApiListener")
910 << "Current zone master: " << master->GetName();
// Each entry has the form "<endpoint name> (<number of clients>)".
912 std::vector<String> names;
913 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>())
914 if (endpoint->GetConnected())
915 names.emplace_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
917 Log(LogNotice, "ApiListener")
918 << "Connected endpoints: " << Utility::NaturalJoin(names);
// Glob callback: deletes the file at path if its modification time is older than
// expiryTime.
// @param path File to examine.
// @param expiryTime Cut-off timestamp; files modified before it are removed.
// NOTE(review): both an lstat() and a _stat() variant appear below; presumably they are
// selected by a platform #ifdef - confirm.
921 static void CleanupCertificateRequest(const String& path, double expiryTime)
925 if (lstat(path.CStr(), &statbuf) < 0)
928 struct _stat statbuf;
929 if (_stat(path.CStr(), &statbuf) < 0)
// Best effort: the result of unlink() is deliberately ignored.
933 if (statbuf.st_mtime < expiryTime)
934 (void) unlink(path.CStr());
// Periodic cleanup (registered on m_CleanupCertificateRequestsTimer in Start()):
// runs CleanupCertificateRequest() on every *.json file in the certificate-requests
// directory, if that directory exists.
937 void ApiListener::CleanupCertificateRequestsTimerHandler()
939 String requestsDir = GetCertificateRequestsDir();
941 if (Utility::PathExists(requestsDir)) {
942 /* remove certificate requests that are older than a week */
// 7 * 24 * 60 * 60 is one week in seconds.
943 double expiryTime = Utility::GetTime() - 7 * 24 * 60 * 60;
944 Utility::Glob(requestsDir + "/*.json", std::bind(&CleanupCertificateRequest, _1, expiryTime), GlobFile);
// Queues a message for relaying: enqueues a call to SyncRelayMessage() with the same
// arguments on m_RelayQueue at PriorityNormal.
// @param origin Where the message came from (may be null; see SyncRelayMessage()).
// @param secobj Config object the message relates to.
// @param message The JSON-RPC message.
// @param log Passed through to SyncRelayMessage(), where it gates PersistMessage().
948 void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin,
949 const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
954 m_RelayQueue.Enqueue(std::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), PriorityNormal, true);
// Appends a message to the replay log file (m_LogFile) as a netstring-framed JSON
// record holding the timestamp, the JSON-encoded message and the type/name of secobj.
// @param message The message to persist; its "ts" field supplies the timestamp.
// @param secobj Config object the message relates to.
957 void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)
959 double ts = message->Get("ts");
963 Dictionary::Ptr pmessage = new Dictionary();
964 pmessage->Set("timestamp", ts);
// The message itself is stored as a JSON string inside the record.
966 pmessage->Set("message", JsonEncode(message));
969 Dictionary::Ptr secname = new Dictionary();
970 secname->Set("type", secobj->GetReflectionType()->GetName());
971 secname->Set("name", secobj->GetName());
972 pmessage->Set("secobj", secname);
// Writing to the log file and updating its bookkeeping happens under m_LogLock.
975 boost::mutex::scoped_lock lock(m_LogLock);
977 NetString::WriteStringToStream(m_LogFile, JsonEncode(pmessage));
979 SetLogMessageTimestamp(ts);
// Threshold check on the number of messages in the current log file.
981 if (m_LogMessageCount > 50000) {
// Sends a message to an endpoint through its most recent client connection(s),
// i.e. those whose timestamp equals the maximum over all of the endpoint's clients.
// @param endpoint Target endpoint (locked for the duration of the call).
// @param message The message to send.
989 void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
991 ObjectLock olock(endpoint);
993 if (!endpoint->GetSyncing()) {
994 Log(LogNotice, "ApiListener")
995 << "Sending message '" << message->Get("method") << "' to '" << endpoint->GetName() << "'";
// First pass: determine the newest client timestamp.
999 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
1000 if (client->GetTimestamp() > maxTs)
1001 maxTs = client->GetTimestamp();
// Second pass: clients with a different timestamp are excluded from sending.
1004 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
1005 if (client->GetTimestamp() != maxTs)
1008 client->SendMessage(message);
// Relays one message to the endpoints of a single target zone, applying the routing
// rules documented inline below.
// @param targetZone Zone the message is destined for.
// @param origin Where the message came from (may be null).
// @param message The message to relay; its "ts" field is used for skipped endpoints.
// @param currentZoneMaster The current master endpoint of the local zone.
// @return true unless logging was needed and not done (!log_needed || log_done).
1013 bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message, const Endpoint::Ptr& currentZoneMaster)
1017 Zone::Ptr localZone = Zone::GetLocalZone();
1019 /* only relay the message to a) the same local zone, b) the parent zone and c) direct child zones. Exception is a global zone. */
1020 if (!targetZone->GetGlobal() &&
1021 targetZone != localZone &&
1022 targetZone != localZone->GetParent() &&
1023 targetZone->GetParent() != localZone) {
1027 Endpoint::Ptr localEndpoint = GetLocalEndpoint();
// Endpoints that were deliberately not sent the message; their local log position
// is advanced to the message timestamp at the end of this function.
1029 std::vector<Endpoint::Ptr> skippedEndpoints;
1031 bool relayed = false, log_needed = false, log_done = false;
1033 std::set<Endpoint::Ptr> targetEndpoints;
// Global zone: target the local zone's endpoints plus the endpoints of all immediate
// child zones. Otherwise: target the endpoints of targetZone.
1035 if (targetZone->GetGlobal()) {
1036 targetEndpoints = localZone->GetEndpoints();
1038 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
1039 /* Fetch immediate child zone members */
1040 if (zone->GetParent() == localZone) {
1041 std::set<Endpoint::Ptr> endpoints = zone->GetEndpoints();
1042 targetEndpoints.insert(endpoints.begin(), endpoints.end());
1046 targetEndpoints = targetZone->GetEndpoints();
1049 for (const Endpoint::Ptr& targetEndpoint : targetEndpoints) {
1050 /* Don't relay messages to ourselves. */
1051 if (targetEndpoint == localEndpoint)
1056 /* Don't relay messages to disconnected endpoints. */
1057 if (!targetEndpoint->GetConnected()) {
1058 if (targetZone == localZone)
1066 /* Don't relay the message to the zone through more than one endpoint unless this is our own zone.
1067 * 'relayed' is set to true on success below, enabling the checks in the second iteration.
1069 if (relayed && targetZone != localZone) {
1070 skippedEndpoints.push_back(targetEndpoint);
1074 /* Don't relay messages back to the endpoint which we got the message from. */
1075 if (origin && origin->FromClient && targetEndpoint == origin->FromClient->GetEndpoint()) {
1076 skippedEndpoints.push_back(targetEndpoint);
1080 /* Don't relay messages back to the zone which we got the message from. */
1081 if (origin && origin->FromZone && targetZone == origin->FromZone) {
1082 skippedEndpoints.push_back(targetEndpoint);
1086 /* Only relay message to the zone master if we're not currently the zone master.
1087 * e1 is zone master, e2 and e3 are zone members.
1089 * Message is sent from e2 or e3:
1091 * targetEndpoint e1 is zone master -> send the message
1092 * targetEndpoint e3 is not zone master -> skip it, avoid routing loops
1094 * Message is sent from e1:
1095 * !isMaster == false -> send the messages to e2 and e3 being the zone routing master.
1097 bool isMaster = (currentZoneMaster == localEndpoint);
1099 if (!isMaster && targetEndpoint != currentZoneMaster) {
1100 skippedEndpoints.push_back(targetEndpoint);
1106 SyncSendMessage(targetEndpoint, message);
// Mark the message as seen for every skipped endpoint by advancing its local log
// position to the message timestamp.
1109 if (!skippedEndpoints.empty()) {
1110 double ts = message->Get("ts");
1112 for (const Endpoint::Ptr& skippedEndpoint : skippedEndpoints)
1113 skippedEndpoint->SetLocalLogPosition(ts);
1116 return !log_needed || log_done;
// Worker for RelayMessage(): timestamps the message, determines the target zone from
// secobj, relays the message to that zone and to all of its parent zones, and persists
// it to the replay log when requested and needed.
// @param origin Where the message came from (may be null).
// @param secobj Config object the message relates to.
// @param message The message; "ts" and, if applicable, "originZone" are set on it here.
// @param log Whether the message may be persisted via PersistMessage().
1119 void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
1120 const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
1122 double ts = Utility::GetTime();
1123 message->Set("ts", ts);
1125 Log(LogNotice, "ApiListener")
1126 << "Relaying '" << message->Get("method") << "' message";
1128 if (origin && origin->FromZone)
1129 message->Set("originZone", origin->FromZone->GetName());
1131 Zone::Ptr target_zone;
// If secobj is itself a Zone it is the target; otherwise the zone secobj belongs to.
1134 if (secobj->GetReflectionType() == Zone::TypeInstance)
1135 target_zone = static_pointer_cast<Zone>(secobj);
1137 target_zone = static_pointer_cast<Zone>(secobj->GetZone());
// Fallback target: the local zone.
1141 target_zone = Zone::GetLocalZone();
1143 Endpoint::Ptr master = GetMaster();
// RelayMessageOne() returns false when the message still needs to be logged.
1145 bool need_log = !RelayMessageOne(target_zone, origin, message, master);
1147 for (const Zone::Ptr& zone : target_zone->GetAllParentsRaw()) {
1148 if (!RelayMessageOne(zone, origin, message, master))
1152 if (log && need_log)
1153 PersistMessage(message, secobj);
1156 /* must hold m_LogLock */
// Opens "<api dir>/log/current" for appending and installs it as m_LogFile, resetting
// the message counter and the log timestamp. The caller must hold m_LogLock.
1157 void ApiListener::OpenLogFile()
1159 String path = GetApiDir() + "log/current";
// Ensure the log directory exists (created with mode 0750).
1161 Utility::MkDirP(Utility::DirName(path), 0750);
1163 auto *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
1166 Log(LogWarning, "ApiListener")
1167 << "Could not open spool file: " << path;
// NOTE(review): the second argument (true) presumably transfers ownership of fp to the
// StdioStream - confirm.
1171 m_LogFile = new StdioStream(fp, true);
1172 m_LogMessageCount = 0;
1173 SetLogMessageTimestamp(Utility::GetTime());
1176 /* must hold m_LogLock */
// Counterpart to OpenLogFile(); the caller must hold m_LogLock.
// NOTE(review): presumably closes and resets m_LogFile - confirm against the body.
1177 void ApiListener::CloseLogFile()
1186 /* must hold m_LogLock */
// Rotates the replay log by renaming "<api dir>/log/current" to
// "<api dir>/log/<timestamp>", where the timestamp is the integer part of the log
// message timestamp plus one. The caller must hold m_LogLock.
1187 void ApiListener::RotateLogFile()
1189 double ts = GetLogMessageTimestamp();
// Fallback timestamp: the current time.
1192 ts = Utility::GetTime();
1194 String oldpath = GetApiDir() + "log/current";
1195 String newpath = GetApiDir() + "log/" + Convert::ToString(static_cast<int>(ts)+1);
1197 // If the log is being rotated more than once per second,
1198 // don't overwrite the previous one, but silently deny rotation.
1199 if (!Utility::PathExists(newpath)) {
// Best effort: the result of rename() is deliberately ignored.
1200 (void) rename(oldpath.CStr(), newpath.CStr());
/**
 * Glob callback that collects the numeric names of log files.
 *
 * @param files Receives the file name converted to a number.
 * @param file Path of a file found by the glob.
 *
 * NOTE(review): the declaration of `ts`, the `try` line and the statements
 * taken for "current" and for conversion failures are elided in this listing.
 */
1204 void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
1206 String name = Utility::BaseName(file);
/* The live file is special-cased; presumably it is skipped here - confirm. */
1208 if (name == "current")
/* Names that do not convert end up in the handler below; presumably they are ignored - confirm. */
1214 ts = Convert::ToLong(name);
1215 } catch (const std::exception&) {
1219 files.push_back(ts);
/**
 * Replays persisted cluster log messages to a connected endpoint.
 *
 * Reads the numerically named files below GetApiDir() + "log/" plus
 * "log/current", compares each record's "timestamp" with the endpoint's
 * position, checks the record's security object against the endpoint's zone,
 * forwards the stored "message" via SendRawMessage() and announces progress
 * with "log::SetLogPosition" messages.
 *
 * NOTE(review): loop headers, `continue`/`break`/`return` statements and the
 * declaration of `count` are elided in this listing; control flow described
 * below as "presumably" must be confirmed against the full source.
 *
 * @param client Connection of the endpoint that receives the replay.
 */
1222 void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
1224 Endpoint::Ptr endpoint = client->GetEndpoint();
/*
 * With a log duration of 0 the syncing flag is cleared under the object lock.
 * The rest of this branch is elided; presumably it returns without replaying - confirm.
 */
1226 if (endpoint->GetLogDuration() == 0) {
1227 ObjectLock olock2(endpoint);
1228 endpoint->SetSyncing(false);
1232 CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'");
/* peer_ts is the replay cursor; logpos_ts is the position last announced to the peer. */
1235 double peer_ts = endpoint->GetLocalLogPosition();
1236 double logpos_ts = peer_ts;
1237 bool last_sync = false;
1239 Endpoint::Ptr target_endpoint = client->GetEndpoint();
1240 ASSERT(target_endpoint);
1242 Zone::Ptr target_zone = target_endpoint->GetZone();
/* Second place where the syncing flag is cleared; the guard is elided (presumably a missing target zone) - confirm. */
1245 ObjectLock olock2(endpoint);
1246 endpoint->SetSyncing(false);
/* Log file handling happens under m_LogLock. */
1251 boost::mutex::scoped_lock lock(m_LogLock);
/* `count` is declared on an elided line and the branch body is elided too; the meaning of -1 and 50000 is not visible here. */
1255 if (count == -1 || count > 50000) {
/* Collect the numerically named log files and process them in ascending order. */
1264 std::vector<int> files;
1265 Utility::Glob(GetApiDir() + "log/*", std::bind(&ApiListener::LogGlobHandler, std::ref(files), _1), GlobFile);
1266 std::sort(files.begin(), files.end());
1268 std::vector<std::pair<int, String>> allFiles;
/* Keep only files whose numeric name is not below the replay cursor. */
1270 for (int ts : files) {
1271 if (ts >= peer_ts) {
1272 allFiles.emplace_back(ts, GetApiDir() + "log/" + Convert::ToString(ts));
/* The live file is appended last; its key is "now + 1", stored as int by the pair. */
1276 allFiles.emplace_back(Utility::GetTime() + 1, GetApiDir() + "log/current");
1278 for (auto& file : allFiles) {
1279 Log(LogNotice, "ApiListener")
1280 << "Replaying log: " << file.second;
/* Open the file for binary reading and wrap it in a StdioStream. */
1282 auto *fp = new std::fstream(file.second.CStr(), std::fstream::in | std::fstream::binary);
1283 StdioStream::Ptr logStream = new StdioStream(fp, true);
1286 StreamReadContext src;
1288 Dictionary::Ptr pmessage;
/* Fetch the next record; the statements taken for the two status checks are elided. */
1291 StreamReadStatus srs = NetString::ReadStringFromStream(logStream, &message, src);
1293 if (srs == StatusEof)
1296 if (srs != StatusNewItem)
1299 pmessage = JsonDecode(message);
1300 } catch (const std::exception&) {
1301 Log(LogWarning, "ApiListener")
1302 << "Unexpected end-of-file for cluster log: " << file.second;
1304 /* Log files may be incomplete or corrupted. This is perfectly OK. */
/* Records not newer than the cursor are tested for here; presumably they are skipped - confirm. */
1308 if (pmessage->Get("timestamp") <= peer_ts)
/* Resolve the record's security object and test it against the target zone (consequences elided). */
1311 Dictionary::Ptr secname = pmessage->Get("secobj");
1314 ConfigObject::Ptr secobj = ConfigObject::GetObject(secname->Get("type"), secname->Get("name"));
1319 if (!target_zone->CanAccessObject(secobj))
/* Forward the stored message as is. */
1324 client->SendRawMessage(pmessage->Get("message"));
/* Failures are reported briefly at warning level and with full diagnostics at debug level. */
1326 } catch (const std::exception& ex) {
1327 Log(LogWarning, "ApiListener")
1328 << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
1330 Log(LogDebug, "ApiListener")
1331 << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
/* Advance the cursor to the record just handled. */
1336 peer_ts = pmessage->Get("timestamp");
/* Announce a new log position once the file key is more than 10 past the last announced one. */
1338 if (file.first > logpos_ts + 10) {
1339 logpos_ts = file.first;
1341 Dictionary::Ptr lmessage = new Dictionary({
1342 { "jsonrpc", "2.0" },
1343 { "method", "log::SetLogPosition" },
1344 { "params", new Dictionary({
1345 { "log_position", logpos_ts }
1349 client->SendMessage(lmessage);
/* Same text at two severities; the selecting condition is elided (presumably count > 0) - confirm. */
1357 Log(LogInformation, "ApiListener")
1358 << "Replayed " << count << " messages.";
1361 Log(LogNotice, "ApiListener")
1362 << "Replayed " << count << " messages.";
/* Clear the syncing flag; the enclosing condition is elided (presumably last_sync) - confirm. */
1367 ObjectLock olock2(endpoint);
1368 endpoint->SetSyncing(false);
/**
 * Stats function of the ApiListener type (registered with
 * REGISTER_STATSFUNCTION near the top of the file).
 *
 * @param status Receives the listener status dictionary under the key "api".
 * @param perfdata Receives one PerfdataValue per performance value, labelled "api_" + key.
 *
 * NOTE(review): a short guard after GetInstance() is elided in this listing
 * (presumably an early return when no listener exists) - confirm.
 */
1378 void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
1380 std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
1382 ApiListener::Ptr listener = ApiListener::GetInstance();
/* first = status dictionary, second = performance data dictionary (see GetStatus()). */
1387 stats = listener->GetStatus();
/* Iterate the performance dictionary while holding its object lock. */
1389 ObjectLock olock(stats.second);
1390 for (const Dictionary::Pair& kv : stats.second)
1391 perfdata->Add(new PerfdataValue("api_" + kv.first, kv.second));
1393 status->Set("api", stats.first);
/**
 * Builds the listener's status and performance data.
 *
 * @return Pair of (status dictionary, performance data dictionary). The status
 *         holds the identity, endpoint counts and name lists, per-zone
 *         statistics and JSON-RPC/HTTP connection statistics; the performance
 *         data repeats the numeric values under "num_*" keys.
 *
 * NOTE(review): several short lines are elided in this listing (declaration
 * of zoneLag, the increment of allEndpoints, `continue` statements, `else`);
 * comments marked "presumably" must be confirmed against the full source.
 */
1396 std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
1398 Dictionary::Ptr perfdata = new Dictionary();
1402 double allEndpoints = 0;
1403 Array::Ptr allNotConnectedEndpoints = new Array();
1404 Array::Ptr allConnectedEndpoints = new Array();
1406 Zone::Ptr my_zone = Zone::GetLocalZone();
1408 Dictionary::Ptr connectedZones = new Dictionary();
1410 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
1411 /* only check endpoints in a) the same zone b) our parent zone c) immediate child zones */
1412 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
1413 Log(LogDebug, "ApiListener")
1414 << "Not checking connection to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone.";
/* Per-zone accumulators; zoneLag is declared on an elided line. */
1418 bool zoneConnected = false;
1419 int countZoneEndpoints = 0;
1422 ArrayData zoneEndpoints;
1424 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
1425 zoneEndpoints.emplace_back(endpoint->GetName());
/* The local endpoint is detected by name; presumably it is excluded from the counters below - confirm. */
1427 if (endpoint->GetName() == GetIdentity())
1430 double eplag = CalculateZoneLag(endpoint);
/* Presumably keeps the largest positive lag seen in this zone (assignment elided) - confirm. */
1432 if (eplag > 0 && eplag > zoneLag)
1436 countZoneEndpoints++;
/* Sort the endpoint into the global connected / not connected lists. */
1438 if (!endpoint->GetConnected()) {
1439 allNotConnectedEndpoints->Add(endpoint->GetName());
1441 allConnectedEndpoints->Add(endpoint->GetName());
1442 zoneConnected = true;
1446 /* if there's only one endpoint inside the zone, we're not connected - that's us, fake it */
1447 if (zone->GetEndpoints().size() == 1 && countZoneEndpoints == 0)
1448 zoneConnected = true;
/* Stays empty unless the zone has a parent (guard elided, presumably a null check). */
1450 String parentZoneName;
1451 Zone::Ptr parentZone = zone->GetParent();
1453 parentZoneName = parentZone->GetName();
1455 Dictionary::Ptr zoneStats = new Dictionary({
1456 { "connected", zoneConnected },
1457 { "client_log_lag", zoneLag },
1458 { "endpoints", new Array(std::move(zoneEndpoints)) },
1459 { "parent_zone", parentZoneName }
1462 connectedZones->Set(zone->GetName(), zoneStats);
1465 /* connection stats */
1466 size_t jsonRpcAnonymousClients = GetAnonymousClients().size();
1467 size_t httpClients = GetHttpClients().size();
1468 size_t syncQueueItems = m_SyncQueue.GetLength();
1469 size_t relayQueueItems = m_RelayQueue.GetLength();
1470 double workQueueItemRate = JsonRpcConnection::GetWorkQueueRate();
/* Queue rates: GetTaskCount(60) divided by 60. */
1471 double syncQueueItemRate = m_SyncQueue.GetTaskCount(60) / 60.0;
1472 double relayQueueItemRate = m_RelayQueue.GetTaskCount(60) / 60.0;
1474 Dictionary::Ptr status = new Dictionary({
1475 { "identity", GetIdentity() },
1476 { "num_endpoints", allEndpoints },
1477 { "num_conn_endpoints", allConnectedEndpoints->GetLength() },
1478 { "num_not_conn_endpoints", allNotConnectedEndpoints->GetLength() },
1479 { "conn_endpoints", allConnectedEndpoints },
1480 { "not_conn_endpoints", allNotConnectedEndpoints },
1482 { "zones", connectedZones },
1484 { "json_rpc", new Dictionary({
1485 { "anonymous_clients", jsonRpcAnonymousClients },
1486 { "sync_queue_items", syncQueueItems },
1487 { "relay_queue_items", relayQueueItems },
1488 { "work_queue_item_rate", workQueueItemRate },
1489 { "sync_queue_item_rate", syncQueueItemRate },
1490 { "relay_queue_item_rate", relayQueueItemRate }
1493 { "http", new Dictionary({
1494 { "clients", httpClients }
1498 /* performance data */
1499 perfdata->Set("num_endpoints", allEndpoints);
1500 perfdata->Set("num_conn_endpoints", Convert::ToDouble(allConnectedEndpoints->GetLength()));
1501 perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(allNotConnectedEndpoints->GetLength()));
1503 perfdata->Set("num_json_rpc_anonymous_clients", jsonRpcAnonymousClients);
1504 perfdata->Set("num_http_clients", httpClients);
1505 perfdata->Set("num_json_rpc_sync_queue_items", syncQueueItems);
1506 perfdata->Set("num_json_rpc_relay_queue_items", relayQueueItems);
1508 perfdata->Set("num_json_rpc_work_queue_item_rate", workQueueItemRate);
1509 perfdata->Set("num_json_rpc_sync_queue_item_rate", syncQueueItemRate);
1510 perfdata->Set("num_json_rpc_relay_queue_item_rate", relayQueueItemRate);
1512 return std::make_pair(status, perfdata);
/**
 * Computes the log lag of an endpoint as "now - remote log position".
 *
 * @param endpoint Endpoint whose remote log position is inspected.
 *
 * NOTE(review): both return statements are elided in this listing.
 * Presumably the lag is returned when the condition below holds and 0
 * otherwise - confirm.
 */
1515 double ApiListener::CalculateZoneLag(const Endpoint::Ptr& endpoint)
1517 double remoteLogPosition = endpoint->GetRemoteLogPosition();
1518 double eplag = Utility::GetTime() - remoteLogPosition;
/* Condition: the endpoint is syncing or disconnected, and a remote log position is known (non-zero). */
1520 if ((endpoint->GetSyncing() || !endpoint->GetConnected()) && remoteLogPosition != 0)
/**
 * Adds an anonymous JSON-RPC connection to the tracked set, subject to the
 * configured maximum.
 *
 * @param aclient Connection to track.
 *
 * NOTE(review): the return statements are elided in this listing. Presumably
 * false is returned when the limit check below triggers and true after the
 * insert - confirm.
 */
1526 bool ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1528 boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
/* A negative maximum disables the check; otherwise test whether one more client would exceed it. */
1530 if (GetMaxAnonymousClients() >= 0 && (long)m_AnonymousClients.size() + 1 > (long)GetMaxAnonymousClients())
1533 m_AnonymousClients.insert(aclient);
1537 void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1539 boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1540 m_AnonymousClients.erase(aclient);
1543 std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients() const
1545 boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1546 return m_AnonymousClients;
1549 void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
1551 boost::mutex::scoped_lock lock(m_HttpClientsLock);
1552 m_HttpClients.insert(aclient);
1555 void ApiListener::RemoveHttpClient(const HttpServerConnection::Ptr& aclient)
1557 boost::mutex::scoped_lock lock(m_HttpClientsLock);
1558 m_HttpClients.erase(aclient);
1561 std::set<HttpServerConnection::Ptr> ApiListener::GetHttpClients() const
1563 boost::mutex::scoped_lock lock(m_HttpClientsLock);
1564 return m_HttpClients;
/**
 * Handler for the "Hello" API function (registered with REGISTER_APIFUNCTION
 * near the top of the file).
 *
 * @param origin Origin of the incoming message.
 * @param params Parameters of the incoming message.
 *
 * NOTE(review): the body is not part of this listing; what it returns must
 * be confirmed against the full source.
 */
1567 Value ApiListener::HelloAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
1572 Endpoint::Ptr ApiListener::GetLocalEndpoint() const
1574 return m_LocalEndpoint;
/**
 * Refreshes the in-memory map of active config package stages
 * (m_ActivePackageStages) from ConfigPackageUtility::GetActiveStageFromFile().
 *
 * NOTE(review): the declaration of activeStage, the `try` line and the tail
 * of the error branch are elided in this listing; presumably a package whose
 * stage cannot be read is logged and skipped - confirm.
 */
1577 void ApiListener::UpdateActivePackageStagesCache()
1579 boost::mutex::scoped_lock lock(m_ActivePackageStagesLock);
/* `auto package` takes each package name by value. */
1581 for (auto package : ConfigPackageUtility::GetPackages()) {
1585 activeStage = ConfigPackageUtility::GetActiveStageFromFile(package);
1586 } catch (const std::exception& ex) {
/* The message streamed into this log entry is on elided lines. */
1587 Log(LogCritical, "ApiListener")
1592 Log(LogNotice, "ApiListener")
1593 << "Updating cache: Config package '" << package << "' has active stage '" << activeStage << "'.";
1595 m_ActivePackageStages[package] = activeStage;
/**
 * Checks every config package's active stage file and, when reading it fails,
 * rewrites it from the stage cached in m_ActivePackageStages.
 *
 * NOTE(review): the declaration of activeStage, the `try` line and the
 * statement taken when no cached stage exists are elided in this listing
 * (presumably the package is skipped) - confirm.
 */
1599 void ApiListener::CheckApiPackageIntegrity()
1601 boost::mutex::scoped_lock lock(m_ActivePackageStagesLock);
1603 for (auto package : ConfigPackageUtility::GetPackages()) {
1606 activeStage = ConfigPackageUtility::GetActiveStageFromFile(package);
1607 } catch (const std::exception& ex) {
1608 /* An error means that the stage is broken, try to repair it. */
1609 auto it = m_ActivePackageStages.find(package);
/* Without a cached stage there is nothing to restore from. */
1611 if (it == m_ActivePackageStages.end())
1614 String activeStageCached = it->second;
1616 Log(LogInformation, "ApiListener")
1617 << "Repairing broken API config package '" << package
1618 << "', setting active stage '" << activeStageCached << "'.";
/* Write the cached stage back as the package's active stage. */
1620 ConfigPackageUtility::SetActiveStageToFile(package, activeStageCached);
1625 void ApiListener::SetActivePackageStage(const String& package, const String& stage)
1627 boost::mutex::scoped_lock lock(m_ActivePackageStagesLock);
1628 m_ActivePackageStages[package] = stage;
1631 String ApiListener::GetActivePackageStage(const String& package)
1633 boost::mutex::scoped_lock lock(m_ActivePackageStagesLock);
1635 if (m_ActivePackageStages.find(package) == m_ActivePackageStages.end())
1636 BOOST_THROW_EXCEPTION(ScriptError("Package " + package + " has no active stage."));
1638 return m_ActivePackageStages[package];
/**
 * Removes a config package's entry from the in-memory active stage cache.
 *
 * @param package Name of the config package.
 *
 * NOTE(review): the statement taken when the package is not cached is elided
 * in this listing (presumably an early return) - confirm.
 */
1641 void ApiListener::RemoveActivePackageStage(const String& package)
1643 /* This is the rare occasion when a package has been deleted. */
1644 boost::mutex::scoped_lock lock(m_ActivePackageStagesLock);
1646 auto it = m_ActivePackageStages.find(package);
/* Unknown package: nothing to erase. */
1648 if (it == m_ActivePackageStages.end())
1651 m_ActivePackageStages.erase(it);
1654 void ApiListener::ValidateTlsProtocolmin(const Lazy<String>& lvalue, const ValidationUtils& utils)
1656 ObjectImpl<ApiListener>::ValidateTlsProtocolmin(lvalue, utils);
1658 if (lvalue() != SSL_TXT_TLSV1_2) {
1659 String message = "Invalid TLS version. Must be '" SSL_TXT_TLSV1_2 "'";
1661 BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_protocolmin" }, message));
/**
 * Validates the "tls_handshake_timeout" attribute.
 *
 * @param lvalue Lazily evaluated attribute value.
 * @param utils Validation helpers, forwarded to the base validation.
 *
 * NOTE(review): the condition guarding the exception is elided in this
 * listing; going by the message it rejects values that are not greater
 * than 0 - confirm.
 */
1665 void ApiListener::ValidateTlsHandshakeTimeout(const Lazy<double>& lvalue, const ValidationUtils& utils)
/* Generated base validation runs first. */
1667 ObjectImpl<ApiListener>::ValidateTlsHandshakeTimeout(lvalue, utils);
1670 BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_handshake_timeout" }, "Value must be greater than 0."));
/**
 * Reports the result of IsSingleInstance() for the local zone.
 *
 * NOTE(review): a guard between the two visible statements is elided
 * (presumably the case of no local zone). The function name suggests "more
 * than one instance" while the returned call is named IsSingleInstance();
 * confirm the semantics of Zone::IsSingleInstance() before relying on this.
 */
1673 bool ApiListener::IsHACluster()
1675 Zone::Ptr zone = Zone::GetLocalZone();
1680 return zone->IsSingleInstance();
/**
 * Resolves the zone name to report as a message's origin.
 *
 * @param fromZone Zone the message came from.
 * @return Name of fromZone or of the local zone; the string starts out empty.
 *
 * NOTE(review): the branch structure is elided in this listing. Presumably
 * fromZone's name is used when it is set, otherwise the local zone's name
 * when a local zone exists - confirm.
 */
1683 /* Provide a helper function for zone origin name. */
1684 String ApiListener::GetFromZoneName(const Zone::Ptr& fromZone)
1686 String fromZoneName;
1689 fromZoneName = fromZone->GetName();
1691 Zone::Ptr lzone = Zone::GetLocalZone();
1694 fromZoneName = lzone->GetName();
1697 return fromZoneName;
1700 void ApiListener::UpdateStatusFile(boost::asio::ip::tcp::endpoint localEndpoint)
1702 String path = Configuration::CacheDir + "/api-state.json";
1704 Utility::SaveJsonFile(path, 0644, new Dictionary({
1705 {"host", String(localEndpoint.address().to_string())},
1706 {"port", localEndpoint.port()}
/**
 * Removes "<CacheDir>/api-state.json", the file written by UpdateStatusFile().
 *
 * NOTE(review): this is the last function in the listing; whether further
 * statements follow the Remove() call is not visible here.
 */
1710 void ApiListener::RemoveStatusFile()
1712 String path = Configuration::CacheDir + "/api-state.json";
1714 Utility::Remove(path);