]> granicus.if.org Git - icinga2/blob - lib/remote/apilistener.cpp
d680cc76a34a8ee13d7be9161c0cf9a59f2c8343
[icinga2] / lib / remote / apilistener.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "remote/apilistener.hpp"
4 #include "remote/apilistener-ti.cpp"
5 #include "remote/jsonrpcconnection.hpp"
6 #include "remote/endpoint.hpp"
7 #include "remote/jsonrpc.hpp"
8 #include "remote/apifunction.hpp"
9 #include "base/convert.hpp"
10 #include "base/io-engine.hpp"
11 #include "base/netstring.hpp"
12 #include "base/json.hpp"
13 #include "base/configtype.hpp"
14 #include "base/logger.hpp"
15 #include "base/objectlock.hpp"
16 #include "base/stdiostream.hpp"
17 #include "base/perfdatavalue.hpp"
18 #include "base/application.hpp"
19 #include "base/context.hpp"
20 #include "base/statsfunction.hpp"
21 #include "base/exception.hpp"
22 #include <boost/asio/ip/tcp.hpp>
23 #include <boost/asio/ip/v6_only.hpp>
24 #include <boost/asio/spawn.hpp>
25 #include <boost/asio/ssl/context.hpp>
26 #include <climits>
27 #include <fstream>
28 #include <memory>
29
30 using namespace icinga;
31
/* Type registration for ApiListener (REGISTER_TYPE is a project macro defined elsewhere). */
REGISTER_TYPE(ApiListener);

/*
 * Storage for ApiListener's static members.
 * OnMasterChanged is emitted with `true` at the end of Start();
 * m_Instance is assigned in OnConfigLoaded() and returned by GetInstance().
 */
boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
ApiListener::Ptr ApiListener::m_Instance;

/* Hooks ApiListener::StatsFunc into the stats function registry (macro defined elsewhere). */
REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc);

/*
 * Registers HelloAPIHandler for the "Hello" function in the "icinga" namespace;
 * presumably this is the receiver for the "icinga::Hello" message that
 * NewClientHandlerInternal() sends when acting as a client - confirm against the macro.
 */
REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
40
41 ApiListener::ApiListener()
42 {
43         m_RelayQueue.SetName("ApiListener, RelayQueue");
44         m_SyncQueue.SetName("ApiListener, SyncQueue");
45 }
46
47 String ApiListener::GetApiDir()
48 {
49         return Configuration::DataDir + "/api/";
50 }
51
52 String ApiListener::GetCertsDir()
53 {
54         return Configuration::DataDir + "/certs/";
55 }
56
57 String ApiListener::GetCaDir()
58 {
59         return Configuration::DataDir + "/ca/";
60 }
61
62 String ApiListener::GetCertificateRequestsDir()
63 {
64         return Configuration::DataDir + "/certificate-requests/";
65 }
66
67 String ApiListener::GetDefaultCertPath()
68 {
69         return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".crt";
70 }
71
72 String ApiListener::GetDefaultKeyPath()
73 {
74         return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".key";
75 }
76
77 String ApiListener::GetDefaultCaPath()
78 {
79         return GetCertsDir() + "/ca.crt";
80 }
81
82 double ApiListener::GetTlsHandshakeTimeout() const
83 {
84         return Configuration::TlsHandshakeTimeout;
85 }
86
87 void ApiListener::SetTlsHandshakeTimeout(double value, bool suppress_events, const Value& cookie)
88 {
89         Configuration::TlsHandshakeTimeout = value;
90 }
91
92 void ApiListener::CopyCertificateFile(const String& oldCertPath, const String& newCertPath)
93 {
94         struct stat st1, st2;
95
96         if (!oldCertPath.IsEmpty() && stat(oldCertPath.CStr(), &st1) >= 0 && (stat(newCertPath.CStr(), &st2) < 0 || st1.st_mtime > st2.st_mtime)) {
97                 Log(LogWarning, "ApiListener")
98                         << "Copying '" << oldCertPath << "' certificate file to '" << newCertPath << "'";
99
100                 Utility::MkDirP(Utility::DirName(newCertPath), 0700);
101                 Utility::CopyFile(oldCertPath, newCertPath);
102         }
103 }
104
105 /**
106  * Returns the API thread pool.
107  *
108  * @returns The API thread pool.
109  */
110 ThreadPool& ApiListener::GetTP()
111 {
112         static ThreadPool tp;
113         return tp;
114 }
115
116 void ApiListener::EnqueueAsyncCallback(const std::function<void ()>& callback, SchedulerPolicy policy)
117 {
118         GetTP().Post(callback, policy);
119 }
120
121 void ApiListener::OnConfigLoaded()
122 {
123         if (m_Instance)
124                 BOOST_THROW_EXCEPTION(ScriptError("Only one ApiListener object is allowed.", GetDebugInfo()));
125
126         m_Instance = this;
127
128         String defaultCertPath = GetDefaultCertPath();
129         String defaultKeyPath = GetDefaultKeyPath();
130         String defaultCaPath = GetDefaultCaPath();
131
132         /* Migrate certificate location < 2.8 to the new default path. */
133         String oldCertPath = GetCertPath();
134         String oldKeyPath = GetKeyPath();
135         String oldCaPath = GetCaPath();
136
137         CopyCertificateFile(oldCertPath, defaultCertPath);
138         CopyCertificateFile(oldKeyPath, defaultKeyPath);
139         CopyCertificateFile(oldCaPath, defaultCaPath);
140
141         if (!oldCertPath.IsEmpty() && !oldKeyPath.IsEmpty() && !oldCaPath.IsEmpty()) {
142                 Log(LogWarning, "ApiListener", "Please read the upgrading documentation for v2.8: https://icinga.com/docs/icinga2/latest/doc/16-upgrading-icinga-2/");
143         }
144
145         /* set up SSL context */
146         std::shared_ptr<X509> cert;
147         try {
148                 cert = GetX509Certificate(defaultCertPath);
149         } catch (const std::exception&) {
150                 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate from cert path: '"
151                         + defaultCertPath + "'.", GetDebugInfo()));
152         }
153
154         try {
155                 SetIdentity(GetCertificateCN(cert));
156         } catch (const std::exception&) {
157                 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate common name from cert path: '"
158                         + defaultCertPath + "'.", GetDebugInfo()));
159         }
160
161         Log(LogInformation, "ApiListener")
162                 << "My API identity: " << GetIdentity();
163
164         UpdateSSLContext();
165 }
166
167 void ApiListener::UpdateSSLContext()
168 {
169         namespace ssl = boost::asio::ssl;
170
171         std::shared_ptr<ssl::context> context;
172
173         try {
174                 context = MakeAsioSslContext(GetDefaultCertPath(), GetDefaultKeyPath(), GetDefaultCaPath());
175         } catch (const std::exception&) {
176                 BOOST_THROW_EXCEPTION(ScriptError("Cannot make SSL context for cert path: '"
177                         + GetDefaultCertPath() + "' key path: '" + GetDefaultKeyPath() + "' ca path: '" + GetDefaultCaPath() + "'.", GetDebugInfo()));
178         }
179
180         if (!GetCrlPath().IsEmpty()) {
181                 try {
182                         AddCRLToSSLContext(context, GetCrlPath());
183                 } catch (const std::exception&) {
184                         BOOST_THROW_EXCEPTION(ScriptError("Cannot add certificate revocation list to SSL context for crl path: '"
185                                 + GetCrlPath() + "'.", GetDebugInfo()));
186                 }
187         }
188
189         if (!GetCipherList().IsEmpty()) {
190                 try {
191                         SetCipherListToSSLContext(context, GetCipherList());
192                 } catch (const std::exception&) {
193                         BOOST_THROW_EXCEPTION(ScriptError("Cannot set cipher list to SSL context for cipher list: '"
194                                 + GetCipherList() + "'.", GetDebugInfo()));
195                 }
196         }
197
198         if (!GetTlsProtocolmin().IsEmpty()){
199                 try {
200                         SetTlsProtocolminToSSLContext(context, GetTlsProtocolmin());
201                 } catch (const std::exception&) {
202                         BOOST_THROW_EXCEPTION(ScriptError("Cannot set minimum TLS protocol version to SSL context with tls_protocolmin: '" + GetTlsProtocolmin() + "'.", GetDebugInfo()));
203                 }
204         }
205
206         m_SSLContext = context;
207
208         for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
209                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
210                         client->Disconnect();
211                 }
212         }
213
214         for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) {
215                 client->Disconnect();
216         }
217 }
218
219 void ApiListener::OnAllConfigLoaded()
220 {
221         m_LocalEndpoint = Endpoint::GetByName(GetIdentity());
222
223         if (!m_LocalEndpoint)
224                 BOOST_THROW_EXCEPTION(ScriptError("Endpoint object for '" + GetIdentity() + "' is missing.", GetDebugInfo()));
225 }
226
227 /**
228  * Starts the component.
229  */
230 void ApiListener::Start(bool runtimeCreated)
231 {
232         Log(LogInformation, "ApiListener")
233                 << "'" << GetName() << "' started.";
234
235         SyncZoneDirs();
236
237         ObjectImpl<ApiListener>::Start(runtimeCreated);
238
239         {
240                 boost::mutex::scoped_lock lock(m_LogLock);
241                 RotateLogFile();
242                 OpenLogFile();
243         }
244
245         /* create the primary JSON-RPC listener */
246         if (!AddListener(GetBindHost(), GetBindPort())) {
247                 Log(LogCritical, "ApiListener")
248                         << "Cannot add listener on host '" << GetBindHost() << "' for port '" << GetBindPort() << "'.";
249                 Application::Exit(EXIT_FAILURE);
250         }
251
252         m_Timer = new Timer();
253         m_Timer->OnTimerExpired.connect(std::bind(&ApiListener::ApiTimerHandler, this));
254         m_Timer->SetInterval(5);
255         m_Timer->Start();
256         m_Timer->Reschedule(0);
257
258         m_ReconnectTimer = new Timer();
259         m_ReconnectTimer->OnTimerExpired.connect(std::bind(&ApiListener::ApiReconnectTimerHandler, this));
260         m_ReconnectTimer->SetInterval(10);
261         m_ReconnectTimer->Start();
262         m_ReconnectTimer->Reschedule(0);
263
264         /* Keep this in relative sync with the cold startup in UpdateObjectAuthority() and the reconnect interval above.
265          * Previous: 60s reconnect, 30s OA, 60s cold startup.
266          * Now: 10s reconnect, 10s OA, 30s cold startup. 
267          */
268         m_AuthorityTimer = new Timer();
269         m_AuthorityTimer->OnTimerExpired.connect(std::bind(&ApiListener::UpdateObjectAuthority));
270         m_AuthorityTimer->SetInterval(10);
271         m_AuthorityTimer->Start();
272
273         m_CleanupCertificateRequestsTimer = new Timer();
274         m_CleanupCertificateRequestsTimer->OnTimerExpired.connect(std::bind(&ApiListener::CleanupCertificateRequestsTimerHandler, this));
275         m_CleanupCertificateRequestsTimer->SetInterval(3600);
276         m_CleanupCertificateRequestsTimer->Start();
277         m_CleanupCertificateRequestsTimer->Reschedule(0);
278
279         OnMasterChanged(true);
280 }
281
282 void ApiListener::Stop(bool runtimeDeleted)
283 {
284         ObjectImpl<ApiListener>::Stop(runtimeDeleted);
285
286         Log(LogInformation, "ApiListener")
287                 << "'" << GetName() << "' stopped.";
288
289         {
290                 boost::mutex::scoped_lock lock(m_LogLock);
291                 CloseLogFile();
292         }
293
294         RemoveStatusFile();
295 }
296
297 ApiListener::Ptr ApiListener::GetInstance()
298 {
299         return m_Instance;
300 }
301
302 Endpoint::Ptr ApiListener::GetMaster() const
303 {
304         Zone::Ptr zone = Zone::GetLocalZone();
305
306         if (!zone)
307                 return nullptr;
308
309         std::vector<String> names;
310
311         for (const Endpoint::Ptr& endpoint : zone->GetEndpoints())
312                 if (endpoint->GetConnected() || endpoint->GetName() == GetIdentity())
313                         names.push_back(endpoint->GetName());
314
315         std::sort(names.begin(), names.end());
316
317         return Endpoint::GetByName(*names.begin());
318 }
319
320 bool ApiListener::IsMaster() const
321 {
322         Endpoint::Ptr master = GetMaster();
323
324         if (!master)
325                 return false;
326
327         return master == GetLocalEndpoint();
328 }
329
330 /**
331  * Creates a new JSON-RPC listener on the specified port.
332  *
333  * @param node The host the listener should be bound to.
334  * @param service The port to listen on.
335  */
336 bool ApiListener::AddListener(const String& node, const String& service)
337 {
338         namespace asio = boost::asio;
339         namespace ip = asio::ip;
340         using ip::tcp;
341
342         ObjectLock olock(this);
343
344         auto sslContext (m_SSLContext);
345
346         if (!sslContext) {
347                 Log(LogCritical, "ApiListener", "SSL context is required for AddListener()");
348                 return false;
349         }
350
351         auto& io (IoEngine::Get().GetIoService());
352         auto acceptor (std::make_shared<tcp::acceptor>(io));
353
354         try {
355                 tcp::resolver resolver (io);
356                 tcp::resolver::query query (node, service, tcp::resolver::query::passive);
357                 auto endpoint (resolver.resolve(query)->endpoint());
358
359                 acceptor->open(endpoint.protocol());
360                 acceptor->set_option(ip::v6_only(false));
361                 acceptor->set_option(tcp::acceptor::reuse_address(true));
362                 acceptor->bind(endpoint);
363         } catch (const std::exception&) {
364                 Log(LogCritical, "ApiListener")
365                         << "Cannot bind TCP socket for host '" << node << "' on port '" << service << "'.";
366                 return false;
367         }
368
369         acceptor->listen(INT_MAX);
370
371         auto localEndpoint (acceptor->local_endpoint());
372
373         Log(LogInformation, "ApiListener")
374                 << "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'";
375
376         asio::spawn(io, [acceptor](asio::yield_context yc) {
377                 // TODO
378         });
379
380         UpdateStatusFile(localEndpoint);
381
382         return true;
383 }
384
385 /**
386  * Creates a new JSON-RPC client and connects to the specified endpoint.
387  *
388  * @param endpoint The endpoint.
389  */
390 void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
391 {
392         {
393                 ObjectLock olock(this);
394
395                 auto sslContext (m_SSLContext);
396
397                 if (!sslContext) {
398                         Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
399                         return;
400                 }
401         }
402
403         String host = endpoint->GetHost();
404         String port = endpoint->GetPort();
405
406         Log(LogInformation, "ApiListener")
407                 << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
408
409         TcpSocket::Ptr client = new TcpSocket();
410
411         try {
412                 client->Connect(host, port);
413
414                 NewClientHandler(client, endpoint->GetName(), RoleClient);
415
416                 endpoint->SetConnecting(false);
417                 Log(LogInformation, "ApiListener")
418                                 << "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
419         } catch (const std::exception& ex) {
420                 endpoint->SetConnecting(false);
421                 client->Close();
422
423                 std::ostringstream info;
424                 info << "Cannot connect to host '" << host << "' on port '" << port << "'";
425                 Log(LogCritical, "ApiListener", info.str());
426                 Log(LogDebug, "ApiListener")
427                         << info.str() << "\n" << DiagnosticInformation(ex);
428         }
429 }
430
431 void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
432 {
433         try {
434                 NewClientHandlerInternal(client, hostname, role);
435         } catch (const std::exception& ex) {
436                 Log(LogCritical, "ApiListener")
437                         << "Exception while handling new API client connection: " << DiagnosticInformation(ex, false);
438
439                 Log(LogDebug, "ApiListener")
440                         << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
441         }
442 }
443
444 /**
445  * Processes a new client connection.
446  *
447  * @param client The new client.
448  */
449 void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
450 {
451         CONTEXT("Handling new API client connection");
452
453         String conninfo;
454
455         if (role == RoleClient)
456                 conninfo = "to";
457         else
458                 conninfo = "from";
459
460         conninfo += " " + client->GetPeerAddress();
461
462         TlsStream::Ptr tlsStream;
463
464         String environmentName = Application::GetAppEnvironment();
465
466         String serverName = hostname;
467
468         if (!environmentName.IsEmpty())
469                 serverName += ":" + environmentName;
470
471         {
472                 ObjectLock olock(this);
473                 try {
474                         tlsStream = new TlsStream(client, serverName, role, m_SSLContext);
475                 } catch (const std::exception&) {
476                         Log(LogCritical, "ApiListener")
477                                 << "Cannot create TLS stream from client connection (" << conninfo << ")";
478                         return;
479                 }
480         }
481
482         try {
483                 tlsStream->Handshake();
484         } catch (const std::exception& ex) {
485                 Log(LogCritical, "ApiListener")
486                         << "Client TLS handshake failed (" << conninfo << "): " << DiagnosticInformation(ex, false);
487                 tlsStream->Close();
488                 return;
489         }
490
491         std::shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
492         String identity;
493         Endpoint::Ptr endpoint;
494         bool verify_ok = false;
495
496         if (cert) {
497                 try {
498                         identity = GetCertificateCN(cert);
499                 } catch (const std::exception&) {
500                         Log(LogCritical, "ApiListener")
501                                 << "Cannot get certificate common name from cert path: '" << GetDefaultCertPath() << "'.";
502                         tlsStream->Close();
503                         return;
504                 }
505
506                 verify_ok = tlsStream->IsVerifyOK();
507                 if (!hostname.IsEmpty()) {
508                         if (identity != hostname) {
509                                 Log(LogWarning, "ApiListener")
510                                         << "Unexpected certificate common name while connecting to endpoint '"
511                                         << hostname << "': got '" << identity << "'";
512                                 tlsStream->Close();
513                                 return;
514                         } else if (!verify_ok) {
515                                 Log(LogWarning, "ApiListener")
516                                         << "Certificate validation failed for endpoint '" << hostname
517                                         << "': " << tlsStream->GetVerifyError();
518                         }
519                 }
520
521                 if (verify_ok)
522                         endpoint = Endpoint::GetByName(identity);
523
524                 {
525                         Log log(LogInformation, "ApiListener");
526
527                         log << "New client connection for identity '" << identity << "' " << conninfo;
528
529                         if (!verify_ok)
530                                 log << " (certificate validation failed: " << tlsStream->GetVerifyError() << ")";
531                         else if (!endpoint)
532                                 log << " (no Endpoint object found for identity)";
533                 }
534         } else {
535                 Log(LogInformation, "ApiListener")
536                         << "New client connection " << conninfo << " (no client certificate)";
537         }
538
539         ClientType ctype;
540
541         if (role == RoleClient) {
542                 Dictionary::Ptr message = new Dictionary({
543                         { "jsonrpc", "2.0" },
544                         { "method", "icinga::Hello" },
545                         { "params", new Dictionary() }
546                 });
547
548                 JsonRpc::SendMessage(tlsStream, message);
549                 ctype = ClientJsonRpc;
550         } else {
551                 tlsStream->WaitForData(10);
552
553                 if (!tlsStream->IsDataAvailable()) {
554                         if (identity.IsEmpty())
555                                 Log(LogInformation, "ApiListener")
556                                         << "No data received on new API connection. "
557                                         << "Ensure that the remote endpoints are properly configured in a cluster setup.";
558                         else
559                                 Log(LogWarning, "ApiListener")
560                                         << "No data received on new API connection for identity '" << identity << "'. "
561                                         << "Ensure that the remote endpoints are properly configured in a cluster setup.";
562                         tlsStream->Close();
563                         return;
564                 }
565
566                 char firstByte;
567                 tlsStream->Peek(&firstByte, 1, false);
568
569                 if (firstByte >= '0' && firstByte <= '9')
570                         ctype = ClientJsonRpc;
571                 else
572                         ctype = ClientHttp;
573         }
574
575         if (ctype == ClientJsonRpc) {
576                 Log(LogNotice, "ApiListener", "New JSON-RPC client");
577
578                 JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role);
579                 aclient->Start();
580
581                 if (endpoint) {
582                         bool needSync = !endpoint->GetConnected();
583
584                         endpoint->AddClient(aclient);
585
586                         m_SyncQueue.Enqueue(std::bind(&ApiListener::SyncClient, this, aclient, endpoint, needSync));
587                 } else {
588                         if (!AddAnonymousClient(aclient)) {
589                                 Log(LogNotice, "ApiListener")
590                                         << "Ignoring anonymous JSON-RPC connection " << conninfo
591                                         << ". Max connections (" << GetMaxAnonymousClients() << ") exceeded.";
592                                 aclient->Disconnect();
593                         }
594                 }
595         } else {
596                 Log(LogNotice, "ApiListener", "New HTTP client");
597
598                 HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, tlsStream);
599                 aclient->Start();
600                 AddHttpClient(aclient);
601         }
602 }
603
604 void ApiListener::SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint, bool needSync)
605 {
606         Zone::Ptr eZone = endpoint->GetZone();
607
608         try {
609                 {
610                         ObjectLock olock(endpoint);
611
612                         endpoint->SetSyncing(true);
613                 }
614
615                 Zone::Ptr myZone = Zone::GetLocalZone();
616
617                 if (myZone->GetParent() == eZone) {
618                         Log(LogInformation, "ApiListener")
619                                 << "Requesting new certificate for this Icinga instance from endpoint '" << endpoint->GetName() << "'.";
620
621                         JsonRpcConnection::SendCertificateRequest(aclient, nullptr, String());
622
623                         if (Utility::PathExists(ApiListener::GetCertificateRequestsDir()))
624                                 Utility::Glob(ApiListener::GetCertificateRequestsDir() + "/*.json", std::bind(&JsonRpcConnection::SendCertificateRequest, aclient, nullptr, _1), GlobFile);
625                 }
626
627                 /* Make sure that the config updates are synced
628                  * before the logs are replayed.
629                  */
630
631                 Log(LogInformation, "ApiListener")
632                         << "Sending config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
633
634                 /* sync zone file config */
635                 SendConfigUpdate(aclient);
636
637                 Log(LogInformation, "ApiListener")
638                         << "Finished sending config file updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
639
640                 /* sync runtime config */
641                 SendRuntimeConfigObjects(aclient);
642
643                 Log(LogInformation, "ApiListener")
644                         << "Finished sending runtime config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
645
646                 if (!needSync) {
647                         ObjectLock olock2(endpoint);
648                         endpoint->SetSyncing(false);
649                         return;
650                 }
651
652                 Log(LogInformation, "ApiListener")
653                         << "Sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
654
655                 ReplayLog(aclient);
656
657                 if (eZone == Zone::GetLocalZone())
658                         UpdateObjectAuthority();
659
660                 Log(LogInformation, "ApiListener")
661                         << "Finished sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
662         } catch (const std::exception& ex) {
663                 {
664                         ObjectLock olock2(endpoint);
665                         endpoint->SetSyncing(false);
666                 }
667
668                 Log(LogCritical, "ApiListener")
669                         << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
670
671                 Log(LogDebug, "ApiListener")
672                         << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
673         }
674
675         Log(LogInformation, "ApiListener")
676                 << "Finished syncing endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
677 }
678
679 void ApiListener::ApiTimerHandler()
680 {
681         double now = Utility::GetTime();
682
683         std::vector<int> files;
684         Utility::Glob(GetApiDir() + "log/*", std::bind(&ApiListener::LogGlobHandler, std::ref(files), _1), GlobFile);
685         std::sort(files.begin(), files.end());
686
687         for (int ts : files) {
688                 bool need = false;
689
690                 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
691                         if (endpoint == GetLocalEndpoint())
692                                 continue;
693
694                         if (endpoint->GetLogDuration() >= 0 && ts < now - endpoint->GetLogDuration())
695                                 continue;
696
697                         if (ts > endpoint->GetLocalLogPosition()) {
698                                 need = true;
699                                 break;
700                         }
701                 }
702
703                 if (!need) {
704                         String path = GetApiDir() + "log/" + Convert::ToString(ts);
705                         Log(LogNotice, "ApiListener")
706                                 << "Removing old log file: " << path;
707                         (void)unlink(path.CStr());
708                 }
709         }
710
711         for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
712                 if (!endpoint->GetConnected())
713                         continue;
714
715                 double ts = endpoint->GetRemoteLogPosition();
716
717                 if (ts == 0)
718                         continue;
719
720                 Dictionary::Ptr lmessage = new Dictionary({
721                         { "jsonrpc", "2.0" },
722                         { "method", "log::SetLogPosition" },
723                         { "params", new Dictionary({
724                                 { "log_position", ts }
725                         }) }
726                 });
727
728                 double maxTs = 0;
729
730                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
731                         if (client->GetTimestamp() > maxTs)
732                                 maxTs = client->GetTimestamp();
733                 }
734
735                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
736                         if (client->GetTimestamp() != maxTs)
737                                 client->Disconnect();
738                         else
739                                 client->SendMessage(lmessage);
740                 }
741
742                 Log(LogNotice, "ApiListener")
743                         << "Setting log position for identity '" << endpoint->GetName() << "': "
744                         << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
745         }
746 }
747
748 void ApiListener::ApiReconnectTimerHandler()
749 {
750         Zone::Ptr my_zone = Zone::GetLocalZone();
751
752         for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
753                 /* don't connect to global zones */
754                 if (zone->GetGlobal())
755                         continue;
756
757                 /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
758                 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
759                         Log(LogDebug, "ApiListener")
760                                 << "Not connecting to Zone '" << zone->GetName()
761                                 << "' because it's not in the same zone, a parent or a child zone.";
762                         continue;
763                 }
764
765                 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
766                         /* don't connect to ourselves */
767                         if (endpoint == GetLocalEndpoint()) {
768                                 Log(LogDebug, "ApiListener")
769                                         << "Not connecting to Endpoint '" << endpoint->GetName() << "' because that's us.";
770                                 continue;
771                         }
772
773                         /* don't try to connect to endpoints which don't have a host and port */
774                         if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) {
775                                 Log(LogDebug, "ApiListener")
776                                         << "Not connecting to Endpoint '" << endpoint->GetName()
777                                         << "' because the host/port attributes are missing.";
778                                 continue;
779                         }
780
781                         /* don't try to connect if there's already a connection attempt */
782                         if (endpoint->GetConnecting()) {
783                                 Log(LogDebug, "ApiListener")
784                                         << "Not connecting to Endpoint '" << endpoint->GetName()
785                                         << "' because we're already trying to connect to it.";
786                                 continue;
787                         }
788
789                         /* don't try to connect if we're already connected */
790                         if (endpoint->GetConnected()) {
791                                 Log(LogDebug, "ApiListener")
792                                         << "Not connecting to Endpoint '" << endpoint->GetName()
793                                         << "' because we're already connected to it.";
794                                 continue;
795                         }
796
797                         /* Set connecting state to prevent duplicated queue inserts later. */
798                         endpoint->SetConnecting(true);
799
800                         /* Use dynamic thread pool with additional on demand resources with fast throughput. */
801                         EnqueueAsyncCallback(std::bind(&ApiListener::AddConnection, this, endpoint), LowLatencyScheduler);
802                 }
803         }
804
805         Endpoint::Ptr master = GetMaster();
806
807         if (master)
808                 Log(LogNotice, "ApiListener")
809                         << "Current zone master: " << master->GetName();
810
811         std::vector<String> names;
812         for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>())
813                 if (endpoint->GetConnected())
814                         names.emplace_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
815
816         Log(LogNotice, "ApiListener")
817                 << "Connected endpoints: " << Utility::NaturalJoin(names);
818 }
819
820 static void CleanupCertificateRequest(const String& path, double expiryTime)
821 {
822 #ifndef _WIN32
823         struct stat statbuf;
824         if (lstat(path.CStr(), &statbuf) < 0)
825                 return;
826 #else /* _WIN32 */
827         struct _stat statbuf;
828         if (_stat(path.CStr(), &statbuf) < 0)
829                 return;
830 #endif /* _WIN32 */
831
832         if (statbuf.st_mtime < expiryTime)
833                 (void) unlink(path.CStr());
834 }
835
836 void ApiListener::CleanupCertificateRequestsTimerHandler()
837 {
838         String requestsDir = GetCertificateRequestsDir();
839
840         if (Utility::PathExists(requestsDir)) {
841                 /* remove certificate requests that are older than a week */
842                 double expiryTime = Utility::GetTime() - 7 * 24 * 60 * 60;
843                 Utility::Glob(requestsDir + "/*.json", std::bind(&CleanupCertificateRequest, _1, expiryTime), GlobFile);
844         }
845 }
846
847 void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin,
848         const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
849 {
850         if (!IsActive())
851                 return;
852
853         m_RelayQueue.Enqueue(std::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), PriorityNormal, true);
854 }
855
856 void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)
857 {
858         double ts = message->Get("ts");
859
860         ASSERT(ts != 0);
861
862         Dictionary::Ptr pmessage = new Dictionary();
863         pmessage->Set("timestamp", ts);
864
865         pmessage->Set("message", JsonEncode(message));
866
867         if (secobj) {
868                 Dictionary::Ptr secname = new Dictionary();
869                 secname->Set("type", secobj->GetReflectionType()->GetName());
870                 secname->Set("name", secobj->GetName());
871                 pmessage->Set("secobj", secname);
872         }
873
874         boost::mutex::scoped_lock lock(m_LogLock);
875         if (m_LogFile) {
876                 NetString::WriteStringToStream(m_LogFile, JsonEncode(pmessage));
877                 m_LogMessageCount++;
878                 SetLogMessageTimestamp(ts);
879
880                 if (m_LogMessageCount > 50000) {
881                         CloseLogFile();
882                         RotateLogFile();
883                         OpenLogFile();
884                 }
885         }
886 }
887
888 void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
889 {
890         ObjectLock olock(endpoint);
891
892         if (!endpoint->GetSyncing()) {
893                 Log(LogNotice, "ApiListener")
894                         << "Sending message '" << message->Get("method") << "' to '" << endpoint->GetName() << "'";
895
896                 double maxTs = 0;
897
898                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
899                         if (client->GetTimestamp() > maxTs)
900                                 maxTs = client->GetTimestamp();
901                 }
902
903                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
904                         if (client->GetTimestamp() != maxTs)
905                                 continue;
906
907                         client->SendMessage(message);
908                 }
909         }
910 }
911
/**
 * Relays a message to the endpoints of a single target zone.
 *
 * Endpoints that are connected but intentionally passed over have their
 * local log position set to the message's "ts" value.
 *
 * @param targetZone The zone to relay to (must not be empty).
 * @param origin Where the message came from (may be empty).
 * @param message The message to relay.
 * @param currentMaster The endpoint currently regarded as master by the caller.
 * @returns true if the zone is not eligible for relaying, if it has no
 *          endpoint besides ourselves, or if log_done ended up true;
 *          false otherwise. SyncRelayMessage() persists the message when
 *          false is returned.
 */
bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message, const Endpoint::Ptr& currentMaster)
{
        ASSERT(targetZone);

        Zone::Ptr myZone = Zone::GetLocalZone();

        /* only relay the message to a) the same zone, b) the parent zone and c) direct child zones. Exception is a global zone. */
        if (!targetZone->GetGlobal() &&
                targetZone != myZone &&
                targetZone != myZone->GetParent() &&
                targetZone->GetParent() != myZone) {
                return true;
        }

        Endpoint::Ptr myEndpoint = GetLocalEndpoint();

        /* Endpoints collected here get their local log position advanced below. */
        std::vector<Endpoint::Ptr> skippedEndpoints;

        /*
         * relayed:    the message was handed to SyncSendMessage() at least once.
         * log_needed: at least one endpoint other than ourselves was considered.
         * log_done:   tracks the most recently evaluated endpoint state, see the loop below.
         */
        bool relayed = false, log_needed = false, log_done = false;

        std::set<Endpoint::Ptr> targetEndpoints;

        if (targetZone->GetGlobal()) {
                /* For a global zone the candidates are our own zone's endpoints plus those of our direct child zones. */
                targetEndpoints = myZone->GetEndpoints();

                for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
                        /* Fetch immediate child zone members */
                        if (zone->GetParent() == myZone) {
                                std::set<Endpoint::Ptr> endpoints = zone->GetEndpoints();
                                targetEndpoints.insert(endpoints.begin(), endpoints.end());
                        }
                }
        } else {
                targetEndpoints = targetZone->GetEndpoints();
        }

        for (const Endpoint::Ptr& endpoint : targetEndpoints) {
                /* don't relay messages to ourselves */
                if (endpoint == GetLocalEndpoint())
                        continue;

                log_needed = true;

                /* don't relay messages to disconnected endpoints */
                if (!endpoint->GetConnected()) {
                        /* A disconnected endpoint in our own zone resets log_done; a later connected endpoint sets it again. */
                        if (targetZone == myZone)
                                log_done = false;

                        continue;
                }

                log_done = true;

                /* don't relay the message to the zone through more than one endpoint unless this is our own zone */
                if (relayed && targetZone != myZone) {
                        skippedEndpoints.push_back(endpoint);
                        continue;
                }

                /* don't relay messages back to the endpoint which we got the message from */
                if (origin && origin->FromClient && endpoint == origin->FromClient->GetEndpoint()) {
                        skippedEndpoints.push_back(endpoint);
                        continue;
                }

                /* don't relay messages back to the zone which we got the message from */
                if (origin && origin->FromZone && targetZone == origin->FromZone) {
                        skippedEndpoints.push_back(endpoint);
                        continue;
                }

                /* only relay message to the master if we're not currently the master */
                if (currentMaster != myEndpoint && currentMaster != endpoint) {
                        skippedEndpoints.push_back(endpoint);
                        continue;
                }

                relayed = true;

                SyncSendMessage(endpoint, message);
        }

        if (!skippedEndpoints.empty()) {
                double ts = message->Get("ts");

                for (const Endpoint::Ptr& endpoint : skippedEndpoints)
                        endpoint->SetLocalLogPosition(ts);
        }

        return !log_needed || log_done;
}
1003
1004 void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
1005         const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
1006 {
1007         double ts = Utility::GetTime();
1008         message->Set("ts", ts);
1009
1010         Log(LogNotice, "ApiListener")
1011                 << "Relaying '" << message->Get("method") << "' message";
1012
1013         if (origin && origin->FromZone)
1014                 message->Set("originZone", origin->FromZone->GetName());
1015
1016         Zone::Ptr target_zone;
1017
1018         if (secobj) {
1019                 if (secobj->GetReflectionType() == Zone::TypeInstance)
1020                         target_zone = static_pointer_cast<Zone>(secobj);
1021                 else
1022                         target_zone = static_pointer_cast<Zone>(secobj->GetZone());
1023         }
1024
1025         if (!target_zone)
1026                 target_zone = Zone::GetLocalZone();
1027
1028         Endpoint::Ptr master = GetMaster();
1029
1030         bool need_log = !RelayMessageOne(target_zone, origin, message, master);
1031
1032         for (const Zone::Ptr& zone : target_zone->GetAllParentsRaw()) {
1033                 if (!RelayMessageOne(zone, origin, message, master))
1034                         need_log = true;
1035         }
1036
1037         if (log && need_log)
1038                 PersistMessage(message, secobj);
1039 }
1040
1041 /* must hold m_LogLock */
1042 void ApiListener::OpenLogFile()
1043 {
1044         String path = GetApiDir() + "log/current";
1045
1046         Utility::MkDirP(Utility::DirName(path), 0750);
1047
1048         auto *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
1049
1050         if (!fp->good()) {
1051                 Log(LogWarning, "ApiListener")
1052                         << "Could not open spool file: " << path;
1053                 return;
1054         }
1055
1056         m_LogFile = new StdioStream(fp, true);
1057         m_LogMessageCount = 0;
1058         SetLogMessageTimestamp(Utility::GetTime());
1059 }
1060
1061 /* must hold m_LogLock */
1062 void ApiListener::CloseLogFile()
1063 {
1064         if (!m_LogFile)
1065                 return;
1066
1067         m_LogFile->Close();
1068         m_LogFile.reset();
1069 }
1070
1071 /* must hold m_LogLock */
1072 void ApiListener::RotateLogFile()
1073 {
1074         double ts = GetLogMessageTimestamp();
1075
1076         if (ts == 0)
1077                 ts = Utility::GetTime();
1078
1079         String oldpath = GetApiDir() + "log/current";
1080         String newpath = GetApiDir() + "log/" + Convert::ToString(static_cast<int>(ts)+1);
1081
1082
1083 #ifdef _WIN32
1084         _unlink(newpath.CStr());
1085 #endif /* _WIN32 */
1086
1087
1088         (void) rename(oldpath.CStr(), newpath.CStr());
1089 }
1090
1091 void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
1092 {
1093         String name = Utility::BaseName(file);
1094
1095         if (name == "current")
1096                 return;
1097
1098         int ts;
1099
1100         try {
1101                 ts = Convert::ToLong(name);
1102         } catch (const std::exception&) {
1103                 return;
1104         }
1105
1106         files.push_back(ts);
1107 }
1108
/**
 * Replays persisted log messages to a newly connected client.
 *
 * Rotated log files below "<api dir>/log/" are read in ascending order of
 * their numeric name. Every stored message whose timestamp is newer than the
 * endpoint's local log position is written to the client's stream, provided
 * the endpoint's zone may access the message's security object (if any).
 *
 * The outer loop repeats until a pass replayed at most 50000 messages; that
 * last pass runs with m_LogLock held and afterwards clears the endpoint's
 * syncing flag.
 *
 * If the endpoint's log duration is 0 or it has no zone, only the syncing
 * flag is cleared.
 *
 * @param client The connection to replay the log to.
 */
void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
{
        Endpoint::Ptr endpoint = client->GetEndpoint();

        if (endpoint->GetLogDuration() == 0) {
                ObjectLock olock2(endpoint);
                endpoint->SetSyncing(false);
                return;
        }

        CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'");

        /* count == -1 marks the very first pass of the loop below. */
        int count = -1;
        /* peer_ts: timestamp of the last message replayed so far; logpos_ts: last position reported to the peer. */
        double peer_ts = endpoint->GetLocalLogPosition();
        double logpos_ts = peer_ts;
        bool last_sync = false;

        Endpoint::Ptr target_endpoint = client->GetEndpoint();
        ASSERT(target_endpoint);

        Zone::Ptr target_zone = target_endpoint->GetZone();

        if (!target_zone) {
                ObjectLock olock2(endpoint);
                endpoint->SetSyncing(false);
                return;
        }

        for (;;) {
                boost::mutex::scoped_lock lock(m_LogLock);

                /* Rotate so that everything written so far ends up in a numbered file picked up by the glob below. */
                CloseLogFile();
                RotateLogFile();

                if (count == -1 || count > 50000) {
                        /* First pass, or the previous pass replayed a lot: reopen the log and release the lock while replaying. */
                        OpenLogFile();
                        lock.unlock();
                } else {
                        /* Final pass: m_LogLock stays held until this iteration ends. */
                        last_sync = true;
                }

                count = 0;

                std::vector<int> files;
                Utility::Glob(GetApiDir() + "log/*", std::bind(&ApiListener::LogGlobHandler, std::ref(files), _1), GlobFile);
                std::sort(files.begin(), files.end());

                for (int ts : files) {
                        String path = GetApiDir() + "log/" + Convert::ToString(ts);

                        /* The file name is a timestamp (see RotateLogFile()); skip files older than the peer's position. */
                        if (ts < peer_ts)
                                continue;

                        Log(LogNotice, "ApiListener")
                                << "Replaying log: " << path;

                        /* The StdioStream is constructed with the same ownership flag as in OpenLogFile(). */
                        auto *fp = new std::fstream(path.CStr(), std::fstream::in | std::fstream::binary);
                        StdioStream::Ptr logStream = new StdioStream(fp, true);

                        String message;
                        StreamReadContext src;
                        while (true) {
                                Dictionary::Ptr pmessage;

                                try {
                                        StreamReadStatus srs = NetString::ReadStringFromStream(logStream, &message, src);

                                        if (srs == StatusEof)
                                                break;

                                        if (srs != StatusNewItem)
                                                continue;

                                        pmessage = JsonDecode(message);
                                } catch (const std::exception&) {
                                        /* Both read and JSON decode failures end up here. */
                                        Log(LogWarning, "ApiListener")
                                                << "Unexpected end-of-file for cluster log: " << path;

                                        /* Log files may be incomplete or corrupted. This is perfectly OK. */
                                        break;
                                }

                                /* Skip messages which are not newer than what has been replayed already. */
                                if (pmessage->Get("timestamp") <= peer_ts)
                                        continue;

                                Dictionary::Ptr secname = pmessage->Get("secobj");

                                if (secname) {
                                        ConfigObject::Ptr secobj = ConfigObject::GetObject(secname->Get("type"), secname->Get("name"));

                                        /* The object does not exist (anymore). */
                                        if (!secobj)
                                                continue;

                                        if (!target_zone->CanAccessObject(secobj))
                                                continue;
                                }

                                try  {
                                        size_t bytesSent = NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
                                        endpoint->AddMessageSent(bytesSent);
                                        count++;
                                } catch (const std::exception& ex) {
                                        Log(LogWarning, "ApiListener")
                                                << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);

                                        Log(LogDebug, "ApiListener")
                                                << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);

                                        /* Only leaves the per-file read loop; the remaining files are still processed. */
                                        break;
                                }

                                peer_ts = pmessage->Get("timestamp");

                                /* Report a new log position once the file timestamp is more than 10 ahead of the last reported one. */
                                if (ts > logpos_ts + 10) {
                                        logpos_ts = ts;

                                        Dictionary::Ptr lmessage = new Dictionary({
                                                { "jsonrpc", "2.0" },
                                                { "method", "log::SetLogPosition" },
                                                { "params", new Dictionary({
                                                        { "log_position", logpos_ts }
                                                }) }
                                        });

                                        size_t bytesSent = JsonRpc::SendMessage(client->GetStream(), lmessage);
                                        endpoint->AddMessageSent(bytesSent);
                                }
                        }

                        logStream->Close();
                }

                if (count > 0) {
                        Log(LogInformation, "ApiListener")
                                << "Replayed " << count << " messages.";
                }
                else {
                        Log(LogNotice, "ApiListener")
                                << "Replayed " << count << " messages.";
                }

                if (last_sync) {
                        {
                                ObjectLock olock2(endpoint);
                                endpoint->SetSyncing(false);
                        }

                        /* Still holding m_LogLock here (see above), as OpenLogFile() requires. */
                        OpenLogFile();

                        break;
                }
        }
}
1262
1263 void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
1264 {
1265         std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
1266
1267         ApiListener::Ptr listener = ApiListener::GetInstance();
1268
1269         if (!listener)
1270                 return;
1271
1272         stats = listener->GetStatus();
1273
1274         ObjectLock olock(stats.second);
1275         for (const Dictionary::Pair& kv : stats.second)
1276                 perfdata->Add(new PerfdataValue("api_" + kv.first, kv.second));
1277
1278         status->Set("api", stats.first);
1279 }
1280
1281 std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
1282 {
1283         Dictionary::Ptr perfdata = new Dictionary();
1284
1285         /* cluster stats */
1286
1287         double allEndpoints = 0;
1288         Array::Ptr allNotConnectedEndpoints = new Array();
1289         Array::Ptr allConnectedEndpoints = new Array();
1290
1291         Zone::Ptr my_zone = Zone::GetLocalZone();
1292
1293         Dictionary::Ptr connectedZones = new Dictionary();
1294
1295         for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
1296                 /* only check endpoints in a) the same zone b) our parent zone c) immediate child zones */
1297                 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
1298                         Log(LogDebug, "ApiListener")
1299                                 << "Not checking connection to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone.";
1300                         continue;
1301                 }
1302
1303                 bool zoneConnected = false;
1304                 int countZoneEndpoints = 0;
1305                 double zoneLag = 0;
1306
1307                 ArrayData zoneEndpoints;
1308
1309                 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
1310                         zoneEndpoints.emplace_back(endpoint->GetName());
1311
1312                         if (endpoint->GetName() == GetIdentity())
1313                                 continue;
1314
1315                         double eplag = CalculateZoneLag(endpoint);
1316
1317                         if (eplag > 0 && eplag > zoneLag)
1318                                 zoneLag = eplag;
1319
1320                         allEndpoints++;
1321                         countZoneEndpoints++;
1322
1323                         if (!endpoint->GetConnected()) {
1324                                 allNotConnectedEndpoints->Add(endpoint->GetName());
1325                         } else {
1326                                 allConnectedEndpoints->Add(endpoint->GetName());
1327                                 zoneConnected = true;
1328                         }
1329                 }
1330
1331                 /* if there's only one endpoint inside the zone, we're not connected - that's us, fake it */
1332                 if (zone->GetEndpoints().size() == 1 && countZoneEndpoints == 0)
1333                         zoneConnected = true;
1334
1335                 String parentZoneName;
1336                 Zone::Ptr parentZone = zone->GetParent();
1337                 if (parentZone)
1338                         parentZoneName = parentZone->GetName();
1339
1340                 Dictionary::Ptr zoneStats = new Dictionary({
1341                         { "connected", zoneConnected },
1342                         { "client_log_lag", zoneLag },
1343                         { "endpoints", new Array(std::move(zoneEndpoints)) },
1344                         { "parent_zone", parentZoneName }
1345                 });
1346
1347                 connectedZones->Set(zone->GetName(), zoneStats);
1348         }
1349
1350         /* connection stats */
1351         size_t jsonRpcAnonymousClients = GetAnonymousClients().size();
1352         size_t httpClients = GetHttpClients().size();
1353         size_t workQueueItems = JsonRpcConnection::GetWorkQueueLength();
1354         size_t workQueueCount = JsonRpcConnection::GetWorkQueueCount();
1355         size_t syncQueueItems = m_SyncQueue.GetLength();
1356         size_t relayQueueItems = m_RelayQueue.GetLength();
1357         double workQueueItemRate = JsonRpcConnection::GetWorkQueueRate();
1358         double syncQueueItemRate = m_SyncQueue.GetTaskCount(60) / 60.0;
1359         double relayQueueItemRate = m_RelayQueue.GetTaskCount(60) / 60.0;
1360
1361         Dictionary::Ptr status = new Dictionary({
1362                 { "identity", GetIdentity() },
1363                 { "num_endpoints", allEndpoints },
1364                 { "num_conn_endpoints", allConnectedEndpoints->GetLength() },
1365                 { "num_not_conn_endpoints", allNotConnectedEndpoints->GetLength() },
1366                 { "conn_endpoints", allConnectedEndpoints },
1367                 { "not_conn_endpoints", allNotConnectedEndpoints },
1368
1369                 { "zones", connectedZones },
1370
1371                 { "json_rpc", new Dictionary({
1372                         { "anonymous_clients", jsonRpcAnonymousClients },
1373                         { "work_queue_items", workQueueItems },
1374                         { "work_queue_count", workQueueCount },
1375                         { "sync_queue_items", syncQueueItems },
1376                         { "relay_queue_items", relayQueueItems },
1377                         { "work_queue_item_rate", workQueueItemRate },
1378                         { "sync_queue_item_rate", syncQueueItemRate },
1379                         { "relay_queue_item_rate", relayQueueItemRate }
1380                 }) },
1381
1382                 { "http", new Dictionary({
1383                         { "clients", httpClients }
1384                 }) }
1385         });
1386
1387         /* performance data */
1388         perfdata->Set("num_endpoints", allEndpoints);
1389         perfdata->Set("num_conn_endpoints", Convert::ToDouble(allConnectedEndpoints->GetLength()));
1390         perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(allNotConnectedEndpoints->GetLength()));
1391
1392         perfdata->Set("num_json_rpc_anonymous_clients", jsonRpcAnonymousClients);
1393         perfdata->Set("num_http_clients", httpClients);
1394         perfdata->Set("num_json_rpc_work_queue_items", workQueueItems);
1395         perfdata->Set("num_json_rpc_work_queue_count", workQueueCount);
1396         perfdata->Set("num_json_rpc_sync_queue_items", syncQueueItems);
1397         perfdata->Set("num_json_rpc_relay_queue_items", relayQueueItems);
1398
1399         perfdata->Set("num_json_rpc_work_queue_item_rate", workQueueItemRate);
1400         perfdata->Set("num_json_rpc_sync_queue_item_rate", syncQueueItemRate);
1401         perfdata->Set("num_json_rpc_relay_queue_item_rate", relayQueueItemRate);
1402
1403         return std::make_pair(status, perfdata);
1404 }
1405
1406 double ApiListener::CalculateZoneLag(const Endpoint::Ptr& endpoint)
1407 {
1408         double remoteLogPosition = endpoint->GetRemoteLogPosition();
1409         double eplag = Utility::GetTime() - remoteLogPosition;
1410
1411         if ((endpoint->GetSyncing() || !endpoint->GetConnected()) && remoteLogPosition != 0)
1412                 return eplag;
1413
1414         return 0;
1415 }
1416
1417 bool ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1418 {
1419         boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1420
1421         if (GetMaxAnonymousClients() >= 0 && (long)m_AnonymousClients.size() + 1 > (long)GetMaxAnonymousClients())
1422                 return false;
1423
1424         m_AnonymousClients.insert(aclient);
1425         return true;
1426 }
1427
1428 void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1429 {
1430         boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1431         m_AnonymousClients.erase(aclient);
1432 }
1433
1434 std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients() const
1435 {
1436         boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1437         return m_AnonymousClients;
1438 }
1439
1440 void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
1441 {
1442         boost::mutex::scoped_lock lock(m_HttpClientsLock);
1443         m_HttpClients.insert(aclient);
1444 }
1445
1446 void ApiListener::RemoveHttpClient(const HttpServerConnection::Ptr& aclient)
1447 {
1448         boost::mutex::scoped_lock lock(m_HttpClientsLock);
1449         m_HttpClients.erase(aclient);
1450 }
1451
1452 std::set<HttpServerConnection::Ptr> ApiListener::GetHttpClients() const
1453 {
1454         boost::mutex::scoped_lock lock(m_HttpClientsLock);
1455         return m_HttpClients;
1456 }
1457
/**
 * Handler for the API function registered as "Hello" in namespace "icinga"
 * (see REGISTER_APIFUNCTION at the top of this file). Both parameters are
 * ignored.
 *
 * @returns Always Empty.
 */
Value ApiListener::HelloAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
{
        return Empty;
}
1462
/**
 * @returns The endpoint stored in m_LocalEndpoint.
 */
Endpoint::Ptr ApiListener::GetLocalEndpoint() const
{
        return m_LocalEndpoint;
}
1467
1468 void ApiListener::ValidateTlsProtocolmin(const Lazy<String>& lvalue, const ValidationUtils& utils)
1469 {
1470         ObjectImpl<ApiListener>::ValidateTlsProtocolmin(lvalue, utils);
1471
1472         if (lvalue() != SSL_TXT_TLSV1
1473 #ifdef SSL_TXT_TLSV1_1
1474                 && lvalue() != SSL_TXT_TLSV1_1 &&
1475                 lvalue() != SSL_TXT_TLSV1_2
1476 #endif /* SSL_TXT_TLSV1_1 */
1477                 ) {
1478                 String message = "Invalid TLS version. Must be one of '" SSL_TXT_TLSV1 "'";
1479 #ifdef SSL_TXT_TLSV1_1
1480                 message += ", '" SSL_TXT_TLSV1_1 "' or '" SSL_TXT_TLSV1_2 "'";
1481 #endif /* SSL_TXT_TLSV1_1 */
1482
1483                 BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_protocolmin" }, message));
1484         }
1485 }
1486
1487 void ApiListener::ValidateTlsHandshakeTimeout(const Lazy<double>& lvalue, const ValidationUtils& utils)
1488 {
1489         ObjectImpl<ApiListener>::ValidateTlsHandshakeTimeout(lvalue, utils);
1490
1491         if (lvalue() <= 0)
1492                 BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_handshake_timeout" }, "Value must be greater than 0."));
1493 }
1494
1495 bool ApiListener::IsHACluster()
1496 {
1497         Zone::Ptr zone = Zone::GetLocalZone();
1498
1499         if (!zone)
1500                 return false;
1501
1502         return zone->IsSingleInstance();
1503 }
1504
1505 /* Provide a helper function for zone origin name. */
1506 String ApiListener::GetFromZoneName(const Zone::Ptr& fromZone)
1507 {
1508         String fromZoneName;
1509
1510         if (fromZone) {
1511                 fromZoneName = fromZone->GetName();
1512         } else {
1513                 Zone::Ptr lzone = Zone::GetLocalZone();
1514
1515                 if (lzone)
1516                         fromZoneName = lzone->GetName();
1517         }
1518
1519         return fromZoneName;
1520 }
1521
1522 void ApiListener::UpdateStatusFile(boost::asio::ip::tcp::endpoint localEndpoint)
1523 {
1524         String path = Configuration::CacheDir + "/api-state.json";
1525
1526         Utility::SaveJsonFile(path, 0644, new Dictionary({
1527                 {"host", String(localEndpoint.address().to_string())},
1528                 {"port", localEndpoint.port()}
1529         }));
1530 }
1531
1532 void ApiListener::RemoveStatusFile()
1533 {
1534         String path = Configuration::CacheDir + "/api-state.json";
1535
1536         if (Utility::PathExists(path)) {
1537                 if (unlink(path.CStr()) < 0 && errno != ENOENT) {
1538                         BOOST_THROW_EXCEPTION(posix_error()
1539                                 << boost::errinfo_api_function("unlink")
1540                                 << boost::errinfo_errno(errno)
1541                                 << boost::errinfo_file_name(path));
1542                 }
1543         }
1544 }